libp2p_swarm/connection/
pool.rs

1// Copyright 2021 Protocol Labs.
2// Copyright 2018 Parity Technologies (UK) Ltd.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21use crate::connection::{Connection, ConnectionId, PendingPoint};
22use crate::{
23    connection::{
24        Connected, ConnectionError, IncomingInfo, PendingConnectionError,
25        PendingInboundConnectionError, PendingOutboundConnectionError,
26    },
27    transport::TransportError,
28    ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
29};
30use concurrent_dial::ConcurrentDial;
31use fnv::FnvHashMap;
32use futures::prelude::*;
33use futures::stream::SelectAll;
34use futures::{
35    channel::{mpsc, oneshot},
36    future::{poll_fn, BoxFuture, Either},
37    ready,
38    stream::FuturesUnordered,
39};
40use instant::{Duration, Instant};
41use libp2p_core::connection::Endpoint;
42use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
43use std::task::Waker;
44use std::{
45    collections::{hash_map, HashMap},
46    fmt,
47    num::{NonZeroU8, NonZeroUsize},
48    pin::Pin,
49    task::Context,
50    task::Poll,
51};
52use void::Void;
53
54mod concurrent_dial;
55mod task;
56
57enum ExecSwitch {
58    Executor(Box<dyn Executor + Send>),
59    LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
60}
61
62impl ExecSwitch {
63    fn advance_local(&mut self, cx: &mut Context) {
64        match self {
65            ExecSwitch::Executor(_) => {}
66            ExecSwitch::LocalSpawn(local) => {
67                while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
68            }
69        }
70    }
71
72    fn spawn(&mut self, task: impl Future<Output = ()> + Send + 'static) {
73        let task = task.boxed();
74
75        match self {
76            Self::Executor(executor) => executor.exec(task),
77            Self::LocalSpawn(local) => local.push(task),
78        }
79    }
80}
81
/// A connection `Pool` manages a set of connections for each peer.
///
/// The pool distinguishes `pending` connections, which are still being negotiated,
/// from `established` ones, which are grouped by the [`PeerId`] of the remote.
/// Every connection is driven by a background task spawned on `executor`; those
/// tasks report back through channels that are drained in [`Pool::poll`].
pub(crate) struct Pool<THandler>
where
    THandler: ConnectionHandler,
{
    /// The [`PeerId`] of the local node.
    ///
    /// [`Pool::poll`] rejects newly negotiated connections whose remote turns out
    /// to have this same ID.
    local_id: PeerId,

    /// The connection counter(s).
    counters: ConnectionCounters,

    /// The managed connections of each peer that are currently considered established.
    ///
    /// A peer only has an entry here while it has at least one established connection.
    established: FnvHashMap<
        PeerId,
        FnvHashMap<ConnectionId, EstablishedConnection<THandler::FromBehaviour>>,
    >,

    /// The pending connections that are currently being negotiated.
    pending: HashMap<ConnectionId, PendingConnection>,

    /// Size of the task command buffer (per task).
    task_command_buffer_size: usize,

    /// Number of addresses concurrently dialed for a single outbound connection attempt.
    ///
    /// Can be overridden per dial, see [`Pool::add_outgoing`].
    dial_concurrency_factor: NonZeroU8,

    /// The configured override for substream protocol upgrades, if any.
    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,

    /// The maximum number of inbound streams concurrently negotiating on a connection.
    ///
    /// See [`Connection::max_negotiating_inbound_streams`].
    max_negotiating_inbound_streams: usize,

    /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured.
    per_connection_event_buffer_size: usize,

    /// The executor to use for running connection tasks. Can either be a global executor
    /// or a local queue.
    executor: ExecSwitch,

    /// Sender distributed to pending tasks for reporting events back
    /// to the pool.
    pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,

    /// Receiver for events reported from pending tasks.
    pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,

    /// Waker in case we haven't established any connections yet.
    ///
    /// Stored by [`Pool::poll`] when `established_connection_events` has no receivers
    /// left, and woken by [`Pool::spawn_connection`] once a new receiver was added.
    no_established_connections_waker: Option<Waker>,

    /// Receivers for events reported from established connections.
    established_connection_events:
        SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler>>>,

    /// Receivers for [`NewConnection`] objects that are dropped.
    new_connection_dropped_listeners: FuturesUnordered<oneshot::Receiver<StreamMuxerBox>>,

    /// How long a connection should be kept alive once it starts idling.
    idle_connection_timeout: Duration,
}
142
143#[derive(Debug)]
144pub(crate) struct EstablishedConnection<TInEvent> {
145    endpoint: ConnectedPoint,
146    /// Channel endpoint to send commands to the task.
147    sender: mpsc::Sender<task::Command<TInEvent>>,
148}
149
150impl<TInEvent> EstablishedConnection<TInEvent> {
151    /// (Asynchronously) sends an event to the connection handler.
152    ///
153    /// If the handler is not ready to receive the event, either because
154    /// it is busy or the connection is about to close, the given event
155    /// is returned with an `Err`.
156    ///
157    /// If execution of this method is preceded by successful execution of
158    /// `poll_ready_notify_handler` without another intervening execution
159    /// of `notify_handler`, it only fails if the connection is now about
160    /// to close.
161    pub(crate) fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
162        let cmd = task::Command::NotifyHandler(event);
163        self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
164            task::Command::NotifyHandler(event) => event,
165            _ => unreachable!("Expect failed send to return initial event."),
166        })
167    }
168
169    /// Checks if `notify_handler` is ready to accept an event.
170    ///
171    /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
172    ///
173    /// Returns `Err(())` if the background task associated with the connection
174    /// is terminating and the connection is about to close.
175    pub(crate) fn poll_ready_notify_handler(
176        &mut self,
177        cx: &mut Context<'_>,
178    ) -> Poll<Result<(), ()>> {
179        self.sender.poll_ready(cx).map_err(|_| ())
180    }
181
182    /// Initiates a graceful close of the connection.
183    ///
184    /// Has no effect if the connection is already closing.
185    pub(crate) fn start_close(&mut self) {
186        // Clone the sender so that we are guaranteed to have
187        // capacity for the close command (every sender gets a slot).
188        match self.sender.clone().try_send(task::Command::Close) {
189            Ok(()) => {}
190            Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
191        };
192    }
193}
194
195struct PendingConnection {
196    /// [`PeerId`] of the remote peer.
197    peer_id: Option<PeerId>,
198    endpoint: PendingPoint,
199    /// When dropped, notifies the task which then knows to terminate.
200    abort_notifier: Option<oneshot::Sender<Void>>,
201    /// The moment we became aware of this possible connection, useful for timing metrics.
202    accepted_at: Instant,
203}
204
205impl PendingConnection {
206    fn is_for_same_remote_as(&self, other: PeerId) -> bool {
207        self.peer_id.map_or(false, |peer| peer == other)
208    }
209
210    /// Aborts the connection attempt, closing the connection.
211    fn abort(&mut self) {
212        if let Some(notifier) = self.abort_notifier.take() {
213            drop(notifier);
214        }
215    }
216}
217
218impl<THandler: ConnectionHandler> fmt::Debug for Pool<THandler> {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
220        f.debug_struct("Pool")
221            .field("counters", &self.counters)
222            .finish()
223    }
224}
225
/// Event that can happen on the `Pool`.
///
/// Events are produced by [`Pool::poll`].
#[derive(Debug)]
// NOTE(review): presumably required because a deprecated item (possibly
// `THandler::Error`) is named below — confirm and document the exact reason.
#[allow(deprecated)]
pub(crate) enum PoolEvent<THandler: ConnectionHandler> {
    /// A new connection has been established.
    ///
    /// The connection is not yet tracked as established by the pool: `connection`
    /// either has to be handed to [`Pool::spawn_connection`] or be dropped.
    ConnectionEstablished {
        id: ConnectionId,
        peer_id: PeerId,
        endpoint: ConnectedPoint,
        connection: NewConnection,
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed in parallel. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
        /// How long it took to establish this connection, measured from the moment
        /// the pending connection was added to the pool.
        established_in: std::time::Duration,
    },

    /// An established connection was closed.
    ///
    /// A connection may close if
    ///
    ///   * it encounters an error, which includes the connection being
    ///     closed by the remote. In this case `error` is `Some`.
    ///   * it was actively closed by [`EstablishedConnection::start_close`],
    ///     i.e. a successful, orderly close.
    ///   * it was actively closed by [`Pool::disconnect`], which calls
    ///     [`EstablishedConnection::start_close`] on every established
    ///     connection of the peer.
    ///
    ConnectionClosed {
        id: ConnectionId,
        /// Information about the connection that was closed.
        connected: Connected,
        /// The error that occurred, if any. If `None`, the connection
        /// was closed by the local peer.
        error: Option<ConnectionError<THandler::Error>>,
        /// The remaining established connections to the same peer.
        remaining_established_connection_ids: Vec<ConnectionId>,
        /// The handler that was returned by the connection task when it closed.
        handler: THandler,
    },

    /// An outbound connection attempt failed.
    PendingOutboundConnectionError {
        /// The ID of the failed connection.
        id: ConnectionId,
        /// The error that occurred.
        error: PendingOutboundConnectionError,
        /// The (expected) peer of the failed connection.
        ///
        /// If no peer was expected but the remote identified itself before the
        /// connection was rejected, this is the ID the remote presented.
        peer: Option<PeerId>,
    },

    /// An inbound connection attempt failed.
    PendingInboundConnectionError {
        /// The ID of the failed connection.
        id: ConnectionId,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// Local connection address.
        local_addr: Multiaddr,
        /// The error that occurred.
        error: PendingInboundConnectionError,
    },

    /// The handler of an established connection has produced an event.
    ConnectionEvent {
        id: ConnectionId,
        peer_id: PeerId,
        /// The produced event.
        event: THandler::ToBehaviour,
    },

    /// The connection to a node has changed its address.
    AddressChange {
        id: ConnectionId,
        peer_id: PeerId,
        /// The new endpoint.
        new_endpoint: ConnectedPoint,
        /// The old endpoint.
        old_endpoint: ConnectedPoint,
    },
}
307
308impl<THandler> Pool<THandler>
309where
310    THandler: ConnectionHandler,
311{
312    /// Creates a new empty `Pool`.
313    pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
314        let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
315        let executor = match config.executor {
316            Some(exec) => ExecSwitch::Executor(exec),
317            None => ExecSwitch::LocalSpawn(Default::default()),
318        };
319        Pool {
320            local_id,
321            counters: ConnectionCounters::new(),
322            established: Default::default(),
323            pending: Default::default(),
324            task_command_buffer_size: config.task_command_buffer_size,
325            dial_concurrency_factor: config.dial_concurrency_factor,
326            substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
327            max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
328            per_connection_event_buffer_size: config.per_connection_event_buffer_size,
329            idle_connection_timeout: config.idle_connection_timeout,
330            executor,
331            pending_connection_events_tx,
332            pending_connection_events_rx,
333            no_established_connections_waker: None,
334            established_connection_events: Default::default(),
335            new_connection_dropped_listeners: Default::default(),
336        }
337    }
338
    /// Gets the dedicated connection counters.
    ///
    /// The returned reference is read-only; the counters are updated by the pool
    /// itself as connections are added, established and closed.
    pub(crate) fn counters(&self) -> &ConnectionCounters {
        &self.counters
    }
343
344    /// Gets an established connection from the pool by ID.
345    pub(crate) fn get_established(
346        &mut self,
347        id: ConnectionId,
348    ) -> Option<&mut EstablishedConnection<THandler::FromBehaviour>> {
349        self.established
350            .values_mut()
351            .find_map(|connections| connections.get_mut(&id))
352    }
353
    /// Returns true if we are connected to the given peer.
    ///
    /// A peer counts as connected while it has an entry in the map of established
    /// connections. Entries are created by [`Pool::spawn_connection`] and removed by
    /// [`Pool::poll`] once the last connection to the peer has reported its closure.
    pub(crate) fn is_connected(&self, id: PeerId) -> bool {
        self.established.contains_key(&id)
    }
360
    /// Returns the number of connected peers, i.e. those with at least one
    /// established connection in the pool.
    ///
    /// Peers are counted, not connections: several connections to the same peer
    /// count once.
    pub(crate) fn num_peers(&self) -> usize {
        self.established.len()
    }
366
367    /// (Forcefully) close all connections to the given peer.
368    ///
369    /// All connections to the peer, whether pending or established are
370    /// closed asap and no more events from these connections are emitted
371    /// by the pool effective immediately.
372    pub(crate) fn disconnect(&mut self, peer: PeerId) {
373        if let Some(conns) = self.established.get_mut(&peer) {
374            for (_, conn) in conns.iter_mut() {
375                conn.start_close();
376            }
377        }
378
379        for connection in self
380            .pending
381            .iter_mut()
382            .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
383        {
384            connection.abort()
385        }
386    }
387
388    /// Returns an iterator over all established connections of `peer`.
389    pub(crate) fn iter_established_connections_of_peer(
390        &mut self,
391        peer: &PeerId,
392    ) -> impl Iterator<Item = ConnectionId> + '_ {
393        match self.established.get(peer) {
394            Some(conns) => either::Either::Left(conns.iter().map(|(id, _)| *id)),
395            None => either::Either::Right(std::iter::empty()),
396        }
397    }
398
399    /// Checks whether we are currently dialing the given peer.
400    pub(crate) fn is_dialing(&self, peer: PeerId) -> bool {
401        self.pending.iter().any(|(_, info)| {
402            matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
403        })
404    }
405
    /// Returns an iterator over all connected peers, i.e. those that have
    /// at least one established connection in the pool.
    ///
    /// Each peer is yielded once, regardless of how many connections it has.
    pub(crate) fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
        self.established.keys()
    }
411
412    /// Adds a pending outgoing connection to the pool in the form of a `Future`
413    /// that establishes and negotiates the connection.
414    pub(crate) fn add_outgoing(
415        &mut self,
416        dials: Vec<
417            BoxFuture<
418                'static,
419                (
420                    Multiaddr,
421                    Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
422                ),
423            >,
424        >,
425        peer: Option<PeerId>,
426        role_override: Endpoint,
427        dial_concurrency_factor_override: Option<NonZeroU8>,
428        connection_id: ConnectionId,
429    ) {
430        let dial = ConcurrentDial::new(
431            dials,
432            dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
433        );
434
435        let (abort_notifier, abort_receiver) = oneshot::channel();
436
437        self.executor
438            .spawn(task::new_for_pending_outgoing_connection(
439                connection_id,
440                dial,
441                abort_receiver,
442                self.pending_connection_events_tx.clone(),
443            ));
444
445        let endpoint = PendingPoint::Dialer { role_override };
446
447        self.counters.inc_pending(&endpoint);
448        self.pending.insert(
449            connection_id,
450            PendingConnection {
451                peer_id: peer,
452                endpoint,
453                abort_notifier: Some(abort_notifier),
454                accepted_at: Instant::now(),
455            },
456        );
457    }
458
459    /// Adds a pending incoming connection to the pool in the form of a
460    /// `Future` that establishes and negotiates the connection.
461    pub(crate) fn add_incoming<TFut>(
462        &mut self,
463        future: TFut,
464        info: IncomingInfo<'_>,
465        connection_id: ConnectionId,
466    ) where
467        TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
468    {
469        let endpoint = info.create_connected_point();
470
471        let (abort_notifier, abort_receiver) = oneshot::channel();
472
473        self.executor
474            .spawn(task::new_for_pending_incoming_connection(
475                connection_id,
476                future,
477                abort_receiver,
478                self.pending_connection_events_tx.clone(),
479            ));
480
481        self.counters.inc_pending_incoming();
482        self.pending.insert(
483            connection_id,
484            PendingConnection {
485                peer_id: None,
486                endpoint: endpoint.into(),
487                abort_notifier: Some(abort_notifier),
488                accepted_at: Instant::now(),
489            },
490        );
491    }
492
493    pub(crate) fn spawn_connection(
494        &mut self,
495        id: ConnectionId,
496        obtained_peer_id: PeerId,
497        endpoint: &ConnectedPoint,
498        connection: NewConnection,
499        handler: THandler,
500    ) {
501        let connection = connection.extract();
502
503        let conns = self.established.entry(obtained_peer_id).or_default();
504        self.counters.inc_established(endpoint);
505
506        let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size);
507        let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size);
508
509        conns.insert(
510            id,
511            EstablishedConnection {
512                endpoint: endpoint.clone(),
513                sender: command_sender,
514            },
515        );
516        self.established_connection_events.push(event_receiver);
517        if let Some(waker) = self.no_established_connections_waker.take() {
518            waker.wake();
519        }
520
521        let connection = Connection::new(
522            connection,
523            handler,
524            self.substream_upgrade_protocol_override,
525            self.max_negotiating_inbound_streams,
526            self.idle_connection_timeout,
527        );
528
529        self.executor.spawn(task::new_for_established_connection(
530            id,
531            obtained_peer_id,
532            connection,
533            command_receiver,
534            event_sender,
535        ))
536    }
537
    /// Polls the connection pool for events.
    ///
    /// Returns `Poll::Ready` with a single [`PoolEvent`] as soon as one is available.
    /// The sources are consulted in this order:
    ///
    /// 1. events of established connections (polled once per call),
    /// 2. in a loop: listeners for dropped [`NewConnection`]s, then events of
    ///    pending connections.
    ///
    /// If none of them yields an event, the local task queue (if any) is advanced
    /// and `Poll::Pending` is returned. Note that the local queue is only advanced
    /// on this path, i.e. not when an event is returned early.
    ///
    /// # Panics
    ///
    /// Panics if an event refers to a connection that the pool does not track in
    /// the expected state (see the `expect`/`unreachable!` messages below).
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler>>
    where
        THandler: ConnectionHandler + 'static,
        <THandler as ConnectionHandler>::OutboundOpenInfo: Send,
    {
        // Poll for events of established connections.
        //
        // Note that established connections are polled before pending connections, thus
        // prioritizing established connections over pending connections.
        match self.established_connection_events.poll_next_unpin(cx) {
            Poll::Pending => {}
            Poll::Ready(None) => {
                // There is currently no receiver to poll. Remember the waker so that
                // `spawn_connection` can wake us once it has added one.
                self.no_established_connections_waker = Some(cx.waker().clone());
            }

            Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
                return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
            }
            Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
                id,
                peer_id,
                new_address,
            })) => {
                let connection = self
                    .established
                    .get_mut(&peer_id)
                    .expect("Receive `AddressChange` event for established peer.")
                    .get_mut(&id)
                    .expect("Receive `AddressChange` event from established connection");
                // Update the stored endpoint in place and report both the new and the
                // previous endpoint.
                let mut new_endpoint = connection.endpoint.clone();
                new_endpoint.set_remote_address(new_address);
                let old_endpoint =
                    std::mem::replace(&mut connection.endpoint, new_endpoint.clone());

                return Poll::Ready(PoolEvent::AddressChange {
                    peer_id,
                    id,
                    new_endpoint,
                    old_endpoint,
                });
            }
            Poll::Ready(Some(task::EstablishedConnectionEvent::Closed {
                id,
                peer_id,
                error,
                handler,
            })) => {
                let connections = self
                    .established
                    .get_mut(&peer_id)
                    .expect("`Closed` event for established connection");
                let EstablishedConnection { endpoint, .. } =
                    connections.remove(&id).expect("Connection to be present");
                self.counters.dec_established(&endpoint);
                let remaining_established_connection_ids: Vec<ConnectionId> =
                    connections.keys().cloned().collect();
                // A peer without any remaining connection no longer counts as connected.
                if remaining_established_connection_ids.is_empty() {
                    self.established.remove(&peer_id);
                }
                return Poll::Ready(PoolEvent::ConnectionClosed {
                    id,
                    connected: Connected { endpoint, peer_id },
                    error,
                    remaining_established_connection_ids,
                    handler,
                });
            }
        }

        // Poll for events of pending connections.
        loop {
            // First take care of `NewConnection`s that are gone.
            if let Poll::Ready(Some(result)) =
                self.new_connection_dropped_listeners.poll_next_unpin(cx)
            {
                // `Ok` carries a muxer that was sent back by `NewConnection`'s `Drop`
                // impl; close it in a background task. `Err` means the sender was
                // dropped without sending anything, so there is nothing to close.
                if let Ok(dropped_connection) = result {
                    self.executor.spawn(async move {
                        let _ = dropped_connection.close().await;
                    });
                }
                continue;
            }

            let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {
                Poll::Ready(Some(event)) => event,
                Poll::Pending => break,
                Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
            };

            match event {
                task::PendingConnectionEvent::ConnectionEstablished {
                    id,
                    output: (obtained_peer_id, mut muxer),
                    outgoing,
                } => {
                    let PendingConnection {
                        peer_id: expected_peer_id,
                        endpoint,
                        abort_notifier: _,
                        accepted_at,
                    } = self
                        .pending
                        .remove(&id)
                        .expect("Entry in `self.pending` for previously pending connection.");

                    self.counters.dec_pending(&endpoint);

                    // Combine what we stored when the connection was added with what
                    // the task reported to obtain the final `ConnectedPoint`.
                    let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
                        (PendingPoint::Dialer { role_override }, Some((address, errors))) => (
                            ConnectedPoint::Dialer {
                                address,
                                role_override,
                            },
                            Some(errors),
                        ),
                        (
                            PendingPoint::Listener {
                                local_addr,
                                send_back_addr,
                            },
                            None,
                        ) => (
                            ConnectedPoint::Listener {
                                local_addr,
                                send_back_addr,
                            },
                            None,
                        ),
                        (PendingPoint::Dialer { .. }, None) => unreachable!(
                            "Established incoming connection via pending outgoing connection."
                        ),
                        (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
                            "Established outgoing connection via pending incoming connection."
                        ),
                    };

                    // Rejects the connection if the remote is not the peer we expected
                    // (when one was expected) or if the remote claims our own peer ID.
                    let check_peer_id = || {
                        if let Some(peer) = expected_peer_id {
                            if peer != obtained_peer_id {
                                return Err(PendingConnectionError::WrongPeerId {
                                    obtained: obtained_peer_id,
                                    endpoint: endpoint.clone(),
                                });
                            }
                        }

                        if self.local_id == obtained_peer_id {
                            return Err(PendingConnectionError::LocalPeerId {
                                endpoint: endpoint.clone(),
                            });
                        }

                        Ok(())
                    };

                    if let Err(error) = check_peer_id() {
                        // Close the rejected connection in the background; a failure to
                        // close is only logged.
                        self.executor.spawn(poll_fn(move |cx| {
                            if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
                                log::debug!(
                                    "Failed to close connection {:?} to peer {}: {:?}",
                                    id,
                                    obtained_peer_id,
                                    e
                                );
                            }
                            Poll::Ready(())
                        }));

                        match endpoint {
                            ConnectedPoint::Dialer { .. } => {
                                return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
                                    id,
                                    error: error
                                        .map(|t| vec![(endpoint.get_remote_address().clone(), t)]),
                                    // Fall back to the ID the remote presented if we did
                                    // not expect a particular peer.
                                    peer: expected_peer_id.or(Some(obtained_peer_id)),
                                })
                            }
                            ConnectedPoint::Listener {
                                send_back_addr,
                                local_addr,
                            } => {
                                return Poll::Ready(PoolEvent::PendingInboundConnectionError {
                                    id,
                                    error,
                                    send_back_addr,
                                    local_addr,
                                })
                            }
                        };
                    }

                    let established_in = accepted_at.elapsed();

                    // Keep the receiving end so that the muxer can be closed should the
                    // `NewConnection` be dropped instead of spawned.
                    let (connection, drop_listener) = NewConnection::new(muxer);
                    self.new_connection_dropped_listeners.push(drop_listener);

                    return Poll::Ready(PoolEvent::ConnectionEstablished {
                        peer_id: obtained_peer_id,
                        endpoint,
                        id,
                        connection,
                        concurrent_dial_errors,
                        established_in,
                    });
                }
                task::PendingConnectionEvent::PendingFailed { id, error } => {
                    // A failure for a connection that is not tracked as pending produces
                    // no event; the loop simply moves on to the next one.
                    if let Some(PendingConnection {
                        peer_id,
                        endpoint,
                        abort_notifier: _,
                        accepted_at: _, // Ignoring the time it took for the connection to fail.
                    }) = self.pending.remove(&id)
                    {
                        self.counters.dec_pending(&endpoint);

                        match (endpoint, error) {
                            (PendingPoint::Dialer { .. }, Either::Left(error)) => {
                                return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
                                    id,
                                    error,
                                    peer: peer_id,
                                });
                            }
                            (
                                PendingPoint::Listener {
                                    send_back_addr,
                                    local_addr,
                                },
                                Either::Right(error),
                            ) => {
                                return Poll::Ready(PoolEvent::PendingInboundConnectionError {
                                    id,
                                    error,
                                    send_back_addr,
                                    local_addr,
                                });
                            }
                            (PendingPoint::Dialer { .. }, Either::Right(_)) => {
                                unreachable!("Inbound error for outbound connection.")
                            }
                            (PendingPoint::Listener { .. }, Either::Left(_)) => {
                                unreachable!("Outbound error for inbound connection.")
                            }
                        }
                    }
                }
            }
        }

        // Nothing to report: give locally queued tasks (if any) a chance to run.
        self.executor.advance_local(cx);

        Poll::Pending
    }
791}
792
793/// Opaque type for a new connection.
794///
795/// This connection has just been established but isn't part of the [`Pool`] yet.
796/// It either needs to be spawned via [`Pool::spawn_connection`] or dropped if undesired.
797///
798/// On drop, this type send the connection back to the [`Pool`] where it will be gracefully closed.
799#[derive(Debug)]
800pub(crate) struct NewConnection {
801    connection: Option<StreamMuxerBox>,
802    drop_sender: Option<oneshot::Sender<StreamMuxerBox>>,
803}
804
805impl NewConnection {
806    fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver<StreamMuxerBox>) {
807        let (sender, receiver) = oneshot::channel();
808
809        (
810            Self {
811                connection: Some(conn),
812                drop_sender: Some(sender),
813            },
814            receiver,
815        )
816    }
817
818    fn extract(mut self) -> StreamMuxerBox {
819        self.connection.take().unwrap()
820    }
821}
822
823impl Drop for NewConnection {
824    fn drop(&mut self) {
825        if let Some(connection) = self.connection.take() {
826            let _ = self
827                .drop_sender
828                .take()
829                .expect("`drop_sender` to always be `Some`")
830                .send(connection);
831        }
832    }
833}
834
835/// Network connection information.
836#[derive(Debug, Clone)]
837pub struct ConnectionCounters {
838    /// The current number of incoming connections.
839    pending_incoming: u32,
840    /// The current number of outgoing connections.
841    pending_outgoing: u32,
842    /// The current number of established inbound connections.
843    established_incoming: u32,
844    /// The current number of established outbound connections.
845    established_outgoing: u32,
846}
847
848impl ConnectionCounters {
849    fn new() -> Self {
850        Self {
851            pending_incoming: 0,
852            pending_outgoing: 0,
853            established_incoming: 0,
854            established_outgoing: 0,
855        }
856    }
857
858    /// The total number of connections, both pending and established.
859    pub fn num_connections(&self) -> u32 {
860        self.num_pending() + self.num_established()
861    }
862
863    /// The total number of pending connections, both incoming and outgoing.
864    pub fn num_pending(&self) -> u32 {
865        self.pending_incoming + self.pending_outgoing
866    }
867
868    /// The number of incoming connections being established.
869    pub fn num_pending_incoming(&self) -> u32 {
870        self.pending_incoming
871    }
872
873    /// The number of outgoing connections being established.
874    pub fn num_pending_outgoing(&self) -> u32 {
875        self.pending_outgoing
876    }
877
878    /// The number of established incoming connections.
879    pub fn num_established_incoming(&self) -> u32 {
880        self.established_incoming
881    }
882
883    /// The number of established outgoing connections.
884    pub fn num_established_outgoing(&self) -> u32 {
885        self.established_outgoing
886    }
887
888    /// The total number of established connections.
889    pub fn num_established(&self) -> u32 {
890        self.established_outgoing + self.established_incoming
891    }
892
893    fn inc_pending(&mut self, endpoint: &PendingPoint) {
894        match endpoint {
895            PendingPoint::Dialer { .. } => {
896                self.pending_outgoing += 1;
897            }
898            PendingPoint::Listener { .. } => {
899                self.pending_incoming += 1;
900            }
901        }
902    }
903
904    fn inc_pending_incoming(&mut self) {
905        self.pending_incoming += 1;
906    }
907
908    fn dec_pending(&mut self, endpoint: &PendingPoint) {
909        match endpoint {
910            PendingPoint::Dialer { .. } => {
911                self.pending_outgoing -= 1;
912            }
913            PendingPoint::Listener { .. } => {
914                self.pending_incoming -= 1;
915            }
916        }
917    }
918
919    fn inc_established(&mut self, endpoint: &ConnectedPoint) {
920        match endpoint {
921            ConnectedPoint::Dialer { .. } => {
922                self.established_outgoing += 1;
923            }
924            ConnectedPoint::Listener { .. } => {
925                self.established_incoming += 1;
926            }
927        }
928    }
929
930    fn dec_established(&mut self, endpoint: &ConnectedPoint) {
931        match endpoint {
932            ConnectedPoint::Dialer { .. } => {
933                self.established_outgoing -= 1;
934            }
935            ConnectedPoint::Listener { .. } => {
936                self.established_incoming -= 1;
937            }
938        }
939    }
940}
941
942/// Configuration options when creating a [`Pool`].
943///
944/// The default configuration specifies no dedicated task executor, a
945/// task event buffer size of 32, and a task command buffer size of 7.
946pub(crate) struct PoolConfig {
947    /// Executor to use to spawn tasks.
948    pub(crate) executor: Option<Box<dyn Executor + Send>>,
949    /// Size of the task command buffer (per task).
950    pub(crate) task_command_buffer_size: usize,
951    /// Size of the pending connection task event buffer and the established connection task event
952    /// buffer.
953    pub(crate) per_connection_event_buffer_size: usize,
954    /// Number of addresses concurrently dialed for a single outbound connection attempt.
955    pub(crate) dial_concurrency_factor: NonZeroU8,
956    /// How long a connection should be kept alive once it is idling.
957    pub(crate) idle_connection_timeout: Duration,
958    /// The configured override for substream protocol upgrades, if any.
959    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
960
961    /// The maximum number of inbound streams concurrently negotiating on a connection.
962    ///
963    /// See [`Connection::max_negotiating_inbound_streams`].
964    max_negotiating_inbound_streams: usize,
965}
966
967impl PoolConfig {
968    pub(crate) fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
969        Self {
970            executor,
971            task_command_buffer_size: 32,
972            per_connection_event_buffer_size: 7,
973            dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
974            idle_connection_timeout: Duration::ZERO,
975            substream_upgrade_protocol_override: None,
976            max_negotiating_inbound_streams: 128,
977        }
978    }
979
980    /// Sets the maximum number of events sent to a connection's background task
981    /// that may be buffered, if the task cannot keep up with their consumption and
982    /// delivery to the connection handler.
983    ///
984    /// When the buffer for a particular connection is full, `notify_handler` will no
985    /// longer be able to deliver events to the associated [`Connection`],
986    /// thus exerting back-pressure on the connection and peer API.
987    pub(crate) fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
988        self.task_command_buffer_size = n.get() - 1;
989        self
990    }
991
992    /// Sets the maximum number of buffered connection events (beyond a guaranteed
993    /// buffer of 1 event per connection).
994    ///
995    /// When the buffer is full, the background tasks of all connections will stall.
996    /// In this way, the consumers of network events exert back-pressure on
997    /// the network connection I/O.
998    pub(crate) fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
999        self.per_connection_event_buffer_size = n;
1000        self
1001    }
1002
1003    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1004    pub(crate) fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1005        self.dial_concurrency_factor = factor;
1006        self
1007    }
1008
1009    /// Configures an override for the substream upgrade protocol to use.
1010    pub(crate) fn with_substream_upgrade_protocol_override(
1011        mut self,
1012        v: libp2p_core::upgrade::Version,
1013    ) -> Self {
1014        self.substream_upgrade_protocol_override = Some(v);
1015        self
1016    }
1017
1018    /// The maximum number of inbound streams concurrently negotiating on a connection.
1019    ///
1020    /// See [`Connection::max_negotiating_inbound_streams`].
1021    pub(crate) fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1022        self.max_negotiating_inbound_streams = v;
1023        self
1024    }
1025}
1026
/// Extension trait for [`hash_map::Entry`] that asserts the entry is occupied.
trait EntryExt<'a, K, V> {
    /// Returns the [`hash_map::OccupiedEntry`] behind this entry.
    ///
    /// # Panics
    ///
    /// Panics with `msg` if the entry is vacant. The panic is attributed to the
    /// caller's location so that the violated invariant is easy to find.
    #[track_caller]
    fn expect_occupied(self, msg: &'static str) -> hash_map::OccupiedEntry<'a, K, V>;
}

impl<'a, K: 'a, V: 'a> EntryExt<'a, K, V> for hash_map::Entry<'a, K, V> {
    fn expect_occupied(self, msg: &'static str) -> hash_map::OccupiedEntry<'a, K, V> {
        if let hash_map::Entry::Occupied(entry) = self {
            return entry;
        }

        panic!("{}", msg)
    }
}
1039
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::Future;

    /// Executor stand-in for tests: it accepts every future handed to it and
    /// drops it without ever polling it.
    struct Dummy;

    impl Executor for Dummy {
        fn exec(&self, _: Pin<Box<dyn Future<Output = ()> + Send>>) {}
    }
}