litep2p/yamux/connection/
cleanup.rs1use crate::yamux::{
2 connection::StreamCommand, tagged_stream::TaggedStream, ConnectionError, StreamId,
3};
4use futures::{channel::mpsc, stream::SelectAll, StreamExt};
5use std::{
6 future::Future,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11#[must_use]
13pub struct Cleanup {
14 state: State,
15 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
16 error: Option<ConnectionError>,
17}
18
19impl Cleanup {
20 pub(crate) fn new(
21 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
22 error: ConnectionError,
23 ) -> Self {
24 Self {
25 state: State::ClosingStreamReceiver,
26 stream_receivers,
27 error: Some(error),
28 }
29 }
30}
31
32impl Future for Cleanup {
33 type Output = ConnectionError;
34
35 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36 let this = self.get_mut();
37
38 loop {
39 match this.state {
40 State::ClosingStreamReceiver => {
41 for stream in this.stream_receivers.iter_mut() {
42 stream.inner_mut().close();
43 }
44 this.state = State::DrainingStreamReceiver;
45 }
46 State::DrainingStreamReceiver => match this.stream_receivers.poll_next_unpin(cx) {
47 Poll::Ready(Some(cmd)) => {
48 drop(cmd);
49 }
50 Poll::Ready(None) | Poll::Pending =>
51 return Poll::Ready(
52 this.error.take().expect("to not be called after completion"),
53 ),
54 },
55 }
56 }
57 }
58}
59
60#[allow(clippy::enum_variant_names)]
61enum State {
62 ClosingStreamReceiver,
63 DrainingStreamReceiver,
64}