hyper/proto/h1/
io.rs

1use std::cmp;
2use std::fmt;
3use std::io::{self, IoSlice};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::rt::{Read, ReadBuf, Write};
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use futures_util::ready;
10
11use super::{Http1Transaction, ParseContext, ParsedMessage};
12use crate::common::buf::BufList;
13
14/// The initial buffer size allocated before trying to read from IO.
15pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
16
17/// The minimum value that can be set to max buffer size.
18pub(crate) const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;
19
20/// The default maximum read buffer size. If the buffer gets this big and
21/// a message is still not complete, a `TooLarge` error is triggered.
22// Note: if this changes, update server::conn::Http::max_buf_size docs.
23pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
24
25/// The maximum number of distinct `Buf`s to hold in a list before requiring
26/// a flush. Only affects when the buffer strategy is to queue buffers.
27///
28/// Note that a flush can happen before reaching the maximum. This simply
29/// forces a flush if the queue gets this big.
30const MAX_BUF_LIST_BUFFERS: usize = 16;
31
32pub(crate) struct Buffered<T, B> {
33    flush_pipeline: bool,
34    io: T,
35    read_blocked: bool,
36    read_buf: BytesMut,
37    read_buf_strategy: ReadStrategy,
38    write_buf: WriteBuf<B>,
39}
40
41impl<T, B> fmt::Debug for Buffered<T, B>
42where
43    B: Buf,
44{
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        f.debug_struct("Buffered")
47            .field("read_buf", &self.read_buf)
48            .field("write_buf", &self.write_buf)
49            .finish()
50    }
51}
52
53impl<T, B> Buffered<T, B>
54where
55    T: Read + Write + Unpin,
56    B: Buf,
57{
58    pub(crate) fn new(io: T) -> Buffered<T, B> {
59        let strategy = if io.is_write_vectored() {
60            WriteStrategy::Queue
61        } else {
62            WriteStrategy::Flatten
63        };
64        let write_buf = WriteBuf::new(strategy);
65        Buffered {
66            flush_pipeline: false,
67            io,
68            read_blocked: false,
69            read_buf: BytesMut::with_capacity(0),
70            read_buf_strategy: ReadStrategy::default(),
71            write_buf,
72        }
73    }
74
75    #[cfg(feature = "server")]
76    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
77        debug_assert!(!self.write_buf.has_remaining());
78        self.flush_pipeline = enabled;
79        if enabled {
80            self.set_write_strategy_flatten();
81        }
82    }
83
84    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
85        assert!(
86            max >= MINIMUM_MAX_BUFFER_SIZE,
87            "The max_buf_size cannot be smaller than {}.",
88            MINIMUM_MAX_BUFFER_SIZE,
89        );
90        self.read_buf_strategy = ReadStrategy::with_max(max);
91        self.write_buf.max_buf_size = max;
92    }
93
94    #[cfg(feature = "client")]
95    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
96        self.read_buf_strategy = ReadStrategy::Exact(sz);
97    }
98
99    pub(crate) fn set_write_strategy_flatten(&mut self) {
100        // this should always be called only at construction time,
101        // so this assert is here to catch myself
102        debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
103        self.write_buf.set_strategy(WriteStrategy::Flatten);
104    }
105
106    pub(crate) fn set_write_strategy_queue(&mut self) {
107        // this should always be called only at construction time,
108        // so this assert is here to catch myself
109        debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
110        self.write_buf.set_strategy(WriteStrategy::Queue);
111    }
112
113    pub(crate) fn read_buf(&self) -> &[u8] {
114        self.read_buf.as_ref()
115    }
116
117    #[cfg(test)]
118    #[cfg(feature = "nightly")]
119    pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
120        &mut self.read_buf
121    }
122
123    /// Return the "allocated" available space, not the potential space
124    /// that could be allocated in the future.
125    fn read_buf_remaining_mut(&self) -> usize {
126        self.read_buf.capacity() - self.read_buf.len()
127    }
128
129    /// Return whether we can append to the headers buffer.
130    ///
131    /// Reasons we can't:
132    /// - The write buf is in queue mode, and some of the past body is still
133    ///   needing to be flushed.
134    pub(crate) fn can_headers_buf(&self) -> bool {
135        !self.write_buf.queue.has_remaining()
136    }
137
138    pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
139        let buf = self.write_buf.headers_mut();
140        &mut buf.bytes
141    }
142
143    pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
144        &mut self.write_buf
145    }
146
147    pub(crate) fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
148        self.write_buf.buffer(buf)
149    }
150
151    pub(crate) fn can_buffer(&self) -> bool {
152        self.flush_pipeline || self.write_buf.can_buffer()
153    }
154
155    pub(crate) fn consume_leading_lines(&mut self) {
156        if !self.read_buf.is_empty() {
157            let mut i = 0;
158            while i < self.read_buf.len() {
159                match self.read_buf[i] {
160                    b'\r' | b'\n' => i += 1,
161                    _ => break,
162                }
163            }
164            self.read_buf.advance(i);
165        }
166    }
167
168    pub(super) fn parse<S>(
169        &mut self,
170        cx: &mut Context<'_>,
171        parse_ctx: ParseContext<'_>,
172    ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
173    where
174        S: Http1Transaction,
175    {
176        loop {
177            match super::role::parse_headers::<S>(
178                &mut self.read_buf,
179                ParseContext {
180                    cached_headers: parse_ctx.cached_headers,
181                    req_method: parse_ctx.req_method,
182                    h1_parser_config: parse_ctx.h1_parser_config.clone(),
183                    h1_max_headers: parse_ctx.h1_max_headers,
184                    preserve_header_case: parse_ctx.preserve_header_case,
185                    #[cfg(feature = "ffi")]
186                    preserve_header_order: parse_ctx.preserve_header_order,
187                    h09_responses: parse_ctx.h09_responses,
188                    #[cfg(feature = "ffi")]
189                    on_informational: parse_ctx.on_informational,
190                },
191            )? {
192                Some(msg) => {
193                    debug!("parsed {} headers", msg.head.headers.len());
194                    return Poll::Ready(Ok(msg));
195                }
196                None => {
197                    let max = self.read_buf_strategy.max();
198                    if self.read_buf.len() >= max {
199                        debug!("max_buf_size ({}) reached, closing", max);
200                        return Poll::Ready(Err(crate::Error::new_too_large()));
201                    }
202                }
203            }
204            if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
205                trace!("parse eof");
206                return Poll::Ready(Err(crate::Error::new_incomplete()));
207            }
208        }
209    }
210
211    pub(crate) fn poll_read_from_io(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
212        self.read_blocked = false;
213        let next = self.read_buf_strategy.next();
214        if self.read_buf_remaining_mut() < next {
215            self.read_buf.reserve(next);
216        }
217
218        // SAFETY: ReadBuf and poll_read promise not to set any uninitialized
219        // bytes onto `dst`.
220        let dst = unsafe { self.read_buf.chunk_mut().as_uninit_slice_mut() };
221        let mut buf = ReadBuf::uninit(dst);
222        match Pin::new(&mut self.io).poll_read(cx, buf.unfilled()) {
223            Poll::Ready(Ok(_)) => {
224                let n = buf.filled().len();
225                trace!("received {} bytes", n);
226                unsafe {
227                    // Safety: we just read that many bytes into the
228                    // uninitialized part of the buffer, so this is okay.
229                    // @tokio pls give me back `poll_read_buf` thanks
230                    self.read_buf.advance_mut(n);
231                }
232                self.read_buf_strategy.record(n);
233                Poll::Ready(Ok(n))
234            }
235            Poll::Pending => {
236                self.read_blocked = true;
237                Poll::Pending
238            }
239            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
240        }
241    }
242
243    pub(crate) fn into_inner(self) -> (T, Bytes) {
244        (self.io, self.read_buf.freeze())
245    }
246
247    pub(crate) fn io_mut(&mut self) -> &mut T {
248        &mut self.io
249    }
250
251    pub(crate) fn is_read_blocked(&self) -> bool {
252        self.read_blocked
253    }
254
255    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
256        if self.flush_pipeline && !self.read_buf.is_empty() {
257            Poll::Ready(Ok(()))
258        } else if self.write_buf.remaining() == 0 {
259            Pin::new(&mut self.io).poll_flush(cx)
260        } else {
261            if let WriteStrategy::Flatten = self.write_buf.strategy {
262                return self.poll_flush_flattened(cx);
263            }
264
265            const MAX_WRITEV_BUFS: usize = 64;
266            loop {
267                let n = {
268                    let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
269                    let len = self.write_buf.chunks_vectored(&mut iovs);
270                    ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
271                };
272                // TODO(eliza): we have to do this manually because
273                // `poll_write_buf` doesn't exist in Tokio 0.3 yet...when
274                // `poll_write_buf` comes back, the manual advance will need to leave!
275                self.write_buf.advance(n);
276                debug!("flushed {} bytes", n);
277                if self.write_buf.remaining() == 0 {
278                    break;
279                } else if n == 0 {
280                    trace!(
281                        "write returned zero, but {} bytes remaining",
282                        self.write_buf.remaining()
283                    );
284                    return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
285                }
286            }
287            Pin::new(&mut self.io).poll_flush(cx)
288        }
289    }
290
291    /// Specialized version of `flush` when strategy is Flatten.
292    ///
293    /// Since all buffered bytes are flattened into the single headers buffer,
294    /// that skips some bookkeeping around using multiple buffers.
295    fn poll_flush_flattened(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
296        loop {
297            let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
298            debug!("flushed {} bytes", n);
299            self.write_buf.headers.advance(n);
300            if self.write_buf.headers.remaining() == 0 {
301                self.write_buf.headers.reset();
302                break;
303            } else if n == 0 {
304                trace!(
305                    "write returned zero, but {} bytes remaining",
306                    self.write_buf.remaining()
307                );
308                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
309            }
310        }
311        Pin::new(&mut self.io).poll_flush(cx)
312    }
313
314    #[cfg(test)]
315    fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a {
316        futures_util::future::poll_fn(move |cx| self.poll_flush(cx))
317    }
318}
319
320// The `B` is a `Buf`, we never project a pin to it
321impl<T: Unpin, B> Unpin for Buffered<T, B> {}
322
323// TODO: This trait is old... at least rename to PollBytes or something...
324pub(crate) trait MemRead {
325    fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
326}
327
328impl<T, B> MemRead for Buffered<T, B>
329where
330    T: Read + Write + Unpin,
331    B: Buf,
332{
333    fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
334        if !self.read_buf.is_empty() {
335            let n = std::cmp::min(len, self.read_buf.len());
336            Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
337        } else {
338            let n = ready!(self.poll_read_from_io(cx))?;
339            Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
340        }
341    }
342}
343
344#[derive(Clone, Copy, Debug)]
345enum ReadStrategy {
346    Adaptive {
347        decrease_now: bool,
348        next: usize,
349        max: usize,
350    },
351    #[cfg(feature = "client")]
352    Exact(usize),
353}
354
355impl ReadStrategy {
356    fn with_max(max: usize) -> ReadStrategy {
357        ReadStrategy::Adaptive {
358            decrease_now: false,
359            next: INIT_BUFFER_SIZE,
360            max,
361        }
362    }
363
364    fn next(&self) -> usize {
365        match *self {
366            ReadStrategy::Adaptive { next, .. } => next,
367            #[cfg(feature = "client")]
368            ReadStrategy::Exact(exact) => exact,
369        }
370    }
371
372    fn max(&self) -> usize {
373        match *self {
374            ReadStrategy::Adaptive { max, .. } => max,
375            #[cfg(feature = "client")]
376            ReadStrategy::Exact(exact) => exact,
377        }
378    }
379
380    fn record(&mut self, bytes_read: usize) {
381        match *self {
382            ReadStrategy::Adaptive {
383                ref mut decrease_now,
384                ref mut next,
385                max,
386                ..
387            } => {
388                if bytes_read >= *next {
389                    *next = cmp::min(incr_power_of_two(*next), max);
390                    *decrease_now = false;
391                } else {
392                    let decr_to = prev_power_of_two(*next);
393                    if bytes_read < decr_to {
394                        if *decrease_now {
395                            *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
396                            *decrease_now = false;
397                        } else {
398                            // Decreasing is a two "record" process.
399                            *decrease_now = true;
400                        }
401                    } else {
402                        // A read within the current range should cancel
403                        // a potential decrease, since we just saw proof
404                        // that we still need this size.
405                        *decrease_now = false;
406                    }
407                }
408            }
409            #[cfg(feature = "client")]
410            ReadStrategy::Exact(_) => (),
411        }
412    }
413}
414
415fn incr_power_of_two(n: usize) -> usize {
416    n.saturating_mul(2)
417}
418
419fn prev_power_of_two(n: usize) -> usize {
420    // Only way this shift can underflow is if n is less than 4.
421    // (Which would means `usize::MAX >> 64` and underflowed!)
422    debug_assert!(n >= 4);
423    (usize::MAX >> (n.leading_zeros() + 2)) + 1
424}
425
426impl Default for ReadStrategy {
427    fn default() -> ReadStrategy {
428        ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
429    }
430}
431
432#[derive(Clone)]
433pub(crate) struct Cursor<T> {
434    bytes: T,
435    pos: usize,
436}
437
438impl<T: AsRef<[u8]>> Cursor<T> {
439    #[inline]
440    pub(crate) fn new(bytes: T) -> Cursor<T> {
441        Cursor { bytes, pos: 0 }
442    }
443}
444
445impl Cursor<Vec<u8>> {
446    /// If we've advanced the position a bit in this cursor, and wish to
447    /// extend the underlying vector, we may wish to unshift the "read" bytes
448    /// off, and move everything else over.
449    fn maybe_unshift(&mut self, additional: usize) {
450        if self.pos == 0 {
451            // nothing to do
452            return;
453        }
454
455        if self.bytes.capacity() - self.bytes.len() >= additional {
456            // there's room!
457            return;
458        }
459
460        self.bytes.drain(0..self.pos);
461        self.pos = 0;
462    }
463
464    fn reset(&mut self) {
465        self.pos = 0;
466        self.bytes.clear();
467    }
468}
469
470impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
471    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472        f.debug_struct("Cursor")
473            .field("pos", &self.pos)
474            .field("len", &self.bytes.as_ref().len())
475            .finish()
476    }
477}
478
479impl<T: AsRef<[u8]>> Buf for Cursor<T> {
480    #[inline]
481    fn remaining(&self) -> usize {
482        self.bytes.as_ref().len() - self.pos
483    }
484
485    #[inline]
486    fn chunk(&self) -> &[u8] {
487        &self.bytes.as_ref()[self.pos..]
488    }
489
490    #[inline]
491    fn advance(&mut self, cnt: usize) {
492        debug_assert!(self.pos + cnt <= self.bytes.as_ref().len());
493        self.pos += cnt;
494    }
495}
496
497// an internal buffer to collect writes before flushes
498pub(super) struct WriteBuf<B> {
499    /// Re-usable buffer that holds message headers
500    headers: Cursor<Vec<u8>>,
501    max_buf_size: usize,
502    /// Deque of user buffers if strategy is Queue
503    queue: BufList<B>,
504    strategy: WriteStrategy,
505}
506
507impl<B: Buf> WriteBuf<B> {
508    fn new(strategy: WriteStrategy) -> WriteBuf<B> {
509        WriteBuf {
510            headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
511            max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
512            queue: BufList::new(),
513            strategy,
514        }
515    }
516}
517
518impl<B> WriteBuf<B>
519where
520    B: Buf,
521{
522    fn set_strategy(&mut self, strategy: WriteStrategy) {
523        self.strategy = strategy;
524    }
525
526    pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
527        debug_assert!(buf.has_remaining());
528        match self.strategy {
529            WriteStrategy::Flatten => {
530                let head = self.headers_mut();
531
532                head.maybe_unshift(buf.remaining());
533                trace!(
534                    self.len = head.remaining(),
535                    buf.len = buf.remaining(),
536                    "buffer.flatten"
537                );
538                //perf: This is a little faster than <Vec as BufMut>>::put,
539                //but accomplishes the same result.
540                loop {
541                    let adv = {
542                        let slice = buf.chunk();
543                        if slice.is_empty() {
544                            return;
545                        }
546                        head.bytes.extend_from_slice(slice);
547                        slice.len()
548                    };
549                    buf.advance(adv);
550                }
551            }
552            WriteStrategy::Queue => {
553                trace!(
554                    self.len = self.remaining(),
555                    buf.len = buf.remaining(),
556                    "buffer.queue"
557                );
558                self.queue.push(buf.into());
559            }
560        }
561    }
562
563    fn can_buffer(&self) -> bool {
564        match self.strategy {
565            WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
566            WriteStrategy::Queue => {
567                self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
568            }
569        }
570    }
571
572    fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
573        debug_assert!(!self.queue.has_remaining());
574        &mut self.headers
575    }
576}
577
578impl<B: Buf> fmt::Debug for WriteBuf<B> {
579    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580        f.debug_struct("WriteBuf")
581            .field("remaining", &self.remaining())
582            .field("strategy", &self.strategy)
583            .finish()
584    }
585}
586
587impl<B: Buf> Buf for WriteBuf<B> {
588    #[inline]
589    fn remaining(&self) -> usize {
590        self.headers.remaining() + self.queue.remaining()
591    }
592
593    #[inline]
594    fn chunk(&self) -> &[u8] {
595        let headers = self.headers.chunk();
596        if !headers.is_empty() {
597            headers
598        } else {
599            self.queue.chunk()
600        }
601    }
602
603    #[inline]
604    fn advance(&mut self, cnt: usize) {
605        let hrem = self.headers.remaining();
606
607        match hrem.cmp(&cnt) {
608            cmp::Ordering::Equal => self.headers.reset(),
609            cmp::Ordering::Greater => self.headers.advance(cnt),
610            cmp::Ordering::Less => {
611                let qcnt = cnt - hrem;
612                self.headers.reset();
613                self.queue.advance(qcnt);
614            }
615        }
616    }
617
618    #[inline]
619    fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
620        let n = self.headers.chunks_vectored(dst);
621        self.queue.chunks_vectored(&mut dst[n..]) + n
622    }
623}
624
625#[derive(Debug)]
626enum WriteStrategy {
627    Flatten,
628    Queue,
629}
630
631#[cfg(test)]
632mod tests {
633    use super::*;
634    use crate::common::io::Compat;
635    use std::time::Duration;
636
637    use tokio_test::io::Builder as Mock;
638
639    // #[cfg(feature = "nightly")]
640    // use test::Bencher;
641
642    /*
643    impl<T: Read> MemRead for AsyncIo<T> {
644        fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
645            let mut v = vec![0; len];
646            let n = try_nb!(self.read(v.as_mut_slice()));
647            Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
648        }
649    }
650    */
651
652    #[tokio::test]
653    #[ignore]
654    async fn iobuf_write_empty_slice() {
655        // TODO(eliza): can i have writev back pls T_T
656        // // First, let's just check that the Mock would normally return an
657        // // error on an unexpected write, even if the buffer is empty...
658        // let mut mock = Mock::new().build();
659        // futures_util::future::poll_fn(|cx| {
660        //     Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[]))
661        // })
662        // .await
663        // .expect_err("should be a broken pipe");
664
665        // // underlying io will return the logic error upon write,
666        // // so we are testing that the io_buf does not trigger a write
667        // // when there is nothing to flush
668        // let mock = Mock::new().build();
669        // let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
670        // io_buf.flush().await.expect("should short-circuit flush");
671    }
672
673    #[cfg(not(miri))]
674    #[tokio::test]
675    async fn parse_reads_until_blocked() {
676        use crate::proto::h1::ClientTransaction;
677
678        let _ = pretty_env_logger::try_init();
679        let mock = Mock::new()
680            // Split over multiple reads will read all of it
681            .read(b"HTTP/1.1 200 OK\r\n")
682            .read(b"Server: hyper\r\n")
683            // missing last line ending
684            .wait(Duration::from_secs(1))
685            .build();
686
687        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
688
689        // We expect a `parse` to be not ready, and so can't await it directly.
690        // Rather, this `poll_fn` will wrap the `Poll` result.
691        futures_util::future::poll_fn(|cx| {
692            let parse_ctx = ParseContext {
693                cached_headers: &mut None,
694                req_method: &mut None,
695                h1_parser_config: Default::default(),
696                h1_max_headers: None,
697                preserve_header_case: false,
698                #[cfg(feature = "ffi")]
699                preserve_header_order: false,
700                h09_responses: false,
701                #[cfg(feature = "ffi")]
702                on_informational: &mut None,
703            };
704            assert!(buffered
705                .parse::<ClientTransaction>(cx, parse_ctx)
706                .is_pending());
707            Poll::Ready(())
708        })
709        .await;
710
711        assert_eq!(
712            buffered.read_buf,
713            b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
714        );
715    }
716
717    #[test]
718    fn read_strategy_adaptive_increments() {
719        let mut strategy = ReadStrategy::default();
720        assert_eq!(strategy.next(), 8192);
721
722        // Grows if record == next
723        strategy.record(8192);
724        assert_eq!(strategy.next(), 16384);
725
726        strategy.record(16384);
727        assert_eq!(strategy.next(), 32768);
728
729        // Enormous records still increment at same rate
730        strategy.record(usize::MAX);
731        assert_eq!(strategy.next(), 65536);
732
733        let max = strategy.max();
734        while strategy.next() < max {
735            strategy.record(max);
736        }
737
738        assert_eq!(strategy.next(), max, "never goes over max");
739        strategy.record(max + 1);
740        assert_eq!(strategy.next(), max, "never goes over max");
741    }
742
743    #[test]
744    fn read_strategy_adaptive_decrements() {
745        let mut strategy = ReadStrategy::default();
746        strategy.record(8192);
747        assert_eq!(strategy.next(), 16384);
748
749        strategy.record(1);
750        assert_eq!(
751            strategy.next(),
752            16384,
753            "first smaller record doesn't decrement yet"
754        );
755        strategy.record(8192);
756        assert_eq!(strategy.next(), 16384, "record was with range");
757
758        strategy.record(1);
759        assert_eq!(
760            strategy.next(),
761            16384,
762            "in-range record should make this the 'first' again"
763        );
764
765        strategy.record(1);
766        assert_eq!(strategy.next(), 8192, "second smaller record decrements");
767
768        strategy.record(1);
769        assert_eq!(strategy.next(), 8192, "first doesn't decrement");
770        strategy.record(1);
771        assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
772    }
773
774    #[test]
775    fn read_strategy_adaptive_stays_the_same() {
776        let mut strategy = ReadStrategy::default();
777        strategy.record(8192);
778        assert_eq!(strategy.next(), 16384);
779
780        strategy.record(8193);
781        assert_eq!(
782            strategy.next(),
783            16384,
784            "first smaller record doesn't decrement yet"
785        );
786
787        strategy.record(8193);
788        assert_eq!(
789            strategy.next(),
790            16384,
791            "with current step does not decrement"
792        );
793    }
794
795    #[test]
796    fn read_strategy_adaptive_max_fuzz() {
797        fn fuzz(max: usize) {
798            let mut strategy = ReadStrategy::with_max(max);
799            while strategy.next() < max {
800                strategy.record(usize::MAX);
801            }
802            let mut next = strategy.next();
803            while next > 8192 {
804                strategy.record(1);
805                strategy.record(1);
806                next = strategy.next();
807                assert!(
808                    next.is_power_of_two(),
809                    "decrement should be powers of two: {} (max = {})",
810                    next,
811                    max,
812                );
813            }
814        }
815
816        let mut max = 8192;
817        while max < std::usize::MAX {
818            fuzz(max);
819            max = (max / 2).saturating_mul(3);
820        }
821        fuzz(usize::MAX);
822    }
823
824    #[test]
825    #[should_panic]
826    #[cfg(debug_assertions)] // needs to trigger a debug_assert
827    fn write_buf_requires_non_empty_bufs() {
828        let mock = Mock::new().build();
829        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
830
831        buffered.buffer(Cursor::new(Vec::new()));
832    }
833
834    /*
835    TODO: needs tokio_test::io to allow configure write_buf calls
836    #[test]
837    fn write_buf_queue() {
838        let _ = pretty_env_logger::try_init();
839
840        let mock = AsyncIo::new_buf(vec![], 1024);
841        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
842
843
844        buffered.headers_buf().extend(b"hello ");
845        buffered.buffer(Cursor::new(b"world, ".to_vec()));
846        buffered.buffer(Cursor::new(b"it's ".to_vec()));
847        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
848        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
849        buffered.flush().unwrap();
850
851        assert_eq!(buffered.io, b"hello world, it's hyper!");
852        assert_eq!(buffered.io.num_writes(), 1);
853        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
854    }
855    */
856
857    #[cfg(not(miri))]
858    #[tokio::test]
859    async fn write_buf_flatten() {
860        let _ = pretty_env_logger::try_init();
861
862        let mock = Mock::new().write(b"hello world, it's hyper!").build();
863
864        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
865        buffered.write_buf.set_strategy(WriteStrategy::Flatten);
866
867        buffered.headers_buf().extend(b"hello ");
868        buffered.buffer(Cursor::new(b"world, ".to_vec()));
869        buffered.buffer(Cursor::new(b"it's ".to_vec()));
870        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
871        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
872
873        buffered.flush().await.expect("flush");
874    }
875
876    #[test]
877    fn write_buf_flatten_partially_flushed() {
878        let _ = pretty_env_logger::try_init();
879
880        let b = |s: &str| Cursor::new(s.as_bytes().to_vec());
881
882        let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);
883
884        write_buf.buffer(b("hello "));
885        write_buf.buffer(b("world, "));
886
887        assert_eq!(write_buf.chunk(), b"hello world, ");
888
889        // advance most of the way, but not all
890        write_buf.advance(11);
891
892        assert_eq!(write_buf.chunk(), b", ");
893        assert_eq!(write_buf.headers.pos, 11);
894        assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);
895
896        // there's still room in the headers buffer, so just push on the end
897        write_buf.buffer(b("it's hyper!"));
898
899        assert_eq!(write_buf.chunk(), b", it's hyper!");
900        assert_eq!(write_buf.headers.pos, 11);
901
902        let rem1 = write_buf.remaining();
903        let cap = write_buf.headers.bytes.capacity();
904
905        // but when this would go over capacity, don't copy the old bytes
906        write_buf.buffer(Cursor::new(vec![b'X'; cap]));
907        assert_eq!(write_buf.remaining(), cap + rem1);
908        assert_eq!(write_buf.headers.pos, 0);
909    }
910
911    #[cfg(not(miri))]
912    #[tokio::test]
913    async fn write_buf_queue_disable_auto() {
914        let _ = pretty_env_logger::try_init();
915
916        let mock = Mock::new()
917            .write(b"hello ")
918            .write(b"world, ")
919            .write(b"it's ")
920            .write(b"hyper!")
921            .build();
922
923        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
924        buffered.write_buf.set_strategy(WriteStrategy::Queue);
925
926        // we have 4 buffers, and vec IO disabled, but explicitly said
927        // don't try to auto detect (via setting strategy above)
928
929        buffered.headers_buf().extend(b"hello ");
930        buffered.buffer(Cursor::new(b"world, ".to_vec()));
931        buffered.buffer(Cursor::new(b"it's ".to_vec()));
932        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
933        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
934
935        buffered.flush().await.expect("flush");
936
937        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
938    }
939
940    // #[cfg(feature = "nightly")]
941    // #[bench]
942    // fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
943    //     let s = "Hello, World!";
944    //     b.bytes = s.len() as u64;
945
946    //     let mut write_buf = WriteBuf::<bytes::Bytes>::new();
947    //     write_buf.set_strategy(WriteStrategy::Flatten);
948    //     b.iter(|| {
949    //         let chunk = bytes::Bytes::from(s);
950    //         write_buf.buffer(chunk);
951    //         ::test::black_box(&write_buf);
952    //         write_buf.headers.bytes.clear();
953    //     })
954    // }
955}