hyper/proto/h1/
conn.rs

1use std::fmt;
2#[cfg(feature = "server")]
3use std::future::Future;
4use std::io;
5use std::marker::{PhantomData, Unpin};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8#[cfg(feature = "server")]
9use std::time::{Duration, Instant};
10
11use crate::rt::{Read, Write};
12use bytes::{Buf, Bytes};
13use futures_util::ready;
14use http::header::{HeaderValue, CONNECTION, TE};
15use http::{HeaderMap, Method, Version};
16use http_body::Frame;
17use httparse::ParserConfig;
18
19use super::io::Buffered;
20use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21use crate::body::DecodedLength;
22#[cfg(feature = "server")]
23use crate::common::time::Time;
24use crate::headers::connection_keep_alive;
25use crate::proto::{BodyLength, MessageHead};
26#[cfg(feature = "server")]
27use crate::rt::Sleep;
28
29const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
30
31/// This handles a connection, which will have been established over an
32/// `Read + Write` (like a socket), and will likely include multiple
33/// `Transaction`s over HTTP.
34///
35/// The connection will determine when a message begins and ends as well as
36/// determine if this connection can be kept alive after the message,
37/// or if it is complete.
38pub(crate) struct Conn<I, B, T> {
39    io: Buffered<I, EncodedBuf<B>>,
40    state: State,
41    _marker: PhantomData<fn(T)>,
42}
43
44impl<I, B, T> Conn<I, B, T>
45where
46    I: Read + Write + Unpin,
47    B: Buf,
48    T: Http1Transaction,
49{
50    pub(crate) fn new(io: I) -> Conn<I, B, T> {
51        Conn {
52            io: Buffered::new(io),
53            state: State {
54                allow_half_close: false,
55                cached_headers: None,
56                error: None,
57                keep_alive: KA::Busy,
58                method: None,
59                h1_parser_config: ParserConfig::default(),
60                h1_max_headers: None,
61                #[cfg(feature = "server")]
62                h1_header_read_timeout: None,
63                #[cfg(feature = "server")]
64                h1_header_read_timeout_fut: None,
65                #[cfg(feature = "server")]
66                h1_header_read_timeout_running: false,
67                #[cfg(feature = "server")]
68                date_header: true,
69                #[cfg(feature = "server")]
70                timer: Time::Empty,
71                preserve_header_case: false,
72                #[cfg(feature = "ffi")]
73                preserve_header_order: false,
74                title_case_headers: false,
75                h09_responses: false,
76                #[cfg(feature = "ffi")]
77                on_informational: None,
78                notify_read: false,
79                reading: Reading::Init,
80                writing: Writing::Init,
81                upgrade: None,
82                // We assume a modern world where the remote speaks HTTP/1.1.
83                // If they tell us otherwise, we'll downgrade in `read_head`.
84                version: Version::HTTP_11,
85                allow_trailer_fields: false,
86            },
87            _marker: PhantomData,
88        }
89    }
90
91    #[cfg(feature = "server")]
92    pub(crate) fn set_timer(&mut self, timer: Time) {
93        self.state.timer = timer;
94    }
95
96    #[cfg(feature = "server")]
97    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
98        self.io.set_flush_pipeline(enabled);
99    }
100
101    pub(crate) fn set_write_strategy_queue(&mut self) {
102        self.io.set_write_strategy_queue();
103    }
104
105    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
106        self.io.set_max_buf_size(max);
107    }
108
109    #[cfg(feature = "client")]
110    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
111        self.io.set_read_buf_exact_size(sz);
112    }
113
114    pub(crate) fn set_write_strategy_flatten(&mut self) {
115        self.io.set_write_strategy_flatten();
116    }
117
118    #[cfg(feature = "client")]
119    pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
120        self.state.h1_parser_config = parser_config;
121    }
122
123    pub(crate) fn set_title_case_headers(&mut self) {
124        self.state.title_case_headers = true;
125    }
126
127    pub(crate) fn set_preserve_header_case(&mut self) {
128        self.state.preserve_header_case = true;
129    }
130
131    #[cfg(feature = "ffi")]
132    pub(crate) fn set_preserve_header_order(&mut self) {
133        self.state.preserve_header_order = true;
134    }
135
136    #[cfg(feature = "client")]
137    pub(crate) fn set_h09_responses(&mut self) {
138        self.state.h09_responses = true;
139    }
140
141    pub(crate) fn set_http1_max_headers(&mut self, val: usize) {
142        self.state.h1_max_headers = Some(val);
143    }
144
145    #[cfg(feature = "server")]
146    pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
147        self.state.h1_header_read_timeout = Some(val);
148    }
149
150    #[cfg(feature = "server")]
151    pub(crate) fn set_allow_half_close(&mut self) {
152        self.state.allow_half_close = true;
153    }
154
155    pub(crate) fn into_inner(self) -> (I, Bytes) {
156        self.io.into_inner()
157    }
158
159    pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
160        self.state.upgrade.take()
161    }
162
163    pub(crate) fn is_read_closed(&self) -> bool {
164        self.state.is_read_closed()
165    }
166
167    pub(crate) fn is_write_closed(&self) -> bool {
168        self.state.is_write_closed()
169    }
170
171    pub(crate) fn can_read_head(&self) -> bool {
172        if !matches!(self.state.reading, Reading::Init) {
173            return false;
174        }
175
176        if T::should_read_first() {
177            return true;
178        }
179
180        !matches!(self.state.writing, Writing::Init)
181    }
182
183    pub(crate) fn can_read_body(&self) -> bool {
184        matches!(
185            self.state.reading,
186            Reading::Body(..) | Reading::Continue(..)
187        )
188    }
189
190    #[cfg(feature = "server")]
191    pub(crate) fn has_initial_read_write_state(&self) -> bool {
192        matches!(self.state.reading, Reading::Init)
193            && matches!(self.state.writing, Writing::Init)
194            && self.io.read_buf().is_empty()
195    }
196
197    fn should_error_on_eof(&self) -> bool {
198        // If we're idle, it's probably just the connection closing gracefully.
199        T::should_error_on_parse_eof() && !self.state.is_idle()
200    }
201
202    fn has_h2_prefix(&self) -> bool {
203        let read_buf = self.io.read_buf();
204        read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
205    }
206
207    pub(super) fn poll_read_head(
208        &mut self,
209        cx: &mut Context<'_>,
210    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
211        debug_assert!(self.can_read_head());
212        trace!("Conn::read_head");
213
214        #[cfg(feature = "server")]
215        if !self.state.h1_header_read_timeout_running {
216            if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
217                let deadline = Instant::now() + h1_header_read_timeout;
218                self.state.h1_header_read_timeout_running = true;
219                match self.state.h1_header_read_timeout_fut {
220                    Some(ref mut h1_header_read_timeout_fut) => {
221                        trace!("resetting h1 header read timeout timer");
222                        self.state.timer.reset(h1_header_read_timeout_fut, deadline);
223                    }
224                    None => {
225                        trace!("setting h1 header read timeout timer");
226                        self.state.h1_header_read_timeout_fut =
227                            Some(self.state.timer.sleep_until(deadline));
228                    }
229                }
230            }
231        }
232
233        let msg = match self.io.parse::<T>(
234            cx,
235            ParseContext {
236                cached_headers: &mut self.state.cached_headers,
237                req_method: &mut self.state.method,
238                h1_parser_config: self.state.h1_parser_config.clone(),
239                h1_max_headers: self.state.h1_max_headers,
240                preserve_header_case: self.state.preserve_header_case,
241                #[cfg(feature = "ffi")]
242                preserve_header_order: self.state.preserve_header_order,
243                h09_responses: self.state.h09_responses,
244                #[cfg(feature = "ffi")]
245                on_informational: &mut self.state.on_informational,
246            },
247        ) {
248            Poll::Ready(Ok(msg)) => msg,
249            Poll::Ready(Err(e)) => return self.on_read_head_error(e),
250            Poll::Pending => {
251                #[cfg(feature = "server")]
252                if self.state.h1_header_read_timeout_running {
253                    if let Some(ref mut h1_header_read_timeout_fut) =
254                        self.state.h1_header_read_timeout_fut
255                    {
256                        if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
257                            self.state.h1_header_read_timeout_running = false;
258
259                            warn!("read header from client timeout");
260                            return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
261                        }
262                    }
263                }
264
265                return Poll::Pending;
266            }
267        };
268
269        #[cfg(feature = "server")]
270        {
271            self.state.h1_header_read_timeout_running = false;
272            self.state.h1_header_read_timeout_fut = None;
273        }
274
275        // Note: don't deconstruct `msg` into local variables, it appears
276        // the optimizer doesn't remove the extra copies.
277
278        debug!("incoming body is {}", msg.decode);
279
280        // Prevent accepting HTTP/0.9 responses after the initial one, if any.
281        self.state.h09_responses = false;
282
283        // Drop any OnInformational callbacks, we're done there!
284        #[cfg(feature = "ffi")]
285        {
286            self.state.on_informational = None;
287        }
288
289        self.state.busy();
290        self.state.keep_alive &= msg.keep_alive;
291        self.state.version = msg.head.version;
292
293        let mut wants = if msg.wants_upgrade {
294            Wants::UPGRADE
295        } else {
296            Wants::EMPTY
297        };
298
299        if msg.decode == DecodedLength::ZERO {
300            if msg.expect_continue {
301                debug!("ignoring expect-continue since body is empty");
302            }
303            self.state.reading = Reading::KeepAlive;
304            if !T::should_read_first() {
305                self.try_keep_alive(cx);
306            }
307        } else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
308            let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
309            self.state.reading = Reading::Continue(Decoder::new(
310                msg.decode,
311                self.state.h1_max_headers,
312                h1_max_header_size,
313            ));
314            wants = wants.add(Wants::EXPECT);
315        } else {
316            let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
317            self.state.reading = Reading::Body(Decoder::new(
318                msg.decode,
319                self.state.h1_max_headers,
320                h1_max_header_size,
321            ));
322        }
323
324        self.state.allow_trailer_fields = msg
325            .head
326            .headers
327            .get(TE)
328            .map_or(false, |te_header| te_header == "trailers");
329
330        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
331    }
332
333    fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
334        // If we are currently waiting on a message, then an empty
335        // message should be reported as an error. If not, it is just
336        // the connection closing gracefully.
337        let must_error = self.should_error_on_eof();
338        self.close_read();
339        self.io.consume_leading_lines();
340        let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
341        if was_mid_parse || must_error {
342            // We check if the buf contains the h2 Preface
343            debug!(
344                "parse error ({}) with {} bytes",
345                e,
346                self.io.read_buf().len()
347            );
348            match self.on_parse_error(e) {
349                Ok(()) => Poll::Pending, // XXX: wat?
350                Err(e) => Poll::Ready(Some(Err(e))),
351            }
352        } else {
353            debug!("read eof");
354            self.close_write();
355            Poll::Ready(None)
356        }
357    }
358
359    pub(crate) fn poll_read_body(
360        &mut self,
361        cx: &mut Context<'_>,
362    ) -> Poll<Option<io::Result<Frame<Bytes>>>> {
363        debug_assert!(self.can_read_body());
364
365        let (reading, ret) = match self.state.reading {
366            Reading::Body(ref mut decoder) => {
367                match ready!(decoder.decode(cx, &mut self.io)) {
368                    Ok(frame) => {
369                        if frame.is_data() {
370                            let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
371                            let (reading, maybe_frame) = if decoder.is_eof() {
372                                debug!("incoming body completed");
373                                (
374                                    Reading::KeepAlive,
375                                    if !slice.is_empty() {
376                                        Some(Ok(frame))
377                                    } else {
378                                        None
379                                    },
380                                )
381                            } else if slice.is_empty() {
382                                error!("incoming body unexpectedly ended");
383                                // This should be unreachable, since all 3 decoders
384                                // either set eof=true or return an Err when reading
385                                // an empty slice...
386                                (Reading::Closed, None)
387                            } else {
388                                return Poll::Ready(Some(Ok(frame)));
389                            };
390                            (reading, Poll::Ready(maybe_frame))
391                        } else if frame.is_trailers() {
392                            (Reading::Closed, Poll::Ready(Some(Ok(frame))))
393                        } else {
394                            trace!("discarding unknown frame");
395                            (Reading::Closed, Poll::Ready(None))
396                        }
397                    }
398                    Err(e) => {
399                        debug!("incoming body decode error: {}", e);
400                        (Reading::Closed, Poll::Ready(Some(Err(e))))
401                    }
402                }
403            }
404            Reading::Continue(ref decoder) => {
405                // Write the 100 Continue if not already responded...
406                if let Writing::Init = self.state.writing {
407                    trace!("automatically sending 100 Continue");
408                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
409                    self.io.headers_buf().extend_from_slice(cont);
410                }
411
412                // And now recurse once in the Reading::Body state...
413                self.state.reading = Reading::Body(decoder.clone());
414                return self.poll_read_body(cx);
415            }
416            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
417        };
418
419        self.state.reading = reading;
420        self.try_keep_alive(cx);
421        ret
422    }
423
424    pub(crate) fn wants_read_again(&mut self) -> bool {
425        let ret = self.state.notify_read;
426        self.state.notify_read = false;
427        ret
428    }
429
430    pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
431        debug_assert!(!self.can_read_head() && !self.can_read_body());
432
433        if self.is_read_closed() {
434            Poll::Pending
435        } else if self.is_mid_message() {
436            self.mid_message_detect_eof(cx)
437        } else {
438            self.require_empty_read(cx)
439        }
440    }
441
442    fn is_mid_message(&self) -> bool {
443        !matches!(
444            (&self.state.reading, &self.state.writing),
445            (&Reading::Init, &Writing::Init)
446        )
447    }
448
449    // This will check to make sure the io object read is empty.
450    //
451    // This should only be called for Clients wanting to enter the idle
452    // state.
453    fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
454        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
455        debug_assert!(!self.is_mid_message());
456        debug_assert!(T::is_client());
457
458        if !self.io.read_buf().is_empty() {
459            debug!("received an unexpected {} bytes", self.io.read_buf().len());
460            return Poll::Ready(Err(crate::Error::new_unexpected_message()));
461        }
462
463        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
464
465        if num_read == 0 {
466            let ret = if self.should_error_on_eof() {
467                trace!("found unexpected EOF on busy connection: {:?}", self.state);
468                Poll::Ready(Err(crate::Error::new_incomplete()))
469            } else {
470                trace!("found EOF on idle connection, closing");
471                Poll::Ready(Ok(()))
472            };
473
474            // order is important: should_error needs state BEFORE close_read
475            self.state.close_read();
476            return ret;
477        }
478
479        debug!(
480            "received unexpected {} bytes on an idle connection",
481            num_read
482        );
483        Poll::Ready(Err(crate::Error::new_unexpected_message()))
484    }
485
486    fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
487        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
488        debug_assert!(self.is_mid_message());
489
490        if self.state.allow_half_close || !self.io.read_buf().is_empty() {
491            return Poll::Pending;
492        }
493
494        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
495
496        if num_read == 0 {
497            trace!("found unexpected EOF on busy connection: {:?}", self.state);
498            self.state.close_read();
499            Poll::Ready(Err(crate::Error::new_incomplete()))
500        } else {
501            Poll::Ready(Ok(()))
502        }
503    }
504
505    fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
506        debug_assert!(!self.state.is_read_closed());
507
508        let result = ready!(self.io.poll_read_from_io(cx));
509        Poll::Ready(result.map_err(|e| {
510            trace!(error = %e, "force_io_read; io error");
511            self.state.close();
512            e
513        }))
514    }
515
516    fn maybe_notify(&mut self, cx: &mut Context<'_>) {
517        // its possible that we returned NotReady from poll() without having
518        // exhausted the underlying Io. We would have done this when we
519        // determined we couldn't keep reading until we knew how writing
520        // would finish.
521
522        match self.state.reading {
523            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
524                return
525            }
526            Reading::Init => (),
527        };
528
529        match self.state.writing {
530            Writing::Body(..) => return,
531            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
532        }
533
534        if !self.io.is_read_blocked() {
535            if self.io.read_buf().is_empty() {
536                match self.io.poll_read_from_io(cx) {
537                    Poll::Ready(Ok(n)) => {
538                        if n == 0 {
539                            trace!("maybe_notify; read eof");
540                            if self.state.is_idle() {
541                                self.state.close();
542                            } else {
543                                self.close_read()
544                            }
545                            return;
546                        }
547                    }
548                    Poll::Pending => {
549                        trace!("maybe_notify; read_from_io blocked");
550                        return;
551                    }
552                    Poll::Ready(Err(e)) => {
553                        trace!("maybe_notify; read_from_io error: {}", e);
554                        self.state.close();
555                        self.state.error = Some(crate::Error::new_io(e));
556                    }
557                }
558            }
559            self.state.notify_read = true;
560        }
561    }
562
563    fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
564        self.state.try_keep_alive::<T>();
565        self.maybe_notify(cx);
566    }
567
568    pub(crate) fn can_write_head(&self) -> bool {
569        if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
570            return false;
571        }
572
573        match self.state.writing {
574            Writing::Init => self.io.can_headers_buf(),
575            _ => false,
576        }
577    }
578
579    pub(crate) fn can_write_body(&self) -> bool {
580        match self.state.writing {
581            Writing::Body(..) => true,
582            Writing::Init | Writing::KeepAlive | Writing::Closed => false,
583        }
584    }
585
586    pub(crate) fn can_buffer_body(&self) -> bool {
587        self.io.can_buffer()
588    }
589
590    pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
591        if let Some(encoder) = self.encode_head(head, body) {
592            self.state.writing = if !encoder.is_eof() {
593                Writing::Body(encoder)
594            } else if encoder.is_last() {
595                Writing::Closed
596            } else {
597                Writing::KeepAlive
598            };
599        }
600    }
601
602    fn encode_head(
603        &mut self,
604        mut head: MessageHead<T::Outgoing>,
605        body: Option<BodyLength>,
606    ) -> Option<Encoder> {
607        debug_assert!(self.can_write_head());
608
609        if !T::should_read_first() {
610            self.state.busy();
611        }
612
613        self.enforce_version(&mut head);
614
615        let buf = self.io.headers_buf();
616        match super::role::encode_headers::<T>(
617            Encode {
618                head: &mut head,
619                body,
620                #[cfg(feature = "server")]
621                keep_alive: self.state.wants_keep_alive(),
622                req_method: &mut self.state.method,
623                title_case_headers: self.state.title_case_headers,
624                #[cfg(feature = "server")]
625                date_header: self.state.date_header,
626            },
627            buf,
628        ) {
629            Ok(encoder) => {
630                debug_assert!(self.state.cached_headers.is_none());
631                debug_assert!(head.headers.is_empty());
632                self.state.cached_headers = Some(head.headers);
633
634                #[cfg(feature = "ffi")]
635                {
636                    self.state.on_informational =
637                        head.extensions.remove::<crate::ffi::OnInformational>();
638                }
639
640                Some(encoder)
641            }
642            Err(err) => {
643                self.state.error = Some(err);
644                self.state.writing = Writing::Closed;
645                None
646            }
647        }
648    }
649
650    // Fix keep-alive when Connection: keep-alive header is not present
651    fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
652        let outgoing_is_keep_alive = head
653            .headers
654            .get(CONNECTION)
655            .map_or(false, connection_keep_alive);
656
657        if !outgoing_is_keep_alive {
658            match head.version {
659                // If response is version 1.0 and keep-alive is not present in the response,
660                // disable keep-alive so the server closes the connection
661                Version::HTTP_10 => self.state.disable_keep_alive(),
662                // If response is version 1.1 and keep-alive is wanted, add
663                // Connection: keep-alive header when not present
664                Version::HTTP_11 => {
665                    if self.state.wants_keep_alive() {
666                        head.headers
667                            .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
668                    }
669                }
670                _ => (),
671            }
672        }
673    }
674
675    // If we know the remote speaks an older version, we try to fix up any messages
676    // to work with our older peer.
677    fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
678        if let Version::HTTP_10 = self.state.version {
679            // Fixes response or connection when keep-alive header is not present
680            self.fix_keep_alive(head);
681            // If the remote only knows HTTP/1.0, we should force ourselves
682            // to do only speak HTTP/1.0 as well.
683            head.version = Version::HTTP_10;
684        }
685        // If the remote speaks HTTP/1.1, then it *should* be fine with
686        // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
687        // the user's headers be.
688    }
689
690    pub(crate) fn write_body(&mut self, chunk: B) {
691        debug_assert!(self.can_write_body() && self.can_buffer_body());
692        // empty chunks should be discarded at Dispatcher level
693        debug_assert!(chunk.remaining() != 0);
694
695        let state = match self.state.writing {
696            Writing::Body(ref mut encoder) => {
697                self.io.buffer(encoder.encode(chunk));
698
699                if !encoder.is_eof() {
700                    return;
701                }
702
703                if encoder.is_last() {
704                    Writing::Closed
705                } else {
706                    Writing::KeepAlive
707                }
708            }
709            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
710        };
711
712        self.state.writing = state;
713    }
714
715    pub(crate) fn write_trailers(&mut self, trailers: HeaderMap) {
716        if T::is_server() && !self.state.allow_trailer_fields {
717            debug!("trailers not allowed to be sent");
718            return;
719        }
720        debug_assert!(self.can_write_body() && self.can_buffer_body());
721
722        match self.state.writing {
723            Writing::Body(ref encoder) => {
724                if let Some(enc_buf) =
725                    encoder.encode_trailers(trailers, self.state.title_case_headers)
726                {
727                    self.io.buffer(enc_buf);
728
729                    self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
730                        Writing::Closed
731                    } else {
732                        Writing::KeepAlive
733                    };
734                }
735            }
736            _ => unreachable!("write_trailers invalid state: {:?}", self.state.writing),
737        }
738    }
739
740    pub(crate) fn write_body_and_end(&mut self, chunk: B) {
741        debug_assert!(self.can_write_body() && self.can_buffer_body());
742        // empty chunks should be discarded at Dispatcher level
743        debug_assert!(chunk.remaining() != 0);
744
745        let state = match self.state.writing {
746            Writing::Body(ref encoder) => {
747                let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
748                if can_keep_alive {
749                    Writing::KeepAlive
750                } else {
751                    Writing::Closed
752                }
753            }
754            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
755        };
756
757        self.state.writing = state;
758    }
759
760    pub(crate) fn end_body(&mut self) -> crate::Result<()> {
761        debug_assert!(self.can_write_body());
762
763        let encoder = match self.state.writing {
764            Writing::Body(ref mut enc) => enc,
765            _ => return Ok(()),
766        };
767
768        // end of stream, that means we should try to eof
769        match encoder.end() {
770            Ok(end) => {
771                if let Some(end) = end {
772                    self.io.buffer(end);
773                }
774
775                self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
776                    Writing::Closed
777                } else {
778                    Writing::KeepAlive
779                };
780
781                Ok(())
782            }
783            Err(not_eof) => {
784                self.state.writing = Writing::Closed;
785                Err(crate::Error::new_body_write_aborted().with(not_eof))
786            }
787        }
788    }
789
790    // When we get a parse error, depending on what side we are, we might be able
791    // to write a response before closing the connection.
792    //
793    // - Client: there is nothing we can do
794    // - Server: if Response hasn't been written yet, we can send a 4xx response
795    fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
796        if let Writing::Init = self.state.writing {
797            if self.has_h2_prefix() {
798                return Err(crate::Error::new_version_h2());
799            }
800            if let Some(msg) = T::on_error(&err) {
801                // Drop the cached headers so as to not trigger a debug
802                // assert in `write_head`...
803                self.state.cached_headers.take();
804                self.write_head(msg, None);
805                self.state.error = Some(err);
806                return Ok(());
807            }
808        }
809
810        // fallback is pass the error back up
811        Err(err)
812    }
813
814    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
815        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
816        self.try_keep_alive(cx);
817        trace!("flushed({}): {:?}", T::LOG, self.state);
818        Poll::Ready(Ok(()))
819    }
820
821    pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
822        match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
823            Ok(()) => {
824                trace!("shut down IO complete");
825                Poll::Ready(Ok(()))
826            }
827            Err(e) => {
828                debug!("error shutting down IO: {}", e);
829                Poll::Ready(Err(e))
830            }
831        }
832    }
833
834    /// If the read side can be cheaply drained, do so. Otherwise, close.
835    pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
836        if let Reading::Continue(ref decoder) = self.state.reading {
837            // skip sending the 100-continue
838            // just move forward to a read, in case a tiny body was included
839            self.state.reading = Reading::Body(decoder.clone());
840        }
841
842        let _ = self.poll_read_body(cx);
843
844        // If still in Reading::Body, just give up
845        match self.state.reading {
846            Reading::Init | Reading::KeepAlive => {
847                trace!("body drained")
848            }
849            _ => self.close_read(),
850        }
851    }
852
853    pub(crate) fn close_read(&mut self) {
854        self.state.close_read();
855    }
856
857    pub(crate) fn close_write(&mut self) {
858        self.state.close_write();
859    }
860
861    #[cfg(feature = "server")]
862    pub(crate) fn disable_keep_alive(&mut self) {
863        if self.state.is_idle() {
864            trace!("disable_keep_alive; closing idle connection");
865            self.state.close();
866        } else {
867            trace!("disable_keep_alive; in-progress connection");
868            self.state.disable_keep_alive();
869        }
870    }
871
872    pub(crate) fn take_error(&mut self) -> crate::Result<()> {
873        if let Some(err) = self.state.error.take() {
874            Err(err)
875        } else {
876            Ok(())
877        }
878    }
879
880    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
881        trace!("{}: prepare possible HTTP upgrade", T::LOG);
882        self.state.prepare_upgrade()
883    }
884}
885
886impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
887    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
888        f.debug_struct("Conn")
889            .field("state", &self.state)
890            .field("io", &self.io)
891            .finish()
892    }
893}
894
895// B and T are never pinned
896impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
897
898struct State {
899    allow_half_close: bool,
900    /// Re-usable HeaderMap to reduce allocating new ones.
901    cached_headers: Option<HeaderMap>,
902    /// If an error occurs when there wasn't a direct way to return it
903    /// back to the user, this is set.
904    error: Option<crate::Error>,
905    /// Current keep-alive status.
906    keep_alive: KA,
907    /// If mid-message, the HTTP Method that started it.
908    ///
909    /// This is used to know things such as if the message can include
910    /// a body or not.
911    method: Option<Method>,
912    h1_parser_config: ParserConfig,
913    h1_max_headers: Option<usize>,
914    #[cfg(feature = "server")]
915    h1_header_read_timeout: Option<Duration>,
916    #[cfg(feature = "server")]
917    h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
918    #[cfg(feature = "server")]
919    h1_header_read_timeout_running: bool,
920    #[cfg(feature = "server")]
921    date_header: bool,
922    #[cfg(feature = "server")]
923    timer: Time,
924    preserve_header_case: bool,
925    #[cfg(feature = "ffi")]
926    preserve_header_order: bool,
927    title_case_headers: bool,
928    h09_responses: bool,
929    /// If set, called with each 1xx informational response received for
930    /// the current request. MUST be unset after a non-1xx response is
931    /// received.
932    #[cfg(feature = "ffi")]
933    on_informational: Option<crate::ffi::OnInformational>,
934    /// Set to true when the Dispatcher should poll read operations
935    /// again. See the `maybe_notify` method for more.
936    notify_read: bool,
937    /// State of allowed reads
938    reading: Reading,
939    /// State of allowed writes
940    writing: Writing,
941    /// An expected pending HTTP upgrade.
942    upgrade: Option<crate::upgrade::Pending>,
943    /// Either HTTP/1.0 or 1.1 connection
944    version: Version,
945    /// Flag to track if trailer fields are allowed to be sent
946    allow_trailer_fields: bool,
947}
948
949#[derive(Debug)]
950enum Reading {
951    Init,
952    Continue(Decoder),
953    Body(Decoder),
954    KeepAlive,
955    Closed,
956}
957
958enum Writing {
959    Init,
960    Body(Encoder),
961    KeepAlive,
962    Closed,
963}
964
965impl fmt::Debug for State {
966    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
967        let mut builder = f.debug_struct("State");
968        builder
969            .field("reading", &self.reading)
970            .field("writing", &self.writing)
971            .field("keep_alive", &self.keep_alive);
972
973        // Only show error field if it's interesting...
974        if let Some(ref error) = self.error {
975            builder.field("error", error);
976        }
977
978        if self.allow_half_close {
979            builder.field("allow_half_close", &true);
980        }
981
982        // Purposefully leaving off other fields..
983
984        builder.finish()
985    }
986}
987
988impl fmt::Debug for Writing {
989    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
990        match *self {
991            Writing::Init => f.write_str("Init"),
992            Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
993            Writing::KeepAlive => f.write_str("KeepAlive"),
994            Writing::Closed => f.write_str("Closed"),
995        }
996    }
997}
998
999impl std::ops::BitAndAssign<bool> for KA {
1000    fn bitand_assign(&mut self, enabled: bool) {
1001        if !enabled {
1002            trace!("remote disabling keep-alive");
1003            *self = KA::Disabled;
1004        }
1005    }
1006}
1007
1008#[derive(Clone, Copy, Debug, Default)]
1009enum KA {
1010    Idle,
1011    #[default]
1012    Busy,
1013    Disabled,
1014}
1015
1016impl KA {
1017    fn idle(&mut self) {
1018        *self = KA::Idle;
1019    }
1020
1021    fn busy(&mut self) {
1022        *self = KA::Busy;
1023    }
1024
1025    fn disable(&mut self) {
1026        *self = KA::Disabled;
1027    }
1028
1029    fn status(&self) -> KA {
1030        *self
1031    }
1032}
1033
1034impl State {
1035    fn close(&mut self) {
1036        trace!("State::close()");
1037        self.reading = Reading::Closed;
1038        self.writing = Writing::Closed;
1039        self.keep_alive.disable();
1040    }
1041
1042    fn close_read(&mut self) {
1043        trace!("State::close_read()");
1044        self.reading = Reading::Closed;
1045        self.keep_alive.disable();
1046    }
1047
1048    fn close_write(&mut self) {
1049        trace!("State::close_write()");
1050        self.writing = Writing::Closed;
1051        self.keep_alive.disable();
1052    }
1053
1054    fn wants_keep_alive(&self) -> bool {
1055        !matches!(self.keep_alive.status(), KA::Disabled)
1056    }
1057
1058    fn try_keep_alive<T: Http1Transaction>(&mut self) {
1059        match (&self.reading, &self.writing) {
1060            (&Reading::KeepAlive, &Writing::KeepAlive) => {
1061                if let KA::Busy = self.keep_alive.status() {
1062                    self.idle::<T>();
1063                } else {
1064                    trace!(
1065                        "try_keep_alive({}): could keep-alive, but status = {:?}",
1066                        T::LOG,
1067                        self.keep_alive
1068                    );
1069                    self.close();
1070                }
1071            }
1072            (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
1073                self.close()
1074            }
1075            _ => (),
1076        }
1077    }
1078
1079    fn disable_keep_alive(&mut self) {
1080        self.keep_alive.disable()
1081    }
1082
1083    fn busy(&mut self) {
1084        if let KA::Disabled = self.keep_alive.status() {
1085            return;
1086        }
1087        self.keep_alive.busy();
1088    }
1089
1090    fn idle<T: Http1Transaction>(&mut self) {
1091        debug_assert!(!self.is_idle(), "State::idle() called while idle");
1092
1093        self.method = None;
1094        self.keep_alive.idle();
1095
1096        if !self.is_idle() {
1097            self.close();
1098            return;
1099        }
1100
1101        self.reading = Reading::Init;
1102        self.writing = Writing::Init;
1103
1104        // !T::should_read_first() means Client.
1105        //
1106        // If Client connection has just gone idle, the Dispatcher
1107        // should try the poll loop one more time, so as to poll the
1108        // pending requests stream.
1109        if !T::should_read_first() {
1110            self.notify_read = true;
1111        }
1112    }
1113
1114    fn is_idle(&self) -> bool {
1115        matches!(self.keep_alive.status(), KA::Idle)
1116    }
1117
1118    fn is_read_closed(&self) -> bool {
1119        matches!(self.reading, Reading::Closed)
1120    }
1121
1122    fn is_write_closed(&self) -> bool {
1123        matches!(self.writing, Writing::Closed)
1124    }
1125
1126    fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1127        let (tx, rx) = crate::upgrade::pending();
1128        self.upgrade = Some(tx);
1129        rx
1130    }
1131}
1132
1133#[cfg(test)]
1134mod tests {
1135    #[cfg(all(feature = "nightly", not(miri)))]
1136    #[bench]
1137    fn bench_read_head_short(b: &mut ::test::Bencher) {
1138        use super::*;
1139        use crate::common::io::Compat;
1140        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1141        let len = s.len();
1142        b.bytes = len as u64;
1143
1144        // an empty IO, we'll be skipping and using the read buffer anyways
1145        let io = Compat(tokio_test::io::Builder::new().build());
1146        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1147        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1148        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1149
1150        let rt = tokio::runtime::Builder::new_current_thread()
1151            .enable_all()
1152            .build()
1153            .unwrap();
1154
1155        b.iter(|| {
1156            rt.block_on(futures_util::future::poll_fn(|cx| {
1157                match conn.poll_read_head(cx) {
1158                    Poll::Ready(Some(Ok(x))) => {
1159                        ::test::black_box(&x);
1160                        let mut headers = x.0.headers;
1161                        headers.clear();
1162                        conn.state.cached_headers = Some(headers);
1163                    }
1164                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1165                }
1166
1167                conn.io.read_buf_mut().reserve(1);
1168                unsafe {
1169                    conn.io.read_buf_mut().set_len(len);
1170                }
1171                conn.state.reading = Reading::Init;
1172                Poll::Ready(())
1173            }));
1174        });
1175    }
1176
1177    /*
1178    //TODO: rewrite these using dispatch... someday...
1179    use futures::{Async, Future, Stream, Sink};
1180    use futures::future;
1181
1182    use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1183    use super::super::Encoder;
1184    use mock::AsyncIo;
1185
1186    use super::{Conn, Decoder, Reading, Writing};
1187    use ::uri::Uri;
1188
1189    use std::str::FromStr;
1190
1191    #[test]
1192    fn test_conn_init_read() {
1193        let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1194        let len = good_message.len();
1195        let io = AsyncIo::new_buf(good_message, len);
1196        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1197
1198        match conn.poll().unwrap() {
1199            Async::Ready(Some(Frame::Message { message, body: false })) => {
1200                assert_eq!(message, MessageHead {
1201                    subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1202                    .. MessageHead::default()
1203                })
1204            },
1205            f => panic!("frame is not Frame::Message: {:?}", f)
1206        }
1207    }
1208
1209    #[test]
1210    fn test_conn_parse_partial() {
1211        let _: Result<(), ()> = future::lazy(|| {
1212            let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1213            let io = AsyncIo::new_buf(good_message, 10);
1214            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1215            assert!(conn.poll().unwrap().is_not_ready());
1216            conn.io.io_mut().block_in(50);
1217            let async = conn.poll().unwrap();
1218            assert!(async.is_ready());
1219            match async {
1220                Async::Ready(Some(Frame::Message { .. })) => (),
1221                f => panic!("frame is not Message: {:?}", f),
1222            }
1223            Ok(())
1224        }).wait();
1225    }
1226
1227    #[test]
1228    fn test_conn_init_read_eof_idle() {
1229        let io = AsyncIo::new_buf(vec![], 1);
1230        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1231        conn.state.idle();
1232
1233        match conn.poll().unwrap() {
1234            Async::Ready(None) => {},
1235            other => panic!("frame is not None: {:?}", other)
1236        }
1237    }
1238
1239    #[test]
1240    fn test_conn_init_read_eof_idle_partial_parse() {
1241        let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1242        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1243        conn.state.idle();
1244
1245        match conn.poll() {
1246            Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1247            other => panic!("unexpected frame: {:?}", other)
1248        }
1249    }
1250
1251    #[test]
1252    fn test_conn_init_read_eof_busy() {
1253        let _: Result<(), ()> = future::lazy(|| {
1254            // server ignores
1255            let io = AsyncIo::new_eof();
1256            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1257            conn.state.busy();
1258
1259            match conn.poll().unwrap() {
1260                Async::Ready(None) => {},
1261                other => panic!("unexpected frame: {:?}", other)
1262            }
1263
1264            // client
1265            let io = AsyncIo::new_eof();
1266            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1267            conn.state.busy();
1268
1269            match conn.poll() {
1270                Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1271                other => panic!("unexpected frame: {:?}", other)
1272            }
1273            Ok(())
1274        }).wait();
1275    }
1276
1277    #[test]
1278    fn test_conn_body_finish_read_eof() {
1279        let _: Result<(), ()> = future::lazy(|| {
1280            let io = AsyncIo::new_eof();
1281            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1282            conn.state.busy();
1283            conn.state.writing = Writing::KeepAlive;
1284            conn.state.reading = Reading::Body(Decoder::length(0));
1285
1286            match conn.poll() {
1287                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1288                other => panic!("unexpected frame: {:?}", other)
1289            }
1290
1291            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1292            // the conn eof in this case is perfectly fine
1293
1294            match conn.poll() {
1295                Ok(Async::Ready(None)) => (),
1296                other => panic!("unexpected frame: {:?}", other)
1297            }
1298            Ok(())
1299        }).wait();
1300    }
1301
1302    #[test]
1303    fn test_conn_message_empty_body_read_eof() {
1304        let _: Result<(), ()> = future::lazy(|| {
1305            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1306            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1307            conn.state.busy();
1308            conn.state.writing = Writing::KeepAlive;
1309
1310            match conn.poll() {
1311                Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1312                other => panic!("unexpected frame: {:?}", other)
1313            }
1314
1315            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1316            // the conn eof in this case is perfectly fine
1317
1318            match conn.poll() {
1319                Ok(Async::Ready(None)) => (),
1320                other => panic!("unexpected frame: {:?}", other)
1321            }
1322            Ok(())
1323        }).wait();
1324    }
1325
1326    #[test]
1327    fn test_conn_read_body_end() {
1328        let _: Result<(), ()> = future::lazy(|| {
1329            let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1330            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1331            conn.state.busy();
1332
1333            match conn.poll() {
1334                Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1335                other => panic!("unexpected frame: {:?}", other)
1336            }
1337
1338            match conn.poll() {
1339                Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1340                other => panic!("unexpected frame: {:?}", other)
1341            }
1342
1343            // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1344            match conn.poll() {
1345                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1346                other => panic!("unexpected frame: {:?}", other)
1347            }
1348
1349            match conn.poll() {
1350                Ok(Async::NotReady) => (),
1351                other => panic!("unexpected frame: {:?}", other)
1352            }
1353            Ok(())
1354        }).wait();
1355    }
1356
1357    #[test]
1358    fn test_conn_closed_read() {
1359        let io = AsyncIo::new_buf(vec![], 0);
1360        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1361        conn.state.close();
1362
1363        match conn.poll().unwrap() {
1364            Async::Ready(None) => {},
1365            other => panic!("frame is not None: {:?}", other)
1366        }
1367    }
1368
1369    #[test]
1370    fn test_conn_body_write_length() {
1371        let _ = pretty_env_logger::try_init();
1372        let _: Result<(), ()> = future::lazy(|| {
1373            let io = AsyncIo::new_buf(vec![], 0);
1374            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1375            let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1376            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1377
1378            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1379            assert!(!conn.can_buffer_body());
1380
1381            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1382
1383            conn.io.io_mut().block_in(1024 * 3);
1384            assert!(conn.poll_complete().unwrap().is_not_ready());
1385            conn.io.io_mut().block_in(1024 * 3);
1386            assert!(conn.poll_complete().unwrap().is_not_ready());
1387            conn.io.io_mut().block_in(max * 2);
1388            assert!(conn.poll_complete().unwrap().is_ready());
1389
1390            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1391            Ok(())
1392        }).wait();
1393    }
1394
1395    #[test]
1396    fn test_conn_body_write_chunked() {
1397        let _: Result<(), ()> = future::lazy(|| {
1398            let io = AsyncIo::new_buf(vec![], 4096);
1399            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1400            conn.state.writing = Writing::Body(Encoder::chunked());
1401
1402            assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1403            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1404            Ok(())
1405        }).wait();
1406    }
1407
1408    #[test]
1409    fn test_conn_body_flush() {
1410        let _: Result<(), ()> = future::lazy(|| {
1411            let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1412            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1413            conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1414            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1415            assert!(!conn.can_buffer_body());
1416            conn.io.io_mut().block_in(1024 * 1024 * 5);
1417            assert!(conn.poll_complete().unwrap().is_ready());
1418            assert!(conn.can_buffer_body());
1419            assert!(conn.io.io_mut().flushed());
1420
1421            Ok(())
1422        }).wait();
1423    }
1424
1425    #[test]
1426    fn test_conn_parking() {
1427        use std::sync::Arc;
1428        use futures::executor::Notify;
1429        use futures::executor::NotifyHandle;
1430
1431        struct Car {
1432            permit: bool,
1433        }
1434        impl Notify for Car {
1435            fn notify(&self, _id: usize) {
1436                assert!(self.permit, "unparked without permit");
1437            }
1438        }
1439
1440        fn car(permit: bool) -> NotifyHandle {
1441            Arc::new(Car {
1442                permit: permit,
1443            }).into()
1444        }
1445
1446        // test that once writing is done, unparks
1447        let f = future::lazy(|| {
1448            let io = AsyncIo::new_buf(vec![], 4096);
1449            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1450            conn.state.reading = Reading::KeepAlive;
1451            assert!(conn.poll().unwrap().is_not_ready());
1452
1453            conn.state.writing = Writing::KeepAlive;
1454            assert!(conn.poll_complete().unwrap().is_ready());
1455            Ok::<(), ()>(())
1456        });
1457        ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1458
1459
1460        // test that flushing when not waiting on read doesn't unpark
1461        let f = future::lazy(|| {
1462            let io = AsyncIo::new_buf(vec![], 4096);
1463            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1464            conn.state.writing = Writing::KeepAlive;
1465            assert!(conn.poll_complete().unwrap().is_ready());
1466            Ok::<(), ()>(())
1467        });
1468        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1469
1470
1471        // test that flushing and writing isn't done doesn't unpark
1472        let f = future::lazy(|| {
1473            let io = AsyncIo::new_buf(vec![], 4096);
1474            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1475            conn.state.reading = Reading::KeepAlive;
1476            assert!(conn.poll().unwrap().is_not_ready());
1477            conn.state.writing = Writing::Body(Encoder::length(5_000));
1478            assert!(conn.poll_complete().unwrap().is_ready());
1479            Ok::<(), ()>(())
1480        });
1481        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1482    }
1483
1484    #[test]
1485    fn test_conn_closed_write() {
1486        let io = AsyncIo::new_buf(vec![], 0);
1487        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1488        conn.state.close();
1489
1490        match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1491            Err(_e) => {},
1492            other => panic!("did not return Err: {:?}", other)
1493        }
1494
1495        assert!(conn.state.is_write_closed());
1496    }
1497
1498    #[test]
1499    fn test_conn_write_empty_chunk() {
1500        let io = AsyncIo::new_buf(vec![], 0);
1501        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1502        conn.state.writing = Writing::KeepAlive;
1503
1504        assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1505        assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1506        conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1507    }
1508    */
1509}