yamux/connection/
cleanup.rs1use crate::connection::StreamCommand;
2use crate::tagged_stream::TaggedStream;
3use crate::{ConnectionError, StreamId};
4use futures::channel::mpsc;
5use futures::stream::SelectAll;
6use futures::StreamExt;
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11#[must_use]
13pub struct Cleanup {
14 state: State,
15 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
16 error: Option<ConnectionError>,
17}
18
19impl Cleanup {
20 pub(crate) fn new(
21 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
22 error: ConnectionError,
23 ) -> Self {
24 Self {
25 state: State::ClosingStreamReceiver,
26 stream_receivers,
27 error: Some(error),
28 }
29 }
30}
31
32impl Future for Cleanup {
33 type Output = ConnectionError;
34
35 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36 let mut this = self.get_mut();
37
38 loop {
39 match this.state {
40 State::ClosingStreamReceiver => {
41 for stream in this.stream_receivers.iter_mut() {
42 stream.inner_mut().close();
43 }
44 this.state = State::DrainingStreamReceiver;
45 }
46 State::DrainingStreamReceiver => match this.stream_receivers.poll_next_unpin(cx) {
47 Poll::Ready(Some(cmd)) => {
48 drop(cmd);
49 }
50 Poll::Ready(None) | Poll::Pending => {
51 return Poll::Ready(
52 this.error
53 .take()
54 .expect("to not be called after completion"),
55 )
56 }
57 },
58 }
59 }
60 }
61}
62
63#[allow(clippy::enum_variant_names)]
64enum State {
65 ClosingStreamReceiver,
66 DrainingStreamReceiver,
67}