use std::cmp;
use std::fmt;
use std::io::{self, IoSlice};
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::rt::{Read, ReadBuf, Write};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::ready;

use super::{Http1Transaction, ParseContext, ParsedMessage};
use crate::common::buf::BufList;

/// The initial buffer size allocated before trying to read from IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;

/// The smallest value accepted by `set_max_buf_size`.
pub(crate) const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;

/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;

/// The maximum number of distinct `Buf`s held in the write queue before
/// `can_buffer` reports false. Only applies to the `Queue` write strategy.
const MAX_BUF_LIST_BUFFERS: usize = 16;
pub(crate) struct Buffered<T, B> {
    flush_pipeline: bool,
    io: T,
    read_blocked: bool,
    read_buf: BytesMut,
    read_buf_strategy: ReadStrategy,
    write_buf: WriteBuf<B>,
}

impl<T, B> fmt::Debug for Buffered<T, B>
where
    B: Buf,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Buffered")
            .field("read_buf", &self.read_buf)
            .field("write_buf", &self.write_buf)
            .finish()
    }
}

impl<T, B> Buffered<T, B>
where
    T: Read + Write + Unpin,
    B: Buf,
{
    pub(crate) fn new(io: T) -> Buffered<T, B> {
        let strategy = if io.is_write_vectored() {
            WriteStrategy::Queue
        } else {
            WriteStrategy::Flatten
        };
        let write_buf = WriteBuf::new(strategy);
        Buffered {
            flush_pipeline: false,
            io,
            read_blocked: false,
            read_buf: BytesMut::with_capacity(0),
            read_buf_strategy: ReadStrategy::default(),
            write_buf,
        }
    }

    #[cfg(feature = "server")]
    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
        debug_assert!(!self.write_buf.has_remaining());
        self.flush_pipeline = enabled;
        if enabled {
            self.set_write_strategy_flatten();
        }
    }

    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
        assert!(
            max >= MINIMUM_MAX_BUFFER_SIZE,
            "The max_buf_size cannot be smaller than {}.",
            MINIMUM_MAX_BUFFER_SIZE,
        );
        self.read_buf_strategy = ReadStrategy::with_max(max);
        self.write_buf.max_buf_size = max;
    }

    #[cfg(feature = "client")]
    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
        self.read_buf_strategy = ReadStrategy::Exact(sz);
    }

    pub(crate) fn set_write_strategy_flatten(&mut self) {
        // This should only be called at construction time, before any
        // buffers have been queued; the assert guards that invariant.
        debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
        self.write_buf.set_strategy(WriteStrategy::Flatten);
    }

    pub(crate) fn set_write_strategy_queue(&mut self) {
        // This should only be called at construction time, before any
        // buffers have been queued; the assert guards that invariant.
        debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
        self.write_buf.set_strategy(WriteStrategy::Queue);
    }

    pub(crate) fn read_buf(&self) -> &[u8] {
        self.read_buf.as_ref()
    }

    #[cfg(test)]
    #[cfg(feature = "nightly")]
    pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
        &mut self.read_buf
    }

    /// Return the "allocated" available space in the read buffer, not the
    /// potential space that could be allocated in the future.
    fn read_buf_remaining_mut(&self) -> usize {
        self.read_buf.capacity() - self.read_buf.len()
    }

    /// Return whether we can append to the headers buffer.
    ///
    /// In queue mode, we can't while previously queued body buffers are
    /// still waiting to be flushed.
    pub(crate) fn can_headers_buf(&self) -> bool {
        !self.write_buf.queue.has_remaining()
    }

    pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
        let buf = self.write_buf.headers_mut();
        &mut buf.bytes
    }

    pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
        &mut self.write_buf
    }

    pub(crate) fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
        self.write_buf.buffer(buf)
    }

    pub(crate) fn can_buffer(&self) -> bool {
        self.flush_pipeline || self.write_buf.can_buffer()
    }

    pub(crate) fn consume_leading_lines(&mut self) {
        if !self.read_buf.is_empty() {
            let mut i = 0;
            while i < self.read_buf.len() {
                match self.read_buf[i] {
                    b'\r' | b'\n' => i += 1,
                    _ => break,
                }
            }
            self.read_buf.advance(i);
        }
    }

    pub(super) fn parse<S>(
        &mut self,
        cx: &mut Context<'_>,
        parse_ctx: ParseContext<'_>,
    ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
    where
        S: Http1Transaction,
    {
        loop {
            match super::role::parse_headers::<S>(
                &mut self.read_buf,
                ParseContext {
                    cached_headers: parse_ctx.cached_headers,
                    req_method: parse_ctx.req_method,
                    h1_parser_config: parse_ctx.h1_parser_config.clone(),
                    h1_max_headers: parse_ctx.h1_max_headers,
                    preserve_header_case: parse_ctx.preserve_header_case,
                    #[cfg(feature = "ffi")]
                    preserve_header_order: parse_ctx.preserve_header_order,
                    h09_responses: parse_ctx.h09_responses,
                    #[cfg(feature = "ffi")]
                    on_informational: parse_ctx.on_informational,
                },
            )? {
                Some(msg) => {
                    debug!("parsed {} headers", msg.head.headers.len());
                    return Poll::Ready(Ok(msg));
                }
                None => {
                    let max = self.read_buf_strategy.max();
                    if self.read_buf.len() >= max {
                        debug!("max_buf_size ({}) reached, closing", max);
                        return Poll::Ready(Err(crate::Error::new_too_large()));
                    }
                }
            }
            if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
                trace!("parse eof");
                return Poll::Ready(Err(crate::Error::new_incomplete()));
            }
        }
    }

    pub(crate) fn poll_read_from_io(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        self.read_blocked = false;
        let next = self.read_buf_strategy.next();
        if self.read_buf_remaining_mut() < next {
            self.read_buf.reserve(next);
        }

        // Safety: the uninitialized spare capacity is only handed to
        // `ReadBuf::uninit`, which tracks how much of it gets initialized.
        let dst = unsafe { self.read_buf.chunk_mut().as_uninit_slice_mut() };
        let mut buf = ReadBuf::uninit(dst);
        match Pin::new(&mut self.io).poll_read(cx, buf.unfilled()) {
            Poll::Ready(Ok(_)) => {
                let n = buf.filled().len();
                trace!("received {} bytes", n);
                unsafe {
                    // Safety: the read just initialized `n` bytes of the
                    // spare capacity, so advancing by `n` is sound.
                    self.read_buf.advance_mut(n);
                }
                self.read_buf_strategy.record(n);
                Poll::Ready(Ok(n))
            }
            Poll::Pending => {
                self.read_blocked = true;
                Poll::Pending
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
        }
    }

    pub(crate) fn into_inner(self) -> (T, Bytes) {
        (self.io, self.read_buf.freeze())
    }

    pub(crate) fn io_mut(&mut self) -> &mut T {
        &mut self.io
    }

    pub(crate) fn is_read_blocked(&self) -> bool {
        self.read_blocked
    }

    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        if self.flush_pipeline && !self.read_buf.is_empty() {
            Poll::Ready(Ok(()))
        } else if self.write_buf.remaining() == 0 {
            Pin::new(&mut self.io).poll_flush(cx)
        } else {
            if let WriteStrategy::Flatten = self.write_buf.strategy {
                return self.poll_flush_flattened(cx);
            }

            const MAX_WRITEV_BUFS: usize = 64;
            loop {
                let n = {
                    let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
                    let len = self.write_buf.chunks_vectored(&mut iovs);
                    ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
                };
                self.write_buf.advance(n);
                debug!("flushed {} bytes", n);
                if self.write_buf.remaining() == 0 {
                    break;
                } else if n == 0 {
                    trace!(
                        "write returned zero, but {} bytes remaining",
                        self.write_buf.remaining()
                    );
                    return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
                }
            }
            Pin::new(&mut self.io).poll_flush(cx)
        }
    }

    /// Specialized version of `poll_flush` for the `Flatten` strategy: all
    /// buffered bytes live in the single headers buffer, so no queue
    /// bookkeeping is needed.
    fn poll_flush_flattened(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        loop {
            let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
            debug!("flushed {} bytes", n);
            self.write_buf.headers.advance(n);
            if self.write_buf.headers.remaining() == 0 {
                self.write_buf.headers.reset();
                break;
            } else if n == 0 {
                trace!(
                    "write returned zero, but {} bytes remaining",
                    self.write_buf.remaining()
                );
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
            }
        }
        Pin::new(&mut self.io).poll_flush(cx)
    }

    #[cfg(test)]
    fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a {
        futures_util::future::poll_fn(move |cx| self.poll_flush(cx))
    }
}

impl<T: Unpin, B> Unpin for Buffered<T, B> {}

/// A source of bytes for reading message bodies: serves from the internal
/// read buffer first, falling back to the underlying IO.
pub(crate) trait MemRead {
    fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
}

impl<T, B> MemRead for Buffered<T, B>
where
    T: Read + Write + Unpin,
    B: Buf,
{
    fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
        if !self.read_buf.is_empty() {
            let n = cmp::min(len, self.read_buf.len());
            Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
        } else {
            let n = ready!(self.poll_read_from_io(cx))?;
            Poll::Ready(Ok(self.read_buf.split_to(cmp::min(len, n)).freeze()))
        }
    }
}
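
// Usage sketch (hypothetical caller, not part of this module): body decoders
// poll `read_mem` for at most `len` bytes; anything already in `read_buf` is
// served without touching the underlying IO, and the returned `Bytes` is
// zero-copy thanks to `split_to(..).freeze()`.
//
//     let chunk: Bytes = ready!(buffered.read_mem(cx, 4096))?;
//     assert!(chunk.len() <= 4096);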

#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
    Adaptive {
        decrease_now: bool,
        next: usize,
        max: usize,
    },
    #[cfg(feature = "client")]
    Exact(usize),
}

impl ReadStrategy {
    fn with_max(max: usize) -> ReadStrategy {
        ReadStrategy::Adaptive {
            decrease_now: false,
            next: INIT_BUFFER_SIZE,
            max,
        }
    }

    fn next(&self) -> usize {
        match *self {
            ReadStrategy::Adaptive { next, .. } => next,
            #[cfg(feature = "client")]
            ReadStrategy::Exact(exact) => exact,
        }
    }

    fn max(&self) -> usize {
        match *self {
            ReadStrategy::Adaptive { max, .. } => max,
            #[cfg(feature = "client")]
            ReadStrategy::Exact(exact) => exact,
        }
    }

    fn record(&mut self, bytes_read: usize) {
        match *self {
            ReadStrategy::Adaptive {
                ref mut decrease_now,
                ref mut next,
                max,
                ..
            } => {
                if bytes_read >= *next {
                    *next = cmp::min(incr_power_of_two(*next), max);
                    *decrease_now = false;
                } else {
                    let decr_to = prev_power_of_two(*next);
                    if bytes_read < decr_to {
                        if *decrease_now {
                            *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
                            *decrease_now = false;
                        } else {
                            // Decreasing is a two-"record" process: flag it
                            // now, shrink only if the next read is also short.
                            *decrease_now = true;
                        }
                    } else {
                        // A read within the current range proves the current
                        // size is still needed, so cancel a pending decrease.
                        *decrease_now = false;
                    }
                }
            }
            #[cfg(feature = "client")]
            ReadStrategy::Exact(_) => (),
        }
    }
}
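
// Illustrative sketch of the adaptive strategy (mirrors the unit tests at the
// bottom of this file): a full read doubles `next`, and only two consecutive
// short reads shrink it, never below INIT_BUFFER_SIZE.
//
//     let mut s = ReadStrategy::default();
//     s.record(8192); // filled the buffer: next becomes 16384
//     s.record(1);    // short read: flagged, next stays 16384
//     s.record(1);    // second short read in a row: next drops to 8192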

fn incr_power_of_two(n: usize) -> usize {
    n.saturating_mul(2)
}

fn prev_power_of_two(n: usize) -> usize {
    // The only way the shift below could overflow the bit width is if n
    // were less than 4, hence the assert.
    debug_assert!(n >= 4);
    (usize::MAX >> (n.leading_zeros() + 2)) + 1
}
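
// Worked example: for n = 16384 (2^14), `leading_zeros()` on a 64-bit usize
// is 49, so the result is (usize::MAX >> 51) + 1 == 8192, the previous power
// of two. A non-power like 16385 maps to 8192 as well.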

impl Default for ReadStrategy {
    fn default() -> ReadStrategy {
        ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
    }
}

#[derive(Clone)]
pub(crate) struct Cursor<T> {
    bytes: T,
    pos: usize,
}

impl<T: AsRef<[u8]>> Cursor<T> {
    #[inline]
    pub(crate) fn new(bytes: T) -> Cursor<T> {
        Cursor { bytes, pos: 0 }
    }
}

impl Cursor<Vec<u8>> {
    /// If the position has advanced and the vector lacks room for
    /// `additional` bytes, shift the unread bytes to the front so the
    /// already-read prefix can be reused instead of growing the allocation.
    fn maybe_unshift(&mut self, additional: usize) {
        if self.pos == 0 {
            // nothing has been read yet, nothing to shift
            return;
        }

        if self.bytes.capacity() - self.bytes.len() >= additional {
            // there's already room, no need to shift
            return;
        }

        self.bytes.drain(0..self.pos);
        self.pos = 0;
    }

    fn reset(&mut self) {
        self.pos = 0;
        self.bytes.clear();
    }
}

impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Cursor")
            .field("pos", &self.pos)
            .field("len", &self.bytes.as_ref().len())
            .finish()
    }
}

impl<T: AsRef<[u8]>> Buf for Cursor<T> {
    #[inline]
    fn remaining(&self) -> usize {
        self.bytes.as_ref().len() - self.pos
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        &self.bytes.as_ref()[self.pos..]
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        debug_assert!(self.pos + cnt <= self.bytes.as_ref().len());
        self.pos += cnt;
    }
}
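
// Illustrative example: the cursor consumes its backing slice through the
// `Buf` interface without copying.
//
//     let mut c = Cursor::new(b"hello".to_vec());
//     assert_eq!(c.chunk(), b"hello");
//     c.advance(3);
//     assert_eq!(c.remaining(), 2);
//     assert_eq!(c.chunk(), b"lo");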

/// The internal write buffer: headers are flattened into `headers`, while
/// body buffers may be queued depending on the strategy.
pub(super) struct WriteBuf<B> {
    /// Re-usable buffer that holds message headers
    headers: Cursor<Vec<u8>>,
    max_buf_size: usize,
    /// Deque of user buffers if strategy is Queue
    queue: BufList<B>,
    strategy: WriteStrategy,
}

impl<B: Buf> WriteBuf<B> {
    fn new(strategy: WriteStrategy) -> WriteBuf<B> {
        WriteBuf {
            headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
            max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
            queue: BufList::new(),
            strategy,
        }
    }
}

impl<B> WriteBuf<B>
where
    B: Buf,
{
    fn set_strategy(&mut self, strategy: WriteStrategy) {
        self.strategy = strategy;
    }

    pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
        debug_assert!(buf.has_remaining());
        match self.strategy {
            WriteStrategy::Flatten => {
                let head = self.headers_mut();

                head.maybe_unshift(buf.remaining());
                trace!(
                    self.len = head.remaining(),
                    buf.len = buf.remaining(),
                    "buffer.flatten"
                );
                // Copy chunk by chunk rather than going through
                // <Vec as BufMut>::put; the result is the same.
                loop {
                    let adv = {
                        let slice = buf.chunk();
                        if slice.is_empty() {
                            return;
                        }
                        head.bytes.extend_from_slice(slice);
                        slice.len()
                    };
                    buf.advance(adv);
                }
            }
            WriteStrategy::Queue => {
                trace!(
                    self.len = self.remaining(),
                    buf.len = buf.remaining(),
                    "buffer.queue"
                );
                self.queue.push(buf.into());
            }
        }
    }

    fn can_buffer(&self) -> bool {
        match self.strategy {
            WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
            WriteStrategy::Queue => {
                self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
            }
        }
    }

    fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
        debug_assert!(!self.queue.has_remaining());
        &mut self.headers
    }
}
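
// Sketch contrasting the two strategies (mirrors the tests below): Flatten
// copies every buffered chunk into the single `headers` vec so a flush is one
// plain write, while Queue keeps user bufs intact for a single vectored write.
//
//     let mut wb = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);
//     wb.buffer(Cursor::new(b"hello ".to_vec()));
//     wb.buffer(Cursor::new(b"world".to_vec()));
//     assert_eq!(wb.chunk(), b"hello world"); // both copied into `headers`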

impl<B: Buf> fmt::Debug for WriteBuf<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteBuf")
            .field("remaining", &self.remaining())
            .field("strategy", &self.strategy)
            .finish()
    }
}

impl<B: Buf> Buf for WriteBuf<B> {
    #[inline]
    fn remaining(&self) -> usize {
        self.headers.remaining() + self.queue.remaining()
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        let headers = self.headers.chunk();
        if !headers.is_empty() {
            headers
        } else {
            self.queue.chunk()
        }
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        let hrem = self.headers.remaining();

        match hrem.cmp(&cnt) {
            cmp::Ordering::Equal => self.headers.reset(),
            cmp::Ordering::Greater => self.headers.advance(cnt),
            cmp::Ordering::Less => {
                let qcnt = cnt - hrem;
                self.headers.reset();
                self.queue.advance(qcnt);
            }
        }
    }

    #[inline]
    fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
        let n = self.headers.chunks_vectored(dst);
        self.queue.chunks_vectored(&mut dst[n..]) + n
    }
}
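
// Worked example: `advance` drains the headers cursor before the queue. With
// 5 header bytes buffered and a 10-byte body queued, advance(7) hits the
// `Ordering::Less` arm: headers are reset and the queue advances by 2.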

#[derive(Debug)]
enum WriteStrategy {
    Flatten,
    Queue,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::io::Compat;
    use std::time::Duration;

    use tokio_test::io::Builder as Mock;

    #[tokio::test]
    #[ignore]
    async fn iobuf_write_empty_slice() {
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn parse_reads_until_blocked() {
        use crate::proto::h1::ClientTransaction;

        let _ = pretty_env_logger::try_init();
        let mock = Mock::new()
            // Sequential reads will be combined into the same read buffer.
            .read(b"HTTP/1.1 200 OK\r\n")
            .read(b"Server: hyper\r\n")
            // The headers are still incomplete, so parse must stay pending.
            .wait(Duration::from_secs(1))
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));

        // `parse` is expected to be pending, so wrap the poll in a `poll_fn`
        // instead of awaiting it directly.
        futures_util::future::poll_fn(|cx| {
            let parse_ctx = ParseContext {
                cached_headers: &mut None,
                req_method: &mut None,
                h1_parser_config: Default::default(),
                h1_max_headers: None,
                preserve_header_case: false,
                #[cfg(feature = "ffi")]
                preserve_header_order: false,
                h09_responses: false,
                #[cfg(feature = "ffi")]
                on_informational: &mut None,
            };
            assert!(buffered
                .parse::<ClientTransaction>(cx, parse_ctx)
                .is_pending());
            Poll::Ready(())
        })
        .await;

        assert_eq!(
            buffered.read_buf,
            b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
        );
    }

    #[test]
    fn read_strategy_adaptive_increments() {
        let mut strategy = ReadStrategy::default();
        assert_eq!(strategy.next(), 8192);

        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(16384);
        assert_eq!(strategy.next(), 32768);

        strategy.record(usize::MAX);
        assert_eq!(strategy.next(), 65536);

        let max = strategy.max();
        while strategy.next() < max {
            strategy.record(max);
        }

        assert_eq!(strategy.next(), max, "never goes over max");
        strategy.record(max + 1);
        assert_eq!(strategy.next(), max, "never goes over max");
    }

    #[test]
    fn read_strategy_adaptive_decrements() {
        let mut strategy = ReadStrategy::default();
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(1);
        assert_eq!(
            strategy.next(),
            16384,
            "first smaller record doesn't decrement yet"
        );
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384, "record was within range");

        strategy.record(1);
        assert_eq!(
            strategy.next(),
            16384,
            "in-range record should make this the 'first' again"
        );

        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "second smaller record decrements");

        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "first doesn't decrement");
        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
    }

    #[test]
    fn read_strategy_adaptive_stays_the_same() {
        let mut strategy = ReadStrategy::default();
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(8193);
        assert_eq!(
            strategy.next(),
            16384,
            "first smaller record doesn't decrement yet"
        );

        strategy.record(8193);
        assert_eq!(
            strategy.next(),
            16384,
            "record within the current step does not decrement"
        );
    }

    #[test]
    fn read_strategy_adaptive_max_fuzz() {
        fn fuzz(max: usize) {
            let mut strategy = ReadStrategy::with_max(max);
            while strategy.next() < max {
                strategy.record(usize::MAX);
            }
            let mut next = strategy.next();
            while next > 8192 {
                strategy.record(1);
                strategy.record(1);
                next = strategy.next();
                assert!(
                    next.is_power_of_two(),
                    "decrement should be powers of two: {} (max = {})",
                    next,
                    max,
                );
            }
        }

        let mut max = 8192;
        while max < usize::MAX {
            fuzz(max);
            max = (max / 2).saturating_mul(3);
        }
        fuzz(usize::MAX);
    }
823
824 #[test]
825 #[should_panic]
826 #[cfg(debug_assertions)] fn write_buf_requires_non_empty_bufs() {
828 let mock = Mock::new().build();
829 let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
830
831 buffered.buffer(Cursor::new(Vec::new()));
832 }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn write_buf_flatten() {
        let _ = pretty_env_logger::try_init();

        let mock = Mock::new().write(b"hello world, it's hyper!").build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
        buffered.write_buf.set_strategy(WriteStrategy::Flatten);

        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);

        buffered.flush().await.expect("flush");
    }

    #[test]
    fn write_buf_flatten_partially_flushed() {
        let _ = pretty_env_logger::try_init();

        let b = |s: &str| Cursor::new(s.as_bytes().to_vec());

        let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);

        write_buf.buffer(b("hello "));
        write_buf.buffer(b("world, "));

        assert_eq!(write_buf.chunk(), b"hello world, ");

        // partial flush: advance most of the way, but not all
        write_buf.advance(11);

        assert_eq!(write_buf.chunk(), b", ");
        assert_eq!(write_buf.headers.pos, 11);
        assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);

        write_buf.buffer(b("it's hyper!"));

        assert_eq!(write_buf.chunk(), b", it's hyper!");
        assert_eq!(write_buf.headers.pos, 11);

        let rem1 = write_buf.remaining();
        let cap = write_buf.headers.bytes.capacity();

        // This buffer exceeds the spare capacity, forcing `maybe_unshift` to
        // move the unread bytes to the front (pos returns to 0).
        write_buf.buffer(Cursor::new(vec![b'X'; cap]));
        assert_eq!(write_buf.remaining(), cap + rem1);
        assert_eq!(write_buf.headers.pos, 0);
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn write_buf_queue_disable_auto() {
        let _ = pretty_env_logger::try_init();

        let mock = Mock::new()
            .write(b"hello ")
            .write(b"world, ")
            .write(b"it's ")
            .write(b"hyper!")
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(Compat::new(mock));
        buffered.write_buf.set_strategy(WriteStrategy::Queue);

        // Queue mode keeps the three bodies as separate bufs behind the
        // headers buffer, so each is written on its own.
        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);

        buffered.flush().await.expect("flush");

        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
    }
}