1use crate::yamux::{Connection, ConnectionError, Result, Stream, MAX_ACK_BACKLOG};
12
13use futures::{
14 channel::{mpsc, oneshot},
15 prelude::*,
16};
17use std::{
18 pin::Pin,
19 task::{Context, Poll},
20};
21
/// Target string attached to the `tracing` events emitted from this file.
const LOG_TARGET: &str = "litep2p::yamux::control";
23
/// Cloneable handle used to send commands to a `ControlledConnection`.
///
/// Created together with its `ControlledConnection` by `Control::new`.
#[derive(Clone, Debug)]
pub struct Control {
    /// Sending half of the command channel; the receiving half is owned by the
    /// `ControlledConnection` returned from `Control::new`.
    sender: mpsc::Sender<ControlCommand>,
}
35
36impl Control {
37 pub fn new<T>(connection: Connection<T>) -> (Self, ControlledConnection<T>) {
38 let (sender, receiver) = mpsc::channel(MAX_ACK_BACKLOG);
39
40 let control = Control { sender };
41 let connection = ControlledConnection {
42 state: State::Idle(connection),
43 commands: receiver,
44 };
45
46 (control, connection)
47 }
48
49 pub async fn open_stream(&mut self) -> Result<Stream> {
51 let (tx, rx) = oneshot::channel();
52 self.sender.send(ControlCommand::OpenStream(tx)).await?;
53 rx.await?
54 }
55
56 pub async fn close(&mut self) -> Result<()> {
58 let (tx, rx) = oneshot::channel();
59 if self.sender.send(ControlCommand::CloseConnection(tx)).await.is_err() {
60 return Ok(());
62 }
63 let _ = rx.await;
66 Ok(())
67 }
68}
69
/// A `Connection` paired with the receiving end of a `Control` command channel.
///
/// It is driven by polling it as a `futures::Stream` whose items are
/// `Result<Stream>`.
pub struct ControlledConnection<T> {
    /// Current state of the state machine; each non-poisoned state owns the
    /// wrapped `Connection`.
    state: State<T>,
    /// Receiving half of the command channel fed by `Control` handles.
    commands: mpsc::Receiver<ControlCommand>,
}
75
76impl<T> ControlledConnection<T>
77where
78 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
79{
80 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
81 loop {
82 match std::mem::replace(&mut self.state, State::Poisoned) {
83 State::Idle(mut connection) => {
84 match connection.poll_next_inbound(cx) {
85 Poll::Ready(maybe_stream) => {
86 match maybe_stream.as_ref() {
91 Some(Err(error)) => {
92 tracing::debug!(target: LOG_TARGET, ?error, "Inbound stream error, closing connection");
93
94 self.state = State::Closing {
95 reply: None,
96 inner: Closing::DrainingControlCommands { connection },
97 };
98 }
99 other => {
100 tracing::debug!(target: LOG_TARGET, ?other, "Inbound stream reset state to idle");
101 self.state = State::Idle(connection)
102 }
103 }
104
105 return Poll::Ready(maybe_stream);
106 }
107 Poll::Pending => {}
108 }
109
110 match self.commands.poll_next_unpin(cx) {
111 Poll::Ready(Some(ControlCommand::OpenStream(reply))) => {
112 self.state = State::OpeningNewStream { reply, connection };
113 continue;
114 }
115 Poll::Ready(Some(ControlCommand::CloseConnection(reply))) => {
116 self.commands.close();
117
118 self.state = State::Closing {
119 reply: Some(reply),
120 inner: Closing::DrainingControlCommands { connection },
121 };
122 continue;
123 }
124 Poll::Ready(None) => {
125 self.state = State::Closing {
127 reply: None,
128 inner: Closing::ClosingConnection { connection },
129 };
130 continue;
131 }
132 Poll::Pending => {}
133 }
134
135 self.state = State::Idle(connection);
136 return Poll::Pending;
137 }
138 State::OpeningNewStream {
139 reply,
140 mut connection,
141 } => match connection.poll_new_outbound(cx) {
142 Poll::Ready(stream) => {
143 let _ = reply.send(stream);
144
145 self.state = State::Idle(connection);
146 continue;
147 }
148 Poll::Pending => {
149 self.state = State::OpeningNewStream { reply, connection };
150 return Poll::Pending;
151 }
152 },
153 State::Closing {
154 reply,
155 inner: Closing::DrainingControlCommands { connection },
156 } => match self.commands.poll_next_unpin(cx) {
157 Poll::Ready(Some(ControlCommand::OpenStream(new_reply))) => {
158 let _ = new_reply.send(Err(ConnectionError::Closed));
159
160 self.state = State::Closing {
161 reply,
162 inner: Closing::DrainingControlCommands { connection },
163 };
164 continue;
165 }
166 Poll::Ready(Some(ControlCommand::CloseConnection(new_reply))) => {
167 let _ = new_reply.send(());
168
169 self.state = State::Closing {
170 reply,
171 inner: Closing::DrainingControlCommands { connection },
172 };
173 continue;
174 }
175 Poll::Ready(None) => {
176 self.state = State::Closing {
177 reply,
178 inner: Closing::ClosingConnection { connection },
179 };
180 continue;
181 }
182 Poll::Pending => {
183 self.state = State::Closing {
184 reply,
185 inner: Closing::DrainingControlCommands { connection },
186 };
187 return Poll::Pending;
188 }
189 },
190 State::Closing {
191 reply,
192 inner: Closing::ClosingConnection { mut connection },
193 } => match connection.poll_close(cx) {
194 Poll::Ready(Ok(())) | Poll::Ready(Err(ConnectionError::Closed)) => {
195 if let Some(reply) = reply {
196 let _ = reply.send(());
197 }
198 return Poll::Ready(None);
199 }
200 Poll::Ready(Err(other)) => {
201 if let Some(reply) = reply {
202 let _ = reply.send(());
203 }
204 return Poll::Ready(Some(Err(other)));
205 }
206 Poll::Pending => {
207 self.state = State::Closing {
208 reply,
209 inner: Closing::ClosingConnection { connection },
210 };
211 return Poll::Pending;
212 }
213 },
214 State::Poisoned => return Poll::Pending,
215 }
216 }
217 }
218}
219
/// Commands sent from a `Control` handle to its `ControlledConnection`.
#[derive(Debug)]
enum ControlCommand {
    /// Open a new outbound stream; the outcome is sent back through the
    /// enclosed sender.
    OpenStream(oneshot::Sender<Result<Stream>>),
    /// Close the connection; `()` is sent back through the enclosed sender as
    /// the acknowledgement.
    CloseConnection(oneshot::Sender<()>),
}
227
/// States of the `ControlledConnection` state machine.
enum State<T> {
    /// Polling for inbound streams and for control commands.
    Idle(Connection<T>),
    /// An outbound stream is being opened on behalf of an `OpenStream` command.
    OpeningNewStream {
        /// Where the outcome of opening the stream is sent.
        reply: oneshot::Sender<Result<Stream>>,
        connection: Connection<T>,
    },
    /// The connection is shutting down.
    Closing {
        /// Acknowledgement channel of the `CloseConnection` command that started
        /// the shutdown; `None` when the shutdown was not started by a command.
        reply: Option<oneshot::Sender<()>>,
        /// Which phase of the shutdown is in progress.
        inner: Closing<T>,
    },
    /// Placeholder left in `state` while `poll_next` holds the real state by value.
    Poisoned,
}
243
/// Phases of shutting down a `ControlledConnection`.
enum Closing<T> {
    /// Remaining control commands are read from the channel and answered
    /// (`OpenStream` with `ConnectionError::Closed`, `CloseConnection` with `()`)
    /// until the channel reports `None`.
    DrainingControlCommands { connection: Connection<T> },
    /// The wrapped connection is being closed via `poll_close`.
    ClosingConnection { connection: Connection<T> },
}
254
255impl<T> futures::Stream for ControlledConnection<T>
256where
257 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
258{
259 type Item = Result<Stream>;
260
261 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
262 self.get_mut().poll_next(cx)
263 }
264}