litep2p/yamux/
control.rs

// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd.
//
// Licensed under the Apache License, Version 2.0 or MIT license, at your option.
//
// A copy of the Apache License, Version 2.0 is included in the software as
// LICENSE-APACHE and a copy of the MIT license is included in the software
// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.
10
11use crate::yamux::{error::ConnectionError, Connection, Result, Stream, MAX_ACK_BACKLOG};
12use futures::{
13    channel::{mpsc, oneshot},
14    prelude::*,
15};
16use std::{
17    pin::Pin,
18    task::{Context, Poll},
19};
20
21/// A Yamux [`Connection`] controller.
22///
23/// This presents an alternative API for using a yamux [`Connection`].
24///
25/// A [`Control`] communicates with a [`ControlledConnection`] via a channel. This allows
26/// a [`Control`] to be cloned and shared between tasks and threads.
27#[derive(Clone, Debug)]
28pub struct Control {
29    /// Command channel to [`ControlledConnection`].
30    sender: mpsc::Sender<ControlCommand>,
31}
32
33impl Control {
34    pub fn new<T>(connection: Connection<T>) -> (Self, ControlledConnection<T>) {
35        let (sender, receiver) = mpsc::channel(MAX_ACK_BACKLOG);
36
37        let control = Control { sender };
38        let connection = ControlledConnection {
39            state: State::Idle(connection),
40            commands: receiver,
41        };
42
43        (control, connection)
44    }
45
46    /// Open a new stream to the remote.
47    pub async fn open_stream(&mut self) -> Result<Stream> {
48        let (tx, rx) = oneshot::channel();
49        self.sender.send(ControlCommand::OpenStream(tx)).await?;
50        rx.await?
51    }
52
53    /// Close the connection.
54    pub async fn close(&mut self) -> Result<()> {
55        let (tx, rx) = oneshot::channel();
56        if self.sender.send(ControlCommand::CloseConnection(tx)).await.is_err() {
57            // The receiver is closed which means the connection is already closed.
58            return Ok(());
59        }
60        // A dropped `oneshot::Sender` means the `Connection` is gone,
61        // so we do not treat receive errors differently here.
62        let _ = rx.await;
63        Ok(())
64    }
65}
66
/// Wraps a [`Connection`] which can be controlled with a [`Control`].
///
/// The wrapped connection only makes progress, and commands are only processed, while
/// this value is polled.
pub struct ControlledConnection<T> {
    /// Current position in the state machine; every live state owns the [`Connection`].
    state: State<T>,
    /// Receiving half of the command channel fed by [`Control`] handles.
    commands: mpsc::Receiver<ControlCommand>,
}
72
73impl<T> ControlledConnection<T>
74where
75    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
76{
77    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
78        loop {
79            match std::mem::replace(&mut self.state, State::Poisoned) {
80                State::Idle(mut connection) => {
81                    match connection.poll_next_inbound(cx) {
82                        Poll::Ready(maybe_stream) => {
83                            self.state = State::Idle(connection);
84                            return Poll::Ready(maybe_stream);
85                        }
86                        Poll::Pending => {}
87                    }
88
89                    match self.commands.poll_next_unpin(cx) {
90                        Poll::Ready(Some(ControlCommand::OpenStream(reply))) => {
91                            self.state = State::OpeningNewStream { reply, connection };
92                            continue;
93                        }
94                        Poll::Ready(Some(ControlCommand::CloseConnection(reply))) => {
95                            self.commands.close();
96
97                            self.state = State::Closing {
98                                reply: Some(reply),
99                                inner: Closing::DrainingControlCommands { connection },
100                            };
101                            continue;
102                        }
103                        Poll::Ready(None) => {
104                            // Last `Control` sender was dropped, close te connection.
105                            self.state = State::Closing {
106                                reply: None,
107                                inner: Closing::ClosingConnection { connection },
108                            };
109                            continue;
110                        }
111                        Poll::Pending => {}
112                    }
113
114                    self.state = State::Idle(connection);
115                    return Poll::Pending;
116                }
117                State::OpeningNewStream {
118                    reply,
119                    mut connection,
120                } => match connection.poll_new_outbound(cx) {
121                    Poll::Ready(stream) => {
122                        let _ = reply.send(stream);
123
124                        self.state = State::Idle(connection);
125                        continue;
126                    }
127                    Poll::Pending => {
128                        self.state = State::OpeningNewStream { reply, connection };
129                        return Poll::Pending;
130                    }
131                },
132                State::Closing {
133                    reply,
134                    inner: Closing::DrainingControlCommands { connection },
135                } => match self.commands.poll_next_unpin(cx) {
136                    Poll::Ready(Some(ControlCommand::OpenStream(new_reply))) => {
137                        let _ = new_reply.send(Err(ConnectionError::Closed));
138
139                        self.state = State::Closing {
140                            reply,
141                            inner: Closing::DrainingControlCommands { connection },
142                        };
143                        continue;
144                    }
145                    Poll::Ready(Some(ControlCommand::CloseConnection(new_reply))) => {
146                        let _ = new_reply.send(());
147
148                        self.state = State::Closing {
149                            reply,
150                            inner: Closing::DrainingControlCommands { connection },
151                        };
152                        continue;
153                    }
154                    Poll::Ready(None) => {
155                        self.state = State::Closing {
156                            reply,
157                            inner: Closing::ClosingConnection { connection },
158                        };
159                        continue;
160                    }
161                    Poll::Pending => {
162                        self.state = State::Closing {
163                            reply,
164                            inner: Closing::DrainingControlCommands { connection },
165                        };
166                        return Poll::Pending;
167                    }
168                },
169                State::Closing {
170                    reply,
171                    inner: Closing::ClosingConnection { mut connection },
172                } => match connection.poll_close(cx) {
173                    Poll::Ready(Ok(())) | Poll::Ready(Err(ConnectionError::Closed)) => {
174                        if let Some(reply) = reply {
175                            let _ = reply.send(());
176                        }
177                        return Poll::Ready(None);
178                    }
179                    Poll::Ready(Err(other)) => {
180                        if let Some(reply) = reply {
181                            let _ = reply.send(());
182                        }
183                        return Poll::Ready(Some(Err(other)));
184                    }
185                    Poll::Pending => {
186                        self.state = State::Closing {
187                            reply,
188                            inner: Closing::ClosingConnection { connection },
189                        };
190                        return Poll::Pending;
191                    }
192                },
193                State::Poisoned => unreachable!(),
194            }
195        }
196    }
197}
198
/// Commands a [`Control`] sends to its [`ControlledConnection`].
///
/// Each variant carries the `oneshot` sender on which the outcome is reported back.
#[derive(Debug)]
enum ControlCommand {
    /// Open a new stream to the remote end.
    OpenStream(oneshot::Sender<Result<Stream>>),
    /// Close the whole connection.
    CloseConnection(oneshot::Sender<()>),
}
206
/// The state of a [`ControlledConnection`].
enum State<T> {
    /// No command is being served; inbound streams and new commands are polled for.
    Idle(Connection<T>),
    /// An [`ControlCommand::OpenStream`] command is being served.
    OpeningNewStream {
        /// Where the result of opening the outbound stream is sent.
        reply: oneshot::Sender<Result<Stream>>,
        /// The connection the stream is opened on.
        connection: Connection<T>,
    },
    /// The connection is shutting down; see [`Closing`] for the individual steps.
    Closing {
        /// A channel to the [`Control`] in case the close was requested. `None` if we are closing
        /// because the last [`Control`] was dropped.
        reply: Option<oneshot::Sender<()>>,
        /// Which step of the shutdown is in progress.
        inner: Closing<T>,
    },
    /// Placeholder swapped in via `std::mem::replace` while the current state is moved
    /// out and processed.
    Poisoned,
}
222
/// A sub-state of our larger state machine for a [`ControlledConnection`].
///
/// Closing a connection involves two steps:
///
/// 1. Draining and answering all remaining [`ControlCommand`]s
///    ([`Closing::DrainingControlCommands`]).
/// 2. Closing the underlying [`Connection`] ([`Closing::ClosingConnection`]).
///
/// The first step is skipped when closing starts because the last [`Control`] was
/// dropped.
enum Closing<T> {
    /// Queued commands are still being answered; the connection is not yet being closed.
    DrainingControlCommands { connection: Connection<T> },
    /// The command channel is exhausted and the connection itself is being closed.
    ClosingConnection { connection: Connection<T> },
}
233
234impl<T> futures::Stream for ControlledConnection<T>
235where
236    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
237{
238    type Item = Result<Stream>;
239
240    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241        self.get_mut().poll_next(cx)
242    }
243}