libp2p_swarm/connection/pool/
task.rs

1// Copyright 2021 Protocol Labs.
2// Copyright 2018 Parity Technologies (UK) Ltd.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21
22//! Async functions driving pending and established connections in the form of a task.
23
24use super::concurrent_dial::ConcurrentDial;
25use crate::{
26    connection::{
27        self, ConnectionError, ConnectionId, PendingInboundConnectionError,
28        PendingOutboundConnectionError,
29    },
30    transport::TransportError,
31    ConnectionHandler, Multiaddr, PeerId,
32};
33use futures::{
34    channel::{mpsc, oneshot},
35    future::{poll_fn, Either, Future},
36    SinkExt, StreamExt,
37};
38use libp2p_core::muxing::StreamMuxerBox;
39use std::pin::Pin;
40use void::Void;
41
/// Commands that can be sent to a task driving an established connection.
///
/// `T` is the payload type carried by [`Command::NotifyHandler`].
#[derive(Debug)]
pub(crate) enum Command<T> {
    /// Notify the connection handler of an event.
    NotifyHandler(T),
    /// Gracefully close the connection (active close) before
    /// terminating the task.
    Close,
}
51
/// Events reported by a task driving a pending (not yet established) connection.
pub(crate) enum PendingConnectionEvent {
    /// A pending connection completed successfully.
    ConnectionEstablished {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// The peer and stream muxer produced by the completed dial or
        /// inbound future.
        output: (PeerId, StreamMuxerBox),
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed in parallel. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        ///
        /// The first tuple element is the address that was returned together
        /// with the successful dial's output.
        outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
    },
    /// A pending connection failed.
    PendingFailed {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// The failure: `Either::Left` for an outgoing connection,
        /// `Either::Right` for an incoming one.
        error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
    },
}
67
/// Events reported by a task driving an established connection.
#[derive(Debug)]
// NOTE(review): presumably silences a deprecation warning on an item this enum
// mentions (`THandler::Error` looks like the likely candidate) — confirm before
// removing.
#[allow(deprecated)]
pub(crate) enum EstablishedConnectionEvent<THandler: ConnectionHandler> {
    /// A node we are connected to has changed its address.
    AddressChange {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// The peer at the other end of the connection.
        peer_id: PeerId,
        /// The address reported by the connection.
        new_address: Multiaddr,
    },
    /// Notify the manager of an event from the connection.
    Notify {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// The peer at the other end of the connection.
        peer_id: PeerId,
        /// The event emitted by the connection's handler.
        event: THandler::ToBehaviour,
    },
    /// A connection closed, possibly due to an error.
    ///
    /// If `error` is `None`, the connection has completed
    /// an active orderly close.
    Closed {
        /// Identifier of the connection this event refers to.
        id: ConnectionId,
        /// The peer at the other end of the connection.
        peer_id: PeerId,
        /// The error that closed the connection, if any.
        error: Option<ConnectionError<THandler::Error>>,
        /// The handler handed back by the connection when it was closed.
        handler: THandler,
    },
}
94
95pub(crate) async fn new_for_pending_outgoing_connection(
96    connection_id: ConnectionId,
97    dial: ConcurrentDial,
98    abort_receiver: oneshot::Receiver<Void>,
99    mut events: mpsc::Sender<PendingConnectionEvent>,
100) {
101    match futures::future::select(abort_receiver, Box::pin(dial)).await {
102        Either::Left((Err(oneshot::Canceled), _)) => {
103            let _ = events
104                .send(PendingConnectionEvent::PendingFailed {
105                    id: connection_id,
106                    error: Either::Left(PendingOutboundConnectionError::Aborted),
107                })
108                .await;
109        }
110        Either::Left((Ok(v), _)) => void::unreachable(v),
111        Either::Right((Ok((address, output, errors)), _)) => {
112            let _ = events
113                .send(PendingConnectionEvent::ConnectionEstablished {
114                    id: connection_id,
115                    output,
116                    outgoing: Some((address, errors)),
117                })
118                .await;
119        }
120        Either::Right((Err(e), _)) => {
121            let _ = events
122                .send(PendingConnectionEvent::PendingFailed {
123                    id: connection_id,
124                    error: Either::Left(PendingOutboundConnectionError::Transport(e)),
125                })
126                .await;
127        }
128    }
129}
130
131pub(crate) async fn new_for_pending_incoming_connection<TFut>(
132    connection_id: ConnectionId,
133    future: TFut,
134    abort_receiver: oneshot::Receiver<Void>,
135    mut events: mpsc::Sender<PendingConnectionEvent>,
136) where
137    TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
138{
139    match futures::future::select(abort_receiver, Box::pin(future)).await {
140        Either::Left((Err(oneshot::Canceled), _)) => {
141            let _ = events
142                .send(PendingConnectionEvent::PendingFailed {
143                    id: connection_id,
144                    error: Either::Right(PendingInboundConnectionError::Aborted),
145                })
146                .await;
147        }
148        Either::Left((Ok(v), _)) => void::unreachable(v),
149        Either::Right((Ok(output), _)) => {
150            let _ = events
151                .send(PendingConnectionEvent::ConnectionEstablished {
152                    id: connection_id,
153                    output,
154                    outgoing: None,
155                })
156                .await;
157        }
158        Either::Right((Err(e), _)) => {
159            let _ = events
160                .send(PendingConnectionEvent::PendingFailed {
161                    id: connection_id,
162                    error: Either::Right(PendingInboundConnectionError::Transport(
163                        TransportError::Other(e),
164                    )),
165                })
166                .await;
167        }
168    }
169}
170
171pub(crate) async fn new_for_established_connection<THandler>(
172    connection_id: ConnectionId,
173    peer_id: PeerId,
174    mut connection: crate::connection::Connection<THandler>,
175    mut command_receiver: mpsc::Receiver<Command<THandler::FromBehaviour>>,
176    mut events: mpsc::Sender<EstablishedConnectionEvent<THandler>>,
177) where
178    THandler: ConnectionHandler,
179{
180    loop {
181        match futures::future::select(
182            command_receiver.next(),
183            poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
184        )
185        .await
186        {
187            Either::Left((Some(command), _)) => match command {
188                Command::NotifyHandler(event) => connection.on_behaviour_event(event),
189                Command::Close => {
190                    command_receiver.close();
191                    let (handler, closing_muxer) = connection.close();
192
193                    let error = closing_muxer.await.err().map(ConnectionError::IO);
194                    let _ = events
195                        .send(EstablishedConnectionEvent::Closed {
196                            id: connection_id,
197                            peer_id,
198                            error,
199                            handler,
200                        })
201                        .await;
202                    return;
203                }
204            },
205
206            // The manager has disappeared; abort.
207            Either::Left((None, _)) => return,
208
209            Either::Right((event, _)) => {
210                match event {
211                    Ok(connection::Event::Handler(event)) => {
212                        let _ = events
213                            .send(EstablishedConnectionEvent::Notify {
214                                id: connection_id,
215                                peer_id,
216                                event,
217                            })
218                            .await;
219                    }
220                    Ok(connection::Event::AddressChange(new_address)) => {
221                        let _ = events
222                            .send(EstablishedConnectionEvent::AddressChange {
223                                id: connection_id,
224                                peer_id,
225                                new_address,
226                            })
227                            .await;
228                    }
229                    Err(error) => {
230                        command_receiver.close();
231                        let (handler, _closing_muxer) = connection.close();
232                        // Terminate the task with the error, dropping the connection.
233                        let _ = events
234                            .send(EstablishedConnectionEvent::Closed {
235                                id: connection_id,
236                                peer_id,
237                                error: Some(error),
238                                handler,
239                            })
240                            .await;
241                        return;
242                    }
243                }
244            }
245        }
246    }
247}