litep2p/yamux/connection/
closing.rs

1use crate::yamux::{
2    connection::StreamCommand, frame, frame::Frame, tagged_stream::TaggedStream, Result, StreamId,
3};
4use futures::{
5    channel::mpsc,
6    ready,
7    stream::{Fuse, SelectAll},
8    AsyncRead, AsyncWrite, SinkExt, StreamExt,
9};
10use std::{
11    collections::VecDeque,
12    future::Future,
13    pin::Pin,
14    task::{Context, Poll},
15};
16
17/// A [`Future`] that gracefully closes the yamux connection.
18#[must_use]
19pub struct Closing<T> {
20    state: State,
21    stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
22    pending_frames: VecDeque<Frame<()>>,
23    socket: Fuse<frame::Io<T>>,
24}
25
26impl<T> Closing<T>
27where
28    T: AsyncRead + AsyncWrite + Unpin,
29{
30    pub(crate) fn new(
31        stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
32        pending_frames: VecDeque<Frame<()>>,
33        socket: Fuse<frame::Io<T>>,
34    ) -> Self {
35        Self {
36            state: State::ClosingStreamReceiver,
37            stream_receivers,
38            pending_frames,
39            socket,
40        }
41    }
42}
43
44impl<T> Future for Closing<T>
45where
46    T: AsyncRead + AsyncWrite + Unpin,
47{
48    type Output = Result<()>;
49
50    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51        let this = self.get_mut();
52
53        loop {
54            match this.state {
55                State::ClosingStreamReceiver => {
56                    for stream in this.stream_receivers.iter_mut() {
57                        stream.inner_mut().close();
58                    }
59                    this.state = State::DrainingStreamReceiver;
60                }
61
62                State::DrainingStreamReceiver => {
63                    match this.stream_receivers.poll_next_unpin(cx) {
64                        Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) =>
65                            this.pending_frames.push_back(frame.into()),
66                        Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
67                            this.pending_frames.push_back(Frame::close_stream(id, ack).into());
68                        }
69                        Poll::Ready(Some((_, None))) => {}
70                        Poll::Pending | Poll::Ready(None) => {
71                            // No more frames from streams, append `Term` frame and flush them all.
72                            this.pending_frames.push_back(Frame::term().into());
73                            this.state = State::FlushingPendingFrames;
74                            continue;
75                        }
76                    }
77                }
78                State::FlushingPendingFrames => {
79                    ready!(this.socket.poll_ready_unpin(cx))?;
80
81                    match this.pending_frames.pop_front() {
82                        Some(frame) => this.socket.start_send_unpin(frame)?,
83                        None => this.state = State::ClosingSocket,
84                    }
85                }
86                State::ClosingSocket => {
87                    ready!(this.socket.poll_close_unpin(cx))?;
88
89                    return Poll::Ready(Ok(()));
90                }
91            }
92        }
93    }
94}
95
/// Phases of the graceful shutdown performed by [`Closing`], in the order
/// they are entered.
enum State {
    /// Close the command receiver of every stream.
    ClosingStreamReceiver,
    /// Convert the commands still yielded by the receivers into pending
    /// frames, then queue the final `Term` frame.
    DrainingStreamReceiver,
    /// Hand the pending frames to the socket one by one.
    FlushingPendingFrames,
    /// Close the socket; the future completes once this succeeds.
    ClosingSocket,
}