litep2p/yamux/
control.rs

1// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd.
2//
3// Licensed under the Apache License, Version 2.0 or MIT license, at your option.
4//
5// A copy of the Apache License, Version 2.0 is included in the software as
6// LICENSE-APACHE and a copy of the MIT license is included in the software
7// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0
8// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
9// at https://opensource.org/licenses/MIT.
10
11use crate::yamux::{Connection, ConnectionError, Result, Stream, MAX_ACK_BACKLOG};
12
13use futures::{
14    channel::{mpsc, oneshot},
15    prelude::*,
16};
17use std::{
18    pin::Pin,
19    task::{Context, Poll},
20};
21
/// Target string attached to the `tracing` events emitted from this file.
const LOG_TARGET: &str = "litep2p::yamux::control";
23
24/// A Yamux [`Connection`] controller.
25///
26/// This presents an alternative API for using a yamux [`Connection`].
27///
28/// A [`Control`] communicates with a [`ControlledConnection`] via a channel. This allows
29/// a [`Control`] to be cloned and shared between tasks and threads.
30#[derive(Clone, Debug)]
31pub struct Control {
32    /// Command channel to [`ControlledConnection`].
33    sender: mpsc::Sender<ControlCommand>,
34}
35
36impl Control {
37    pub fn new<T>(connection: Connection<T>) -> (Self, ControlledConnection<T>) {
38        let (sender, receiver) = mpsc::channel(MAX_ACK_BACKLOG);
39
40        let control = Control { sender };
41        let connection = ControlledConnection {
42            state: State::Idle(connection),
43            commands: receiver,
44        };
45
46        (control, connection)
47    }
48
49    /// Open a new stream to the remote.
50    pub async fn open_stream(&mut self) -> Result<Stream> {
51        let (tx, rx) = oneshot::channel();
52        self.sender.send(ControlCommand::OpenStream(tx)).await?;
53        rx.await?
54    }
55
56    /// Close the connection.
57    pub async fn close(&mut self) -> Result<()> {
58        let (tx, rx) = oneshot::channel();
59        if self.sender.send(ControlCommand::CloseConnection(tx)).await.is_err() {
60            // The receiver is closed which means the connection is already closed.
61            return Ok(());
62        }
63        // A dropped `oneshot::Sender` means the `Connection` is gone,
64        // so we do not treat receive errors differently here.
65        let _ = rx.await;
66        Ok(())
67    }
68}
69
70/// Wraps a [`Connection`] which can be controlled with a [`Control`].
71pub struct ControlledConnection<T> {
72    state: State<T>,
73    commands: mpsc::Receiver<ControlCommand>,
74}
75
76impl<T> ControlledConnection<T>
77where
78    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
79{
80    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
81        loop {
82            match std::mem::replace(&mut self.state, State::Poisoned) {
83                State::Idle(mut connection) => {
84                    match connection.poll_next_inbound(cx) {
85                        Poll::Ready(maybe_stream) => {
86                            // Transport layers will close the connection on the first
87                            // substream error. The `connection.poll_next_inbound` should
88                            // not be called again after returning an error. Instead, we
89                            // must close the connection gracefully.
90                            match maybe_stream.as_ref() {
91                                Some(Err(error)) => {
92                                    tracing::debug!(target: LOG_TARGET, ?error, "Inbound stream error, closing connection");
93
94                                    self.state = State::Closing {
95                                        reply: None,
96                                        inner: Closing::DrainingControlCommands { connection },
97                                    };
98                                }
99                                other => {
100                                    tracing::debug!(target: LOG_TARGET, ?other, "Inbound stream reset state to idle");
101                                    self.state = State::Idle(connection)
102                                }
103                            }
104
105                            return Poll::Ready(maybe_stream);
106                        }
107                        Poll::Pending => {}
108                    }
109
110                    match self.commands.poll_next_unpin(cx) {
111                        Poll::Ready(Some(ControlCommand::OpenStream(reply))) => {
112                            self.state = State::OpeningNewStream { reply, connection };
113                            continue;
114                        }
115                        Poll::Ready(Some(ControlCommand::CloseConnection(reply))) => {
116                            self.commands.close();
117
118                            self.state = State::Closing {
119                                reply: Some(reply),
120                                inner: Closing::DrainingControlCommands { connection },
121                            };
122                            continue;
123                        }
124                        Poll::Ready(None) => {
125                            // Last `Control` sender was dropped, close te connection.
126                            self.state = State::Closing {
127                                reply: None,
128                                inner: Closing::ClosingConnection { connection },
129                            };
130                            continue;
131                        }
132                        Poll::Pending => {}
133                    }
134
135                    self.state = State::Idle(connection);
136                    return Poll::Pending;
137                }
138                State::OpeningNewStream {
139                    reply,
140                    mut connection,
141                } => match connection.poll_new_outbound(cx) {
142                    Poll::Ready(stream) => {
143                        let _ = reply.send(stream);
144
145                        self.state = State::Idle(connection);
146                        continue;
147                    }
148                    Poll::Pending => {
149                        self.state = State::OpeningNewStream { reply, connection };
150                        return Poll::Pending;
151                    }
152                },
153                State::Closing {
154                    reply,
155                    inner: Closing::DrainingControlCommands { connection },
156                } => match self.commands.poll_next_unpin(cx) {
157                    Poll::Ready(Some(ControlCommand::OpenStream(new_reply))) => {
158                        let _ = new_reply.send(Err(ConnectionError::Closed));
159
160                        self.state = State::Closing {
161                            reply,
162                            inner: Closing::DrainingControlCommands { connection },
163                        };
164                        continue;
165                    }
166                    Poll::Ready(Some(ControlCommand::CloseConnection(new_reply))) => {
167                        let _ = new_reply.send(());
168
169                        self.state = State::Closing {
170                            reply,
171                            inner: Closing::DrainingControlCommands { connection },
172                        };
173                        continue;
174                    }
175                    Poll::Ready(None) => {
176                        self.state = State::Closing {
177                            reply,
178                            inner: Closing::ClosingConnection { connection },
179                        };
180                        continue;
181                    }
182                    Poll::Pending => {
183                        self.state = State::Closing {
184                            reply,
185                            inner: Closing::DrainingControlCommands { connection },
186                        };
187                        return Poll::Pending;
188                    }
189                },
190                State::Closing {
191                    reply,
192                    inner: Closing::ClosingConnection { mut connection },
193                } => match connection.poll_close(cx) {
194                    Poll::Ready(Ok(())) | Poll::Ready(Err(ConnectionError::Closed)) => {
195                        if let Some(reply) = reply {
196                            let _ = reply.send(());
197                        }
198                        return Poll::Ready(None);
199                    }
200                    Poll::Ready(Err(other)) => {
201                        if let Some(reply) = reply {
202                            let _ = reply.send(());
203                        }
204                        return Poll::Ready(Some(Err(other)));
205                    }
206                    Poll::Pending => {
207                        self.state = State::Closing {
208                            reply,
209                            inner: Closing::ClosingConnection { connection },
210                        };
211                        return Poll::Pending;
212                    }
213                },
214                State::Poisoned => return Poll::Pending,
215            }
216        }
217    }
218}
219
220#[derive(Debug)]
221enum ControlCommand {
222    /// Open a new stream to the remote end.
223    OpenStream(oneshot::Sender<Result<Stream>>),
224    /// Close the whole connection.
225    CloseConnection(oneshot::Sender<()>),
226}
227
228/// The state of a [`ControlledConnection`].
229enum State<T> {
230    Idle(Connection<T>),
231    OpeningNewStream {
232        reply: oneshot::Sender<Result<Stream>>,
233        connection: Connection<T>,
234    },
235    Closing {
236        /// A channel to the [`Control`] in case the close was requested. `None` if we are closing
237        /// because the last [`Control`] was dropped.
238        reply: Option<oneshot::Sender<()>>,
239        inner: Closing<T>,
240    },
241    Poisoned,
242}
243
244/// A sub-state of our larger state machine for a [`ControlledConnection`].
245///
246/// Closing connection involves two steps:
247///
248/// 1. Draining and answered all remaining [`Closing::DrainingControlCommands`].
249/// 1. Closing the underlying [`Connection`].
250enum Closing<T> {
251    DrainingControlCommands { connection: Connection<T> },
252    ClosingConnection { connection: Connection<T> },
253}
254
255impl<T> futures::Stream for ControlledConnection<T>
256where
257    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
258{
259    type Item = Result<Stream>;
260
261    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
262        self.get_mut().poll_next(cx)
263    }
264}