litep2p/yamux/
tagged_stream.rs1use futures::Stream;
2use std::{
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7#[pin_project::pin_project]
9pub struct TaggedStream<K, S> {
10 key: K,
11 #[pin]
12 inner: S,
13
14 reported_none: bool,
15}
16
17impl<K, S> TaggedStream<K, S> {
18 pub fn new(key: K, inner: S) -> Self {
19 Self {
20 key,
21 inner,
22 reported_none: false,
23 }
24 }
25
26 pub fn inner_mut(&mut self) -> &mut S {
27 &mut self.inner
28 }
29}
30
31impl<K, S> Stream for TaggedStream<K, S>
32where
33 K: Copy,
34 S: Stream,
35{
36 type Item = (K, Option<S::Item>);
37
38 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39 let this = self.project();
40
41 if *this.reported_none {
42 return Poll::Ready(None);
43 }
44
45 match futures::ready!(this.inner.poll_next(cx)) {
46 Some(item) => Poll::Ready(Some((*this.key, Some(item)))),
47 None => {
48 *this.reported_none = true;
49
50 Poll::Ready(Some((*this.key, None)))
51 }
52 }
53 }
54}