1use crate::yamux::{error::ConnectionError, Connection, Result, Stream, MAX_ACK_BACKLOG};
12use futures::{
13 channel::{mpsc, oneshot},
14 prelude::*,
15};
16use std::{
17 pin::Pin,
18 task::{Context, Poll},
19};
20
21#[derive(Clone, Debug)]
28pub struct Control {
29 sender: mpsc::Sender<ControlCommand>,
31}
32
33impl Control {
34 pub fn new<T>(connection: Connection<T>) -> (Self, ControlledConnection<T>) {
35 let (sender, receiver) = mpsc::channel(MAX_ACK_BACKLOG);
36
37 let control = Control { sender };
38 let connection = ControlledConnection {
39 state: State::Idle(connection),
40 commands: receiver,
41 };
42
43 (control, connection)
44 }
45
46 pub async fn open_stream(&mut self) -> Result<Stream> {
48 let (tx, rx) = oneshot::channel();
49 self.sender.send(ControlCommand::OpenStream(tx)).await?;
50 rx.await?
51 }
52
53 pub async fn close(&mut self) -> Result<()> {
55 let (tx, rx) = oneshot::channel();
56 if self.sender.send(ControlCommand::CloseConnection(tx)).await.is_err() {
57 return Ok(());
59 }
60 let _ = rx.await;
63 Ok(())
64 }
65}
66
67pub struct ControlledConnection<T> {
69 state: State<T>,
70 commands: mpsc::Receiver<ControlCommand>,
71}
72
73impl<T> ControlledConnection<T>
74where
75 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
76{
77 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
78 loop {
79 match std::mem::replace(&mut self.state, State::Poisoned) {
80 State::Idle(mut connection) => {
81 match connection.poll_next_inbound(cx) {
82 Poll::Ready(maybe_stream) => {
83 self.state = State::Idle(connection);
84 return Poll::Ready(maybe_stream);
85 }
86 Poll::Pending => {}
87 }
88
89 match self.commands.poll_next_unpin(cx) {
90 Poll::Ready(Some(ControlCommand::OpenStream(reply))) => {
91 self.state = State::OpeningNewStream { reply, connection };
92 continue;
93 }
94 Poll::Ready(Some(ControlCommand::CloseConnection(reply))) => {
95 self.commands.close();
96
97 self.state = State::Closing {
98 reply: Some(reply),
99 inner: Closing::DrainingControlCommands { connection },
100 };
101 continue;
102 }
103 Poll::Ready(None) => {
104 self.state = State::Closing {
106 reply: None,
107 inner: Closing::ClosingConnection { connection },
108 };
109 continue;
110 }
111 Poll::Pending => {}
112 }
113
114 self.state = State::Idle(connection);
115 return Poll::Pending;
116 }
117 State::OpeningNewStream {
118 reply,
119 mut connection,
120 } => match connection.poll_new_outbound(cx) {
121 Poll::Ready(stream) => {
122 let _ = reply.send(stream);
123
124 self.state = State::Idle(connection);
125 continue;
126 }
127 Poll::Pending => {
128 self.state = State::OpeningNewStream { reply, connection };
129 return Poll::Pending;
130 }
131 },
132 State::Closing {
133 reply,
134 inner: Closing::DrainingControlCommands { connection },
135 } => match self.commands.poll_next_unpin(cx) {
136 Poll::Ready(Some(ControlCommand::OpenStream(new_reply))) => {
137 let _ = new_reply.send(Err(ConnectionError::Closed));
138
139 self.state = State::Closing {
140 reply,
141 inner: Closing::DrainingControlCommands { connection },
142 };
143 continue;
144 }
145 Poll::Ready(Some(ControlCommand::CloseConnection(new_reply))) => {
146 let _ = new_reply.send(());
147
148 self.state = State::Closing {
149 reply,
150 inner: Closing::DrainingControlCommands { connection },
151 };
152 continue;
153 }
154 Poll::Ready(None) => {
155 self.state = State::Closing {
156 reply,
157 inner: Closing::ClosingConnection { connection },
158 };
159 continue;
160 }
161 Poll::Pending => {
162 self.state = State::Closing {
163 reply,
164 inner: Closing::DrainingControlCommands { connection },
165 };
166 return Poll::Pending;
167 }
168 },
169 State::Closing {
170 reply,
171 inner: Closing::ClosingConnection { mut connection },
172 } => match connection.poll_close(cx) {
173 Poll::Ready(Ok(())) | Poll::Ready(Err(ConnectionError::Closed)) => {
174 if let Some(reply) = reply {
175 let _ = reply.send(());
176 }
177 return Poll::Ready(None);
178 }
179 Poll::Ready(Err(other)) => {
180 if let Some(reply) = reply {
181 let _ = reply.send(());
182 }
183 return Poll::Ready(Some(Err(other)));
184 }
185 Poll::Pending => {
186 self.state = State::Closing {
187 reply,
188 inner: Closing::ClosingConnection { connection },
189 };
190 return Poll::Pending;
191 }
192 },
193 State::Poisoned => unreachable!(),
194 }
195 }
196 }
197}
198
199#[derive(Debug)]
200enum ControlCommand {
201 OpenStream(oneshot::Sender<Result<Stream>>),
203 CloseConnection(oneshot::Sender<()>),
205}
206
207enum State<T> {
209 Idle(Connection<T>),
210 OpeningNewStream {
211 reply: oneshot::Sender<Result<Stream>>,
212 connection: Connection<T>,
213 },
214 Closing {
215 reply: Option<oneshot::Sender<()>>,
218 inner: Closing<T>,
219 },
220 Poisoned,
221}
222
223enum Closing<T> {
230 DrainingControlCommands { connection: Connection<T> },
231 ClosingConnection { connection: Connection<T> },
232}
233
234impl<T> futures::Stream for ControlledConnection<T>
235where
236 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
237{
238 type Item = Result<Stream>;
239
240 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241 self.get_mut().poll_next(cx)
242 }
243}