litep2p/yamux/connection/
cleanup.rs

1use crate::yamux::{
2    connection::StreamCommand, tagged_stream::TaggedStream, ConnectionError, StreamId,
3};
4use futures::{channel::mpsc, stream::SelectAll, StreamExt};
5use std::{
6    future::Future,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11/// A [`Future`] that cleans up resources in case of an error.
12#[must_use]
13pub struct Cleanup {
14    state: State,
15    stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
16    error: Option<ConnectionError>,
17}
18
19impl Cleanup {
20    pub(crate) fn new(
21        stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
22        error: ConnectionError,
23    ) -> Self {
24        Self {
25            state: State::ClosingStreamReceiver,
26            stream_receivers,
27            error: Some(error),
28        }
29    }
30}
31
32impl Future for Cleanup {
33    type Output = ConnectionError;
34
35    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36        let this = self.get_mut();
37
38        loop {
39            match this.state {
40                State::ClosingStreamReceiver => {
41                    for stream in this.stream_receivers.iter_mut() {
42                        stream.inner_mut().close();
43                    }
44                    this.state = State::DrainingStreamReceiver;
45                }
46                State::DrainingStreamReceiver => match this.stream_receivers.poll_next_unpin(cx) {
47                    Poll::Ready(Some(cmd)) => {
48                        drop(cmd);
49                    }
50                    Poll::Ready(None) | Poll::Pending =>
51                        return Poll::Ready(
52                            this.error.take().expect("to not be called after completion"),
53                        ),
54                },
55            }
56        }
57    }
58}
59
/// The steps [`Cleanup`] goes through, in order.
// Both variants intentionally share the `StreamReceiver` suffix, which trips
// `clippy::enum_variant_names`.
#[allow(clippy::enum_variant_names)]
enum State {
    /// The stream receivers still have to be closed.
    ClosingStreamReceiver,
    /// The stream receivers are closed; remaining commands are being discarded.
    DrainingStreamReceiver,
}