litep2p/yamux/
tagged_stream.rs

1use futures::Stream;
2use std::{
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7/// A stream that yields its tag with every item.
8#[pin_project::pin_project]
9pub struct TaggedStream<K, S> {
10    key: K,
11    #[pin]
12    inner: S,
13
14    reported_none: bool,
15}
16
17impl<K, S> TaggedStream<K, S> {
18    pub fn new(key: K, inner: S) -> Self {
19        Self {
20            key,
21            inner,
22            reported_none: false,
23        }
24    }
25
26    pub fn inner_mut(&mut self) -> &mut S {
27        &mut self.inner
28    }
29}
30
31impl<K, S> Stream for TaggedStream<K, S>
32where
33    K: Copy,
34    S: Stream,
35{
36    type Item = (K, Option<S::Item>);
37
38    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39        let this = self.project();
40
41        if *this.reported_none {
42            return Poll::Ready(None);
43        }
44
45        match futures::ready!(this.inner.poll_next(cx)) {
46            Some(item) => Poll::Ready(Some((*this.key, Some(item)))),
47            None => {
48                *this.reported_none = true;
49
50                Poll::Ready(Some((*this.key, None)))
51            }
52        }
53    }
54}