1use futures::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5#[pin_project::pin_project]
7pub struct TaggedStream<K, S> {
8 key: K,
9 #[pin]
10 inner: S,
11
12 reported_none: bool,
13}
14
15impl<K, S> TaggedStream<K, S> {
16 pub fn new(key: K, inner: S) -> Self {
17 Self {
18 key,
19 inner,
20 reported_none: false,
21 }
22 }
23
24 pub fn inner_mut(&mut self) -> &mut S {
25 &mut self.inner
26 }
27}
28
29impl<K, S> Stream for TaggedStream<K, S>
30where
31 K: Copy,
32 S: Stream,
33{
34 type Item = (K, Option<S::Item>);
35
36 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37 let this = self.project();
38
39 if *this.reported_none {
40 return Poll::Ready(None);
41 }
42
43 match futures::ready!(this.inner.poll_next(cx)) {
44 Some(item) => Poll::Ready(Some((*this.key, Some(item)))),
45 None => {
46 *this.reported_none = true;
47
48 Poll::Ready(Some((*this.key, None)))
49 }
50 }
51 }
52}