yamux/connection/
closing.rs1use crate::connection::StreamCommand;
2use crate::frame::Frame;
3use crate::tagged_stream::TaggedStream;
4use crate::Result;
5use crate::{frame, StreamId};
6use futures::channel::mpsc;
7use futures::stream::{Fuse, SelectAll};
8use futures::{ready, AsyncRead, AsyncWrite, SinkExt, StreamExt};
9use std::collections::VecDeque;
10use std::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14#[must_use]
16pub struct Closing<T> {
17 state: State,
18 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
19 pending_frames: VecDeque<Frame<()>>,
20 socket: Fuse<frame::Io<T>>,
21}
22
23impl<T> Closing<T>
24where
25 T: AsyncRead + AsyncWrite + Unpin,
26{
27 pub(crate) fn new(
28 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
29 pending_frames: VecDeque<Frame<()>>,
30 socket: Fuse<frame::Io<T>>,
31 ) -> Self {
32 Self {
33 state: State::ClosingStreamReceiver,
34 stream_receivers,
35 pending_frames,
36 socket,
37 }
38 }
39}
40
41impl<T> Future for Closing<T>
42where
43 T: AsyncRead + AsyncWrite + Unpin,
44{
45 type Output = Result<()>;
46
47 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48 let mut this = self.get_mut();
49
50 loop {
51 match this.state {
52 State::ClosingStreamReceiver => {
53 for stream in this.stream_receivers.iter_mut() {
54 stream.inner_mut().close();
55 }
56 this.state = State::DrainingStreamReceiver;
57 }
58
59 State::DrainingStreamReceiver => {
60 match this.stream_receivers.poll_next_unpin(cx) {
61 Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
62 this.pending_frames.push_back(frame.into())
63 }
64 Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
65 this.pending_frames
66 .push_back(Frame::close_stream(id, ack).into());
67 }
68 Poll::Ready(Some((_, None))) => {}
69 Poll::Pending | Poll::Ready(None) => {
70 this.pending_frames.push_back(Frame::term().into());
72 this.state = State::FlushingPendingFrames;
73 continue;
74 }
75 }
76 }
77 State::FlushingPendingFrames => {
78 ready!(this.socket.poll_ready_unpin(cx))?;
79
80 match this.pending_frames.pop_front() {
81 Some(frame) => this.socket.start_send_unpin(frame)?,
82 None => this.state = State::ClosingSocket,
83 }
84 }
85 State::ClosingSocket => {
86 ready!(this.socket.poll_close_unpin(cx))?;
87
88 return Poll::Ready(Ok(()));
89 }
90 }
91 }
92 }
93}
94
95enum State {
96 ClosingStreamReceiver,
97 DrainingStreamReceiver,
98 FlushingPendingFrames,
99 ClosingSocket,
100}