yamux/
tagged_stream.rs

1use futures::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
/// A stream adapter that pairs every item of an inner stream with a fixed key.
///
/// Items from the inner stream are yielded as `(key, Some(item))`. When the
/// inner stream ends, one final `(key, None)` is yielded so the consumer can
/// tell which tagged stream finished; after that the adapter yields `None`.
#[pin_project::pin_project]
pub struct TaggedStream<K, S> {
    /// Tag copied into every yielded tuple.
    key: K,
    /// The wrapped stream; marked `#[pin]` so it can be polled through the
    /// pinned projection.
    #[pin]
    inner: S,

    /// Set once the end of the inner stream has been reported as `(key, None)`.
    reported_none: bool,
}
14
15impl<K, S> TaggedStream<K, S> {
16    pub fn new(key: K, inner: S) -> Self {
17        Self {
18            key,
19            inner,
20            reported_none: false,
21        }
22    }
23
24    pub fn inner_mut(&mut self) -> &mut S {
25        &mut self.inner
26    }
27}
28
29impl<K, S> Stream for TaggedStream<K, S>
30where
31    K: Copy,
32    S: Stream,
33{
34    type Item = (K, Option<S::Item>);
35
36    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37        let this = self.project();
38
39        if *this.reported_none {
40            return Poll::Ready(None);
41        }
42
43        match futures::ready!(this.inner.poll_next(cx)) {
44            Some(item) => Poll::Ready(Some((*this.key, Some(item)))),
45            None => {
46                *this.reported_none = true;
47
48                Poll::Ready(Some((*this.key, None)))
49            }
50        }
51    }
52}