yamux/connection/
cleanup.rs

1use crate::connection::StreamCommand;
2use crate::tagged_stream::TaggedStream;
3use crate::{ConnectionError, StreamId};
4use futures::channel::mpsc;
5use futures::stream::SelectAll;
6use futures::StreamExt;
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11/// A [`Future`] that cleans up resources in case of an error.
12#[must_use]
13pub struct Cleanup {
14    state: State,
15    stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
16    error: Option<ConnectionError>,
17}
18
19impl Cleanup {
20    pub(crate) fn new(
21        stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
22        error: ConnectionError,
23    ) -> Self {
24        Self {
25            state: State::ClosingStreamReceiver,
26            stream_receivers,
27            error: Some(error),
28        }
29    }
30}
31
32impl Future for Cleanup {
33    type Output = ConnectionError;
34
35    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36        let mut this = self.get_mut();
37
38        loop {
39            match this.state {
40                State::ClosingStreamReceiver => {
41                    for stream in this.stream_receivers.iter_mut() {
42                        stream.inner_mut().close();
43                    }
44                    this.state = State::DrainingStreamReceiver;
45                }
46                State::DrainingStreamReceiver => match this.stream_receivers.poll_next_unpin(cx) {
47                    Poll::Ready(Some(cmd)) => {
48                        drop(cmd);
49                    }
50                    Poll::Ready(None) | Poll::Pending => {
51                        return Poll::Ready(
52                            this.error
53                                .take()
54                                .expect("to not be called after completion"),
55                        )
56                    }
57                },
58            }
59        }
60    }
61}
62
/// Phases the [`Cleanup`] future goes through, in declaration order.
// Both variants intentionally share the `StreamReceiver` suffix, hence the lint exemption.
#[allow(clippy::enum_variant_names)]
enum State {
    /// The stream receivers have not been closed yet.
    ClosingStreamReceiver,
    /// The stream receivers are closed; remaining commands are being discarded.
    DrainingStreamReceiver,
}