hyper/rt/io.rs

1use std::fmt;
2use std::mem::MaybeUninit;
3use std::ops::DerefMut;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7// New IO traits? What?! Why, are you bonkers?
8//
9// I mean, yes, probably. But, here's the goals:
10//
11// 1. Supports poll-based IO operations.
12// 2. Opt-in vectored IO.
13// 3. Can use an optional buffer pool.
14// 4. Able to add completion-based (uring) IO eventually.
15//
16// Frankly, the last point is the entire reason we're doing this. We want to
17// have forwards-compatibility with an eventually stable io-uring runtime. We
18// don't need that to work right away. But it must be possible to add in here
19// without breaking hyper 1.0.
20//
// While we're in here, we can also make small tweaks to poll_read or
// poll_write that let even the "slow" path be fast — for example, handling
// the case where someone didn't remember to forward along an
// `is_completion` call.

/// Reads bytes from a source.
///
/// This trait is similar to `std::io::Read`, but supports asynchronous reads.
pub trait Read {
    /// Attempts to read bytes into the `buf`.
    ///
    /// On success, returns `Poll::Ready(Ok(()))` and places data in the
    /// unfilled portion of `buf`. If no data was read (`buf.remaining()` is
    /// unchanged), it implies that EOF has been reached.
    ///
    /// If no data is available for reading, the method returns `Poll::Pending`
    /// and arranges for the current task (via `cx.waker()`) to receive a
    /// notification when the object becomes readable or is closed.
    ///
    /// Note: implementations report how much they read by advancing the
    /// cursor (see [`ReadBufCursor::advance`] / `put_slice`); an unchanged
    /// cursor is what signals EOF to the caller.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: ReadBufCursor<'_>,
    ) -> Poll<Result<(), std::io::Error>>;
}
44
/// Write bytes asynchronously.
///
/// This trait is similar to `std::io::Write`, but for asynchronous writes.
pub trait Write {
    /// Attempt to write bytes from `buf` into the destination.
    ///
    /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. If
    /// successful, it must be guaranteed that `num_bytes_written <=
    /// buf.len()`. A return value of `0` means that the underlying object is
    /// no longer able to accept bytes, or that the provided buffer is empty.
    ///
    /// If the object is not ready for writing, the method returns
    /// `Poll::Pending` and arranges for the current task (via `cx.waker()`)
    /// to receive a notification when the object becomes writable or is
    /// closed.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>>;

    /// Attempts to flush the object.
    ///
    /// On success, returns `Poll::Ready(Ok(()))`.
    ///
    /// If flushing cannot immediately complete, this method returns
    /// `Poll::Pending` and arranges for the current task (via `cx.waker()`)
    /// to receive a notification when the object can make progress.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>>;

    /// Attempts to shut down this writer.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>>;

    /// Returns whether this writer has an efficient `poll_write_vectored`
    /// implementation.
    ///
    /// The default implementation returns `false`.
    fn is_write_vectored(&self) -> bool {
        false
    }

    /// Like `poll_write`, except that it writes from a slice of buffers.
    ///
    /// The default implementation forwards the first non-empty buffer to
    /// `poll_write`, or an empty slice if every buffer is empty.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize, std::io::Error>> {
        for b in bufs {
            if !b.is_empty() {
                // `IoSlice` derefs to `[u8]`, so `b` coerces to `&[u8]`.
                return self.poll_write(cx, b);
            }
        }
        // No buffer had any bytes; still issue the write so the writer can
        // report readiness (or an error) exactly as `poll_write(&[])` would.
        self.poll_write(cx, &[])
    }
}
101
/// A wrapper around a byte buffer that is incrementally filled and initialized.
///
/// This type is a sort of "double cursor". It tracks three regions in the
/// buffer: a region at the beginning of the buffer that has been logically
/// filled with data, a region that has been initialized at some point but not
/// yet logically filled, and a region at the end that may be uninitialized.
/// The filled region is guaranteed to be a subset of the initialized region.
///
/// In summary, the contents of the buffer can be visualized as:
///
/// ```not_rust
/// [             capacity              ]
/// [ filled |         unfilled         ]
/// [    initialized    | uninitialized ]
/// ```
///
/// It is undefined behavior to de-initialize any bytes from the uninitialized
/// region, since it is merely unknown whether this region is uninitialized or
/// not, and if part of it turns out to be initialized, it must stay initialized.
pub struct ReadBuf<'a> {
    // The entire backing buffer; bytes past `init` may be uninitialized.
    raw: &'a mut [MaybeUninit<u8>],
    // Number of bytes at the start of `raw` holding meaningful data.
    filled: usize,
    // Number of bytes at the start of `raw` known to be initialized.
    // Invariant: `filled <= init <= raw.len()`.
    init: usize,
}
126
/// The cursor part of a [`ReadBuf`].
///
/// This is created by calling `ReadBuf::unfilled()`.
///
/// Writing through the cursor (`put_slice`, or `as_mut` followed by
/// `advance`) grows the underlying buffer's `filled` region.
#[derive(Debug)]
pub struct ReadBufCursor<'a> {
    // Exclusive borrow of the buffer being filled; all cursor operations act
    // on the region at and after `buf.filled`.
    buf: &'a mut ReadBuf<'a>,
}
134
impl<'data> ReadBuf<'data> {
    /// Create a new `ReadBuf` with a slice of initialized bytes.
    #[inline]
    pub fn new(raw: &'data mut [u8]) -> Self {
        let len = raw.len();
        Self {
            // SAFETY: We never de-init the bytes ourselves. `[u8]` and
            // `[MaybeUninit<u8>]` have the same layout, so the cast is sound
            // as long as that holds.
            raw: unsafe { &mut *(raw as *mut [u8] as *mut [MaybeUninit<u8>]) },
            filled: 0,
            // The caller gave us `&mut [u8]`, so every byte is initialized.
            init: len,
        }
    }

    /// Create a new `ReadBuf` with a slice of uninitialized bytes.
    #[inline]
    pub fn uninit(raw: &'data mut [MaybeUninit<u8>]) -> Self {
        Self {
            raw,
            filled: 0,
            init: 0,
        }
    }

    /// Get a slice of the buffer that has been filled in with bytes.
    #[inline]
    pub fn filled(&self) -> &[u8] {
        // SAFETY: We only slice the filled part of the buffer, which is always valid
        // (filled <= init, and the init region is initialized).
        unsafe { &*(&self.raw[0..self.filled] as *const [MaybeUninit<u8>] as *const [u8]) }
    }

    /// Get a cursor to the unfilled portion of the buffer.
    #[inline]
    pub fn unfilled<'cursor>(&'cursor mut self) -> ReadBufCursor<'cursor> {
        ReadBufCursor {
            // SAFETY: `raw` is never re-assigned, so it is safe to narrow
            // the lifetime from 'data to 'cursor here.
            buf: unsafe {
                std::mem::transmute::<&'cursor mut ReadBuf<'data>, &'cursor mut ReadBuf<'cursor>>(
                    self,
                )
            },
        }
    }

    // Mark at least the first `n` bytes as initialized (monotonic: never
    // shrinks `init`).
    //
    // # Safety
    //
    // The caller must guarantee that the first `n` bytes really have been
    // initialized; otherwise `filled()` could later expose uninit memory.
    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) unsafe fn set_init(&mut self, n: usize) {
        self.init = self.init.max(n);
    }

    // Mark at least the first `n` bytes as filled (monotonic: never shrinks
    // `filled`).
    //
    // # Safety
    //
    // The caller must guarantee the first `n` bytes contain meaningful,
    // initialized data.
    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) unsafe fn set_filled(&mut self, n: usize) {
        self.filled = self.filled.max(n);
    }

    // Number of filled (read) bytes.
    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) fn len(&self) -> usize {
        self.filled
    }

    // Number of initialized bytes.
    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) fn init_len(&self) -> usize {
        self.init
    }

    // Capacity not yet filled.
    #[inline]
    fn remaining(&self) -> usize {
        self.capacity() - self.filled
    }

    // Total size of the backing buffer.
    #[inline]
    fn capacity(&self) -> usize {
        self.raw.len()
    }
}
213
214impl<'data> fmt::Debug for ReadBuf<'data> {
215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216        f.debug_struct("ReadBuf")
217            .field("filled", &self.filled)
218            .field("init", &self.init)
219            .field("capacity", &self.capacity())
220            .finish()
221    }
222}
223
impl<'data> ReadBufCursor<'data> {
    /// Access the unfilled part of the buffer.
    ///
    /// # Safety
    ///
    /// The caller must not uninitialize any bytes that may have been
    /// initialized before.
    #[inline]
    pub unsafe fn as_mut(&mut self) -> &mut [MaybeUninit<u8>] {
        &mut self.buf.raw[self.buf.filled..]
    }

    /// Advance the `filled` cursor by `n` bytes.
    ///
    /// # Safety
    ///
    /// The caller must take care that `n` more bytes have been initialized.
    #[inline]
    pub unsafe fn advance(&mut self, n: usize) {
        // checked_add: a silent wrap-around here would let `filled` shrink
        // and later expose bytes that were never written.
        self.buf.filled = self.buf.filled.checked_add(n).expect("overflow");
        // Maintain the invariant `filled <= init`.
        self.buf.init = self.buf.filled.max(self.buf.init);
    }

    // Number of unfilled bytes remaining in the underlying buffer.
    #[inline]
    pub(crate) fn remaining(&self) -> usize {
        self.buf.remaining()
    }

    // Copy `buf` into the unfilled region and advance `filled` (and `init`,
    // if needed) past the copied bytes. Panics if `buf` doesn't fit.
    #[inline]
    pub(crate) fn put_slice(&mut self, buf: &[u8]) {
        assert!(
            self.buf.remaining() >= buf.len(),
            "buf.len() must fit in remaining()"
        );

        let amt = buf.len();
        // Cannot overflow, asserted above
        let end = self.buf.filled + amt;

        // Safety: the length is asserted above, and `buf` cannot alias
        // `raw` because we hold the exclusive borrow.
        unsafe {
            self.buf.raw[self.buf.filled..end]
                .as_mut_ptr()
                .cast::<u8>()
                .copy_from_nonoverlapping(buf.as_ptr(), amt);
        }

        // The copy initialized everything up to `end`.
        if self.buf.init < end {
            self.buf.init = end;
        }
        self.buf.filled = end;
    }
}
277
// Implements `Read::poll_read` for smart-pointer-like wrappers (`Box<T>`,
// `&mut T`) by dereferencing to the inner reader and re-pinning it. The
// `Pin::new` is sound because the impls using this macro require `T: Unpin`.
macro_rules! deref_async_read {
    () => {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: ReadBufCursor<'_>,
        ) -> Poll<std::io::Result<()>> {
            Pin::new(&mut **self).poll_read(cx, buf)
        }
    };
}
289
290impl<T: ?Sized + Read + Unpin> Read for Box<T> {
291    deref_async_read!();
292}
293
294impl<T: ?Sized + Read + Unpin> Read for &mut T {
295    deref_async_read!();
296}
297
298impl<P> Read for Pin<P>
299where
300    P: DerefMut,
301    P::Target: Read,
302{
303    fn poll_read(
304        self: Pin<&mut Self>,
305        cx: &mut Context<'_>,
306        buf: ReadBufCursor<'_>,
307    ) -> Poll<std::io::Result<()>> {
308        pin_as_deref_mut(self).poll_read(cx, buf)
309    }
310}
311
// Implements every `Write` method for smart-pointer-like wrappers (`Box<T>`,
// `&mut T`) by dereferencing to the inner writer and re-pinning it. The
// `Pin::new` is sound because the impls using this macro require `T: Unpin`.
// Note: `poll_write_vectored` and `is_write_vectored` are forwarded too, so
// a wrapper never masks an efficient vectored implementation underneath.
macro_rules! deref_async_write {
    () => {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            Pin::new(&mut **self).poll_write(cx, buf)
        }

        fn poll_write_vectored(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[std::io::IoSlice<'_>],
        ) -> Poll<std::io::Result<usize>> {
            Pin::new(&mut **self).poll_write_vectored(cx, bufs)
        }

        fn is_write_vectored(&self) -> bool {
            (**self).is_write_vectored()
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Pin::new(&mut **self).poll_flush(cx)
        }

        fn poll_shutdown(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<std::io::Result<()>> {
            Pin::new(&mut **self).poll_shutdown(cx)
        }
    };
}
346
347impl<T: ?Sized + Write + Unpin> Write for Box<T> {
348    deref_async_write!();
349}
350
351impl<T: ?Sized + Write + Unpin> Write for &mut T {
352    deref_async_write!();
353}
354
355impl<P> Write for Pin<P>
356where
357    P: DerefMut,
358    P::Target: Write,
359{
360    fn poll_write(
361        self: Pin<&mut Self>,
362        cx: &mut Context<'_>,
363        buf: &[u8],
364    ) -> Poll<std::io::Result<usize>> {
365        pin_as_deref_mut(self).poll_write(cx, buf)
366    }
367
368    fn poll_write_vectored(
369        self: Pin<&mut Self>,
370        cx: &mut Context<'_>,
371        bufs: &[std::io::IoSlice<'_>],
372    ) -> Poll<std::io::Result<usize>> {
373        pin_as_deref_mut(self).poll_write_vectored(cx, bufs)
374    }
375
376    fn is_write_vectored(&self) -> bool {
377        (**self).is_write_vectored()
378    }
379
380    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
381        pin_as_deref_mut(self).poll_flush(cx)
382    }
383
384    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
385        pin_as_deref_mut(self).poll_shutdown(cx)
386    }
387}
388
/// Polyfill for Pin::as_deref_mut()
/// TODO: use Pin::as_deref_mut() instead once stabilized
fn pin_as_deref_mut<P: DerefMut>(pin: Pin<&mut Pin<P>>) -> Pin<&mut P::Target> {
    // SAFETY: we never move out of the `&mut Pin<P>`; it is used solely to
    // re-pin its target via `as_mut()`, so the pinning invariant of the
    // target is preserved. This mirrors the unstable `Pin::as_deref_mut`.
    let inner: &mut Pin<P> = unsafe { pin.get_unchecked_mut() };
    inner.as_mut()
}