yamux/connection/
closing.rs

1use crate::connection::StreamCommand;
2use crate::frame::Frame;
3use crate::tagged_stream::TaggedStream;
4use crate::Result;
5use crate::{frame, StreamId};
6use futures::channel::mpsc;
7use futures::stream::{Fuse, SelectAll};
8use futures::{ready, AsyncRead, AsyncWrite, SinkExt, StreamExt};
9use std::collections::VecDeque;
10use std::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14/// A [`Future`] that gracefully closes the yamux connection.
15#[must_use]
16pub struct Closing<T> {
17    state: State,
18    stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
19    pending_frames: VecDeque<Frame<()>>,
20    socket: Fuse<frame::Io<T>>,
21}
22
23impl<T> Closing<T>
24where
25    T: AsyncRead + AsyncWrite + Unpin,
26{
27    pub(crate) fn new(
28        stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
29        pending_frames: VecDeque<Frame<()>>,
30        socket: Fuse<frame::Io<T>>,
31    ) -> Self {
32        Self {
33            state: State::ClosingStreamReceiver,
34            stream_receivers,
35            pending_frames,
36            socket,
37        }
38    }
39}
40
41impl<T> Future for Closing<T>
42where
43    T: AsyncRead + AsyncWrite + Unpin,
44{
45    type Output = Result<()>;
46
47    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48        let mut this = self.get_mut();
49
50        loop {
51            match this.state {
52                State::ClosingStreamReceiver => {
53                    for stream in this.stream_receivers.iter_mut() {
54                        stream.inner_mut().close();
55                    }
56                    this.state = State::DrainingStreamReceiver;
57                }
58
59                State::DrainingStreamReceiver => {
60                    match this.stream_receivers.poll_next_unpin(cx) {
61                        Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
62                            this.pending_frames.push_back(frame.into())
63                        }
64                        Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
65                            this.pending_frames
66                                .push_back(Frame::close_stream(id, ack).into());
67                        }
68                        Poll::Ready(Some((_, None))) => {}
69                        Poll::Pending | Poll::Ready(None) => {
70                            // No more frames from streams, append `Term` frame and flush them all.
71                            this.pending_frames.push_back(Frame::term().into());
72                            this.state = State::FlushingPendingFrames;
73                            continue;
74                        }
75                    }
76                }
77                State::FlushingPendingFrames => {
78                    ready!(this.socket.poll_ready_unpin(cx))?;
79
80                    match this.pending_frames.pop_front() {
81                        Some(frame) => this.socket.start_send_unpin(frame)?,
82                        None => this.state = State::ClosingSocket,
83                    }
84                }
85                State::ClosingSocket => {
86                    ready!(this.socket.poll_close_unpin(cx))?;
87
88                    return Poll::Ready(Ok(()));
89                }
90            }
91        }
92    }
93}
94
95enum State {
96    ClosingStreamReceiver,
97    DrainingStreamReceiver,
98    FlushingPendingFrames,
99    ClosingSocket,
100}