// litep2p/yamux/connection/closing.rs
use crate::yamux::{
2 connection::StreamCommand, frame, frame::Frame, tagged_stream::TaggedStream, Result, StreamId,
3};
4use futures::{
5 channel::mpsc,
6 ready,
7 stream::{Fuse, SelectAll},
8 AsyncRead, AsyncWrite, SinkExt, StreamExt,
9};
10use std::{
11 collections::VecDeque,
12 future::Future,
13 pin::Pin,
14 task::{Context, Poll},
15};
16
17#[must_use]
19pub struct Closing<T> {
20 state: State,
21 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
22 pending_frames: VecDeque<Frame<()>>,
23 socket: Fuse<frame::Io<T>>,
24}
25
26impl<T> Closing<T>
27where
28 T: AsyncRead + AsyncWrite + Unpin,
29{
30 pub(crate) fn new(
31 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
32 pending_frames: VecDeque<Frame<()>>,
33 socket: Fuse<frame::Io<T>>,
34 ) -> Self {
35 Self {
36 state: State::ClosingStreamReceiver,
37 stream_receivers,
38 pending_frames,
39 socket,
40 }
41 }
42}
43
44impl<T> Future for Closing<T>
45where
46 T: AsyncRead + AsyncWrite + Unpin,
47{
48 type Output = Result<()>;
49
50 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51 let this = self.get_mut();
52
53 loop {
54 match this.state {
55 State::ClosingStreamReceiver => {
56 for stream in this.stream_receivers.iter_mut() {
57 stream.inner_mut().close();
58 }
59 this.state = State::DrainingStreamReceiver;
60 }
61
62 State::DrainingStreamReceiver => {
63 match this.stream_receivers.poll_next_unpin(cx) {
64 Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) =>
65 this.pending_frames.push_back(frame.into()),
66 Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
67 this.pending_frames.push_back(Frame::close_stream(id, ack).into());
68 }
69 Poll::Ready(Some((_, None))) => {}
70 Poll::Pending | Poll::Ready(None) => {
71 this.pending_frames.push_back(Frame::term().into());
73 this.state = State::FlushingPendingFrames;
74 continue;
75 }
76 }
77 }
78 State::FlushingPendingFrames => {
79 ready!(this.socket.poll_ready_unpin(cx))?;
80
81 match this.pending_frames.pop_front() {
82 Some(frame) => this.socket.start_send_unpin(frame)?,
83 None => this.state = State::ClosingSocket,
84 }
85 }
86 State::ClosingSocket => {
87 ready!(this.socket.poll_close_unpin(cx))?;
88
89 return Poll::Ready(Ok(()));
90 }
91 }
92 }
93 }
94}
95
/// Steps of the connection shutdown sequence, in the order they are visited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Close the command receiver of every stream.
    ClosingStreamReceiver,
    /// Collect commands still available from the receivers into the frame queue.
    DrainingStreamReceiver,
    /// Write all queued frames to the socket.
    FlushingPendingFrames,
    /// Close the socket; the final step.
    ClosingSocket,
}