libp2p_swarm/connection/
pool.rs

1// Copyright 2021 Protocol Labs.
2// Copyright 2018 Parity Technologies (UK) Ltd.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21use crate::connection::{Connection, ConnectionId, PendingPoint};
22use crate::{
23    connection::{
24        Connected, ConnectionError, IncomingInfo, PendingConnectionError,
25        PendingInboundConnectionError, PendingOutboundConnectionError,
26    },
27    transport::TransportError,
28    ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
29};
30use concurrent_dial::ConcurrentDial;
31use fnv::FnvHashMap;
32use futures::prelude::*;
33use futures::stream::SelectAll;
34use futures::{
35    channel::{mpsc, oneshot},
36    future::{poll_fn, BoxFuture, Either},
37    ready,
38    stream::FuturesUnordered,
39};
40use libp2p_core::connection::Endpoint;
41use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
42use libp2p_core::transport::PortUse;
43use std::task::Waker;
44use std::{
45    collections::HashMap,
46    fmt,
47    num::{NonZeroU8, NonZeroUsize},
48    pin::Pin,
49    task::Context,
50    task::Poll,
51};
52use tracing::Instrument;
53use void::Void;
54use web_time::{Duration, Instant};
55
56mod concurrent_dial;
57mod task;
58
59enum ExecSwitch {
60    Executor(Box<dyn Executor + Send>),
61    LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
62}
63
64impl ExecSwitch {
65    fn advance_local(&mut self, cx: &mut Context) {
66        match self {
67            ExecSwitch::Executor(_) => {}
68            ExecSwitch::LocalSpawn(local) => {
69                while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
70            }
71        }
72    }
73
74    #[track_caller]
75    fn spawn(&mut self, task: impl Future<Output = ()> + Send + 'static) {
76        let task = task.boxed();
77
78        match self {
79            Self::Executor(executor) => executor.exec(task),
80            Self::LocalSpawn(local) => local.push(task),
81        }
82    }
83}
84
85/// A connection `Pool` manages a set of connections for each peer.
86pub(crate) struct Pool<THandler>
87where
88    THandler: ConnectionHandler,
89{
90    local_id: PeerId,
91
92    /// The connection counter(s).
93    counters: ConnectionCounters,
94
95    /// The managed connections of each peer that are currently considered established.
96    established: FnvHashMap<
97        PeerId,
98        FnvHashMap<ConnectionId, EstablishedConnection<THandler::FromBehaviour>>,
99    >,
100
101    /// The pending connections that are currently being negotiated.
102    pending: HashMap<ConnectionId, PendingConnection>,
103
104    /// Size of the task command buffer (per task).
105    task_command_buffer_size: usize,
106
107    /// Number of addresses concurrently dialed for a single outbound connection attempt.
108    dial_concurrency_factor: NonZeroU8,
109
110    /// The configured override for substream protocol upgrades, if any.
111    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
112
113    /// The maximum number of inbound streams concurrently negotiating on a connection.
114    ///
115    /// See [`Connection::max_negotiating_inbound_streams`].
116    max_negotiating_inbound_streams: usize,
117
118    /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured.
119    per_connection_event_buffer_size: usize,
120
121    /// The executor to use for running connection tasks. Can either be a global executor
122    /// or a local queue.
123    executor: ExecSwitch,
124
125    /// Sender distributed to pending tasks for reporting events back
126    /// to the pool.
127    pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,
128
129    /// Receiver for events reported from pending tasks.
130    pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
131
132    /// Waker in case we haven't established any connections yet.
133    no_established_connections_waker: Option<Waker>,
134
135    /// Receivers for events reported from established connections.
136    established_connection_events:
137        SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler::ToBehaviour>>>,
138
139    /// Receivers for [`NewConnection`] objects that are dropped.
140    new_connection_dropped_listeners: FuturesUnordered<oneshot::Receiver<StreamMuxerBox>>,
141
142    /// How long a connection should be kept alive once it starts idling.
143    idle_connection_timeout: Duration,
144}
145
146#[derive(Debug)]
147pub(crate) struct EstablishedConnection<TInEvent> {
148    endpoint: ConnectedPoint,
149    /// Channel endpoint to send commands to the task.
150    sender: mpsc::Sender<task::Command<TInEvent>>,
151}
152
153impl<TInEvent> EstablishedConnection<TInEvent> {
154    /// (Asynchronously) sends an event to the connection handler.
155    ///
156    /// If the handler is not ready to receive the event, either because
157    /// it is busy or the connection is about to close, the given event
158    /// is returned with an `Err`.
159    ///
160    /// If execution of this method is preceded by successful execution of
161    /// `poll_ready_notify_handler` without another intervening execution
162    /// of `notify_handler`, it only fails if the connection is now about
163    /// to close.
164    pub(crate) fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
165        let cmd = task::Command::NotifyHandler(event);
166        self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
167            task::Command::NotifyHandler(event) => event,
168            _ => unreachable!("Expect failed send to return initial event."),
169        })
170    }
171
172    /// Checks if `notify_handler` is ready to accept an event.
173    ///
174    /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
175    ///
176    /// Returns `Err(())` if the background task associated with the connection
177    /// is terminating and the connection is about to close.
178    pub(crate) fn poll_ready_notify_handler(
179        &mut self,
180        cx: &mut Context<'_>,
181    ) -> Poll<Result<(), ()>> {
182        self.sender.poll_ready(cx).map_err(|_| ())
183    }
184
185    /// Initiates a graceful close of the connection.
186    ///
187    /// Has no effect if the connection is already closing.
188    pub(crate) fn start_close(&mut self) {
189        // Clone the sender so that we are guaranteed to have
190        // capacity for the close command (every sender gets a slot).
191        match self.sender.clone().try_send(task::Command::Close) {
192            Ok(()) => {}
193            Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
194        };
195    }
196}
197
198struct PendingConnection {
199    /// [`PeerId`] of the remote peer.
200    peer_id: Option<PeerId>,
201    endpoint: PendingPoint,
202    /// When dropped, notifies the task which then knows to terminate.
203    abort_notifier: Option<oneshot::Sender<Void>>,
204    /// The moment we became aware of this possible connection, useful for timing metrics.
205    accepted_at: Instant,
206}
207
208impl PendingConnection {
209    fn is_for_same_remote_as(&self, other: PeerId) -> bool {
210        self.peer_id.map_or(false, |peer| peer == other)
211    }
212
213    /// Aborts the connection attempt, closing the connection.
214    fn abort(&mut self) {
215        if let Some(notifier) = self.abort_notifier.take() {
216            drop(notifier);
217        }
218    }
219}
220
221impl<THandler: ConnectionHandler> fmt::Debug for Pool<THandler> {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
223        f.debug_struct("Pool")
224            .field("counters", &self.counters)
225            .finish()
226    }
227}
228
229/// Event that can happen on the `Pool`.
230#[derive(Debug)]
231pub(crate) enum PoolEvent<ToBehaviour> {
232    /// A new connection has been established.
233    ConnectionEstablished {
234        id: ConnectionId,
235        peer_id: PeerId,
236        endpoint: ConnectedPoint,
237        connection: NewConnection,
238        /// [`Some`] when the new connection is an outgoing connection.
239        /// Addresses are dialed in parallel. Contains the addresses and errors
240        /// of dial attempts that failed before the one successful dial.
241        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
242        /// How long it took to establish this connection.
243        established_in: std::time::Duration,
244    },
245
246    /// An established connection was closed.
247    ///
248    /// A connection may close if
249    ///
250    ///   * it encounters an error, which includes the connection being
251    ///     closed by the remote. In this case `error` is `Some`.
252    ///   * it was actively closed by [`EstablishedConnection::start_close`],
253    ///     i.e. a successful, orderly close.
254    ///   * it was actively closed by [`Pool::disconnect`], i.e.
255    ///     dropped without an orderly close.
256    ///
257    ConnectionClosed {
258        id: ConnectionId,
259        /// Information about the connection that errored.
260        connected: Connected,
261        /// The error that occurred, if any. If `None`, the connection
262        /// was closed by the local peer.
263        error: Option<ConnectionError>,
264        /// The remaining established connections to the same peer.
265        remaining_established_connection_ids: Vec<ConnectionId>,
266    },
267
268    /// An outbound connection attempt failed.
269    PendingOutboundConnectionError {
270        /// The ID of the failed connection.
271        id: ConnectionId,
272        /// The error that occurred.
273        error: PendingOutboundConnectionError,
274        /// The (expected) peer of the failed connection.
275        peer: Option<PeerId>,
276    },
277
278    /// An inbound connection attempt failed.
279    PendingInboundConnectionError {
280        /// The ID of the failed connection.
281        id: ConnectionId,
282        /// Address used to send back data to the remote.
283        send_back_addr: Multiaddr,
284        /// Local connection address.
285        local_addr: Multiaddr,
286        /// The error that occurred.
287        error: PendingInboundConnectionError,
288    },
289
290    /// A node has produced an event.
291    ConnectionEvent {
292        id: ConnectionId,
293        peer_id: PeerId,
294        /// The produced event.
295        event: ToBehaviour,
296    },
297
298    /// The connection to a node has changed its address.
299    AddressChange {
300        id: ConnectionId,
301        peer_id: PeerId,
302        /// The new endpoint.
303        new_endpoint: ConnectedPoint,
304        /// The old endpoint.
305        old_endpoint: ConnectedPoint,
306    },
307}
308
309impl<THandler> Pool<THandler>
310where
311    THandler: ConnectionHandler,
312{
313    /// Creates a new empty `Pool`.
314    pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
315        let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
316        let executor = match config.executor {
317            Some(exec) => ExecSwitch::Executor(exec),
318            None => ExecSwitch::LocalSpawn(Default::default()),
319        };
320        Pool {
321            local_id,
322            counters: ConnectionCounters::new(),
323            established: Default::default(),
324            pending: Default::default(),
325            task_command_buffer_size: config.task_command_buffer_size,
326            dial_concurrency_factor: config.dial_concurrency_factor,
327            substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
328            max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
329            per_connection_event_buffer_size: config.per_connection_event_buffer_size,
330            idle_connection_timeout: config.idle_connection_timeout,
331            executor,
332            pending_connection_events_tx,
333            pending_connection_events_rx,
334            no_established_connections_waker: None,
335            established_connection_events: Default::default(),
336            new_connection_dropped_listeners: Default::default(),
337        }
338    }
339
340    /// Gets the dedicated connection counters.
341    pub(crate) fn counters(&self) -> &ConnectionCounters {
342        &self.counters
343    }
344
345    /// Gets an established connection from the pool by ID.
346    pub(crate) fn get_established(
347        &mut self,
348        id: ConnectionId,
349    ) -> Option<&mut EstablishedConnection<THandler::FromBehaviour>> {
350        self.established
351            .values_mut()
352            .find_map(|connections| connections.get_mut(&id))
353    }
354
355    /// Returns true if we are connected to the given peer.
356    ///
357    /// This will return true only after a `NodeReached` event has been produced by `poll()`.
358    pub(crate) fn is_connected(&self, id: PeerId) -> bool {
359        self.established.contains_key(&id)
360    }
361
362    /// Returns the number of connected peers, i.e. those with at least one
363    /// established connection in the pool.
364    pub(crate) fn num_peers(&self) -> usize {
365        self.established.len()
366    }
367
368    /// (Forcefully) close all connections to the given peer.
369    ///
370    /// All connections to the peer, whether pending or established are
371    /// closed asap and no more events from these connections are emitted
372    /// by the pool effective immediately.
373    pub(crate) fn disconnect(&mut self, peer: PeerId) {
374        if let Some(conns) = self.established.get_mut(&peer) {
375            for (_, conn) in conns.iter_mut() {
376                conn.start_close();
377            }
378        }
379
380        for connection in self
381            .pending
382            .iter_mut()
383            .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
384        {
385            connection.abort()
386        }
387    }
388
389    /// Returns an iterator over all established connections of `peer`.
390    pub(crate) fn iter_established_connections_of_peer(
391        &mut self,
392        peer: &PeerId,
393    ) -> impl Iterator<Item = ConnectionId> + '_ {
394        match self.established.get(peer) {
395            Some(conns) => either::Either::Left(conns.iter().map(|(id, _)| *id)),
396            None => either::Either::Right(std::iter::empty()),
397        }
398    }
399
400    /// Checks whether we are currently dialing the given peer.
401    pub(crate) fn is_dialing(&self, peer: PeerId) -> bool {
402        self.pending.iter().any(|(_, info)| {
403            matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
404        })
405    }
406
407    /// Returns an iterator over all connected peers, i.e. those that have
408    /// at least one established connection in the pool.
409    pub(crate) fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
410        self.established.keys()
411    }
412
413    /// Adds a pending outgoing connection to the pool in the form of a `Future`
414    /// that establishes and negotiates the connection.
415    pub(crate) fn add_outgoing(
416        &mut self,
417        dials: Vec<
418            BoxFuture<
419                'static,
420                (
421                    Multiaddr,
422                    Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
423                ),
424            >,
425        >,
426        peer: Option<PeerId>,
427        role_override: Endpoint,
428        port_use: PortUse,
429        dial_concurrency_factor_override: Option<NonZeroU8>,
430        connection_id: ConnectionId,
431    ) {
432        let concurrency_factor =
433            dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor);
434        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_outgoing_connection", %concurrency_factor, num_dials=%dials.len(), id = %connection_id);
435        span.follows_from(tracing::Span::current());
436
437        let (abort_notifier, abort_receiver) = oneshot::channel();
438
439        self.executor.spawn(
440            task::new_for_pending_outgoing_connection(
441                connection_id,
442                ConcurrentDial::new(dials, concurrency_factor),
443                abort_receiver,
444                self.pending_connection_events_tx.clone(),
445            )
446            .instrument(span),
447        );
448
449        let endpoint = PendingPoint::Dialer {
450            role_override,
451            port_use,
452        };
453
454        self.counters.inc_pending(&endpoint);
455        self.pending.insert(
456            connection_id,
457            PendingConnection {
458                peer_id: peer,
459                endpoint,
460                abort_notifier: Some(abort_notifier),
461                accepted_at: Instant::now(),
462            },
463        );
464    }
465
466    /// Adds a pending incoming connection to the pool in the form of a
467    /// `Future` that establishes and negotiates the connection.
468    pub(crate) fn add_incoming<TFut>(
469        &mut self,
470        future: TFut,
471        info: IncomingInfo<'_>,
472        connection_id: ConnectionId,
473    ) where
474        TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
475    {
476        let endpoint = info.create_connected_point();
477
478        let (abort_notifier, abort_receiver) = oneshot::channel();
479
480        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_incoming_connection", remote_addr = %info.send_back_addr, id = %connection_id);
481        span.follows_from(tracing::Span::current());
482
483        self.executor.spawn(
484            task::new_for_pending_incoming_connection(
485                connection_id,
486                future,
487                abort_receiver,
488                self.pending_connection_events_tx.clone(),
489            )
490            .instrument(span),
491        );
492
493        self.counters.inc_pending_incoming();
494        self.pending.insert(
495            connection_id,
496            PendingConnection {
497                peer_id: None,
498                endpoint: endpoint.into(),
499                abort_notifier: Some(abort_notifier),
500                accepted_at: Instant::now(),
501            },
502        );
503    }
504
505    pub(crate) fn spawn_connection(
506        &mut self,
507        id: ConnectionId,
508        obtained_peer_id: PeerId,
509        endpoint: &ConnectedPoint,
510        connection: NewConnection,
511        handler: THandler,
512    ) {
513        let connection = connection.extract();
514        let conns = self.established.entry(obtained_peer_id).or_default();
515        self.counters.inc_established(endpoint);
516
517        let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size);
518        let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size);
519
520        conns.insert(
521            id,
522            EstablishedConnection {
523                endpoint: endpoint.clone(),
524                sender: command_sender,
525            },
526        );
527        self.established_connection_events.push(event_receiver);
528        if let Some(waker) = self.no_established_connections_waker.take() {
529            waker.wake();
530        }
531
532        let connection = Connection::new(
533            connection,
534            handler,
535            self.substream_upgrade_protocol_override,
536            self.max_negotiating_inbound_streams,
537            self.idle_connection_timeout,
538        );
539
540        let span = tracing::debug_span!(parent: tracing::Span::none(), "new_established_connection", remote_addr = %endpoint.get_remote_address(), %id, peer = %obtained_peer_id);
541        span.follows_from(tracing::Span::current());
542
543        self.executor.spawn(
544            task::new_for_established_connection(
545                id,
546                obtained_peer_id,
547                connection,
548                command_receiver,
549                event_sender,
550            )
551            .instrument(span),
552        )
553    }
554
555    /// Polls the connection pool for events.
556    #[tracing::instrument(level = "debug", name = "Pool::poll", skip(self, cx))]
557    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler::ToBehaviour>>
558    where
559        THandler: ConnectionHandler + 'static,
560        <THandler as ConnectionHandler>::OutboundOpenInfo: Send,
561    {
562        // Poll for events of established connections.
563        //
564        // Note that established connections are polled before pending connections, thus
565        // prioritizing established connections over pending connections.
566        match self.established_connection_events.poll_next_unpin(cx) {
567            Poll::Pending => {}
568            Poll::Ready(None) => {
569                self.no_established_connections_waker = Some(cx.waker().clone());
570            }
571
572            Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
573                return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
574            }
575            Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
576                id,
577                peer_id,
578                new_address,
579            })) => {
580                let connection = self
581                    .established
582                    .get_mut(&peer_id)
583                    .expect("Receive `AddressChange` event for established peer.")
584                    .get_mut(&id)
585                    .expect("Receive `AddressChange` event from established connection");
586                let mut new_endpoint = connection.endpoint.clone();
587                new_endpoint.set_remote_address(new_address);
588                let old_endpoint =
589                    std::mem::replace(&mut connection.endpoint, new_endpoint.clone());
590
591                return Poll::Ready(PoolEvent::AddressChange {
592                    peer_id,
593                    id,
594                    new_endpoint,
595                    old_endpoint,
596                });
597            }
598            Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, peer_id, error })) => {
599                let connections = self
600                    .established
601                    .get_mut(&peer_id)
602                    .expect("`Closed` event for established connection");
603                let EstablishedConnection { endpoint, .. } =
604                    connections.remove(&id).expect("Connection to be present");
605                self.counters.dec_established(&endpoint);
606                let remaining_established_connection_ids: Vec<ConnectionId> =
607                    connections.keys().cloned().collect();
608                if remaining_established_connection_ids.is_empty() {
609                    self.established.remove(&peer_id);
610                }
611                return Poll::Ready(PoolEvent::ConnectionClosed {
612                    id,
613                    connected: Connected { endpoint, peer_id },
614                    error,
615                    remaining_established_connection_ids,
616                });
617            }
618        }
619
620        // Poll for events of pending connections.
621        loop {
622            if let Poll::Ready(Some(result)) =
623                self.new_connection_dropped_listeners.poll_next_unpin(cx)
624            {
625                if let Ok(dropped_connection) = result {
626                    self.executor.spawn(async move {
627                        let _ = dropped_connection.close().await;
628                    });
629                }
630                continue;
631            }
632
633            let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {
634                Poll::Ready(Some(event)) => event,
635                Poll::Pending => break,
636                Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
637            };
638
639            match event {
640                task::PendingConnectionEvent::ConnectionEstablished {
641                    id,
642                    output: (obtained_peer_id, mut muxer),
643                    outgoing,
644                } => {
645                    let PendingConnection {
646                        peer_id: expected_peer_id,
647                        endpoint,
648                        abort_notifier: _,
649                        accepted_at,
650                    } = self
651                        .pending
652                        .remove(&id)
653                        .expect("Entry in `self.pending` for previously pending connection.");
654
655                    self.counters.dec_pending(&endpoint);
656
657                    let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
658                        (
659                            PendingPoint::Dialer {
660                                role_override,
661                                port_use,
662                            },
663                            Some((address, errors)),
664                        ) => (
665                            ConnectedPoint::Dialer {
666                                address,
667                                role_override,
668                                port_use,
669                            },
670                            Some(errors),
671                        ),
672                        (
673                            PendingPoint::Listener {
674                                local_addr,
675                                send_back_addr,
676                            },
677                            None,
678                        ) => (
679                            ConnectedPoint::Listener {
680                                local_addr,
681                                send_back_addr,
682                            },
683                            None,
684                        ),
685                        (PendingPoint::Dialer { .. }, None) => unreachable!(
686                            "Established incoming connection via pending outgoing connection."
687                        ),
688                        (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
689                            "Established outgoing connection via pending incoming connection."
690                        ),
691                    };
692
693                    let check_peer_id = || {
694                        if let Some(peer) = expected_peer_id {
695                            if peer != obtained_peer_id {
696                                return Err(PendingConnectionError::WrongPeerId {
697                                    obtained: obtained_peer_id,
698                                    endpoint: endpoint.clone(),
699                                });
700                            }
701                        }
702
703                        if self.local_id == obtained_peer_id {
704                            return Err(PendingConnectionError::LocalPeerId {
705                                endpoint: endpoint.clone(),
706                            });
707                        }
708
709                        Ok(())
710                    };
711
712                    if let Err(error) = check_peer_id() {
713                        self.executor.spawn(poll_fn(move |cx| {
714                            if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
715                                tracing::debug!(
716                                    peer=%obtained_peer_id,
717                                    connection=%id,
718                                    "Failed to close connection to peer: {:?}",
719                                    e
720                                );
721                            }
722                            Poll::Ready(())
723                        }));
724
725                        match endpoint {
726                            ConnectedPoint::Dialer { .. } => {
727                                return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
728                                    id,
729                                    error: error
730                                        .map(|t| vec![(endpoint.get_remote_address().clone(), t)]),
731                                    peer: expected_peer_id.or(Some(obtained_peer_id)),
732                                })
733                            }
734                            ConnectedPoint::Listener {
735                                send_back_addr,
736                                local_addr,
737                            } => {
738                                return Poll::Ready(PoolEvent::PendingInboundConnectionError {
739                                    id,
740                                    error,
741                                    send_back_addr,
742                                    local_addr,
743                                })
744                            }
745                        };
746                    }
747
748                    let established_in = accepted_at.elapsed();
749
750                    let (connection, drop_listener) = NewConnection::new(muxer);
751                    self.new_connection_dropped_listeners.push(drop_listener);
752
753                    return Poll::Ready(PoolEvent::ConnectionEstablished {
754                        peer_id: obtained_peer_id,
755                        endpoint,
756                        id,
757                        connection,
758                        concurrent_dial_errors,
759                        established_in,
760                    });
761                }
762                task::PendingConnectionEvent::PendingFailed { id, error } => {
763                    if let Some(PendingConnection {
764                        peer_id,
765                        endpoint,
766                        abort_notifier: _,
767                        accepted_at: _, // Ignoring the time it took for the connection to fail.
768                    }) = self.pending.remove(&id)
769                    {
770                        self.counters.dec_pending(&endpoint);
771
772                        match (endpoint, error) {
773                            (PendingPoint::Dialer { .. }, Either::Left(error)) => {
774                                return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
775                                    id,
776                                    error,
777                                    peer: peer_id,
778                                });
779                            }
780                            (
781                                PendingPoint::Listener {
782                                    send_back_addr,
783                                    local_addr,
784                                },
785                                Either::Right(error),
786                            ) => {
787                                return Poll::Ready(PoolEvent::PendingInboundConnectionError {
788                                    id,
789                                    error,
790                                    send_back_addr,
791                                    local_addr,
792                                });
793                            }
794                            (PendingPoint::Dialer { .. }, Either::Right(_)) => {
795                                unreachable!("Inbound error for outbound connection.")
796                            }
797                            (PendingPoint::Listener { .. }, Either::Left(_)) => {
798                                unreachable!("Outbound error for inbound connection.")
799                            }
800                        }
801                    }
802                }
803            }
804        }
805
806        self.executor.advance_local(cx);
807
808        Poll::Pending
809    }
810}
811
812/// Opaque type for a new connection.
813///
814/// This connection has just been established but isn't part of the [`Pool`] yet.
815/// It either needs to be spawned via [`Pool::spawn_connection`] or dropped if undesired.
816///
817/// On drop, this type send the connection back to the [`Pool`] where it will be gracefully closed.
818#[derive(Debug)]
819pub(crate) struct NewConnection {
820    connection: Option<StreamMuxerBox>,
821    drop_sender: Option<oneshot::Sender<StreamMuxerBox>>,
822}
823
824impl NewConnection {
825    fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver<StreamMuxerBox>) {
826        let (sender, receiver) = oneshot::channel();
827
828        (
829            Self {
830                connection: Some(conn),
831                drop_sender: Some(sender),
832            },
833            receiver,
834        )
835    }
836
837    fn extract(mut self) -> StreamMuxerBox {
838        self.connection.take().unwrap()
839    }
840}
841
842impl Drop for NewConnection {
843    fn drop(&mut self) {
844        if let Some(connection) = self.connection.take() {
845            let _ = self
846                .drop_sender
847                .take()
848                .expect("`drop_sender` to always be `Some`")
849                .send(connection);
850        }
851    }
852}
853
854/// Network connection information.
855#[derive(Debug, Clone)]
856pub struct ConnectionCounters {
857    /// The current number of incoming connections.
858    pending_incoming: u32,
859    /// The current number of outgoing connections.
860    pending_outgoing: u32,
861    /// The current number of established inbound connections.
862    established_incoming: u32,
863    /// The current number of established outbound connections.
864    established_outgoing: u32,
865}
866
867impl ConnectionCounters {
868    fn new() -> Self {
869        Self {
870            pending_incoming: 0,
871            pending_outgoing: 0,
872            established_incoming: 0,
873            established_outgoing: 0,
874        }
875    }
876
877    /// The total number of connections, both pending and established.
878    pub fn num_connections(&self) -> u32 {
879        self.num_pending() + self.num_established()
880    }
881
882    /// The total number of pending connections, both incoming and outgoing.
883    pub fn num_pending(&self) -> u32 {
884        self.pending_incoming + self.pending_outgoing
885    }
886
887    /// The number of incoming connections being established.
888    pub fn num_pending_incoming(&self) -> u32 {
889        self.pending_incoming
890    }
891
892    /// The number of outgoing connections being established.
893    pub fn num_pending_outgoing(&self) -> u32 {
894        self.pending_outgoing
895    }
896
897    /// The number of established incoming connections.
898    pub fn num_established_incoming(&self) -> u32 {
899        self.established_incoming
900    }
901
902    /// The number of established outgoing connections.
903    pub fn num_established_outgoing(&self) -> u32 {
904        self.established_outgoing
905    }
906
907    /// The total number of established connections.
908    pub fn num_established(&self) -> u32 {
909        self.established_outgoing + self.established_incoming
910    }
911
912    fn inc_pending(&mut self, endpoint: &PendingPoint) {
913        match endpoint {
914            PendingPoint::Dialer { .. } => {
915                self.pending_outgoing += 1;
916            }
917            PendingPoint::Listener { .. } => {
918                self.pending_incoming += 1;
919            }
920        }
921    }
922
923    fn inc_pending_incoming(&mut self) {
924        self.pending_incoming += 1;
925    }
926
927    fn dec_pending(&mut self, endpoint: &PendingPoint) {
928        match endpoint {
929            PendingPoint::Dialer { .. } => {
930                self.pending_outgoing -= 1;
931            }
932            PendingPoint::Listener { .. } => {
933                self.pending_incoming -= 1;
934            }
935        }
936    }
937
938    fn inc_established(&mut self, endpoint: &ConnectedPoint) {
939        match endpoint {
940            ConnectedPoint::Dialer { .. } => {
941                self.established_outgoing += 1;
942            }
943            ConnectedPoint::Listener { .. } => {
944                self.established_incoming += 1;
945            }
946        }
947    }
948
949    fn dec_established(&mut self, endpoint: &ConnectedPoint) {
950        match endpoint {
951            ConnectedPoint::Dialer { .. } => {
952                self.established_outgoing -= 1;
953            }
954            ConnectedPoint::Listener { .. } => {
955                self.established_incoming -= 1;
956            }
957        }
958    }
959}
960
961/// Configuration options when creating a [`Pool`].
962///
963/// The default configuration specifies no dedicated task executor, a
964/// task event buffer size of 32, and a task command buffer size of 7.
965pub(crate) struct PoolConfig {
966    /// Executor to use to spawn tasks.
967    pub(crate) executor: Option<Box<dyn Executor + Send>>,
968    /// Size of the task command buffer (per task).
969    pub(crate) task_command_buffer_size: usize,
970    /// Size of the pending connection task event buffer and the established connection task event
971    /// buffer.
972    pub(crate) per_connection_event_buffer_size: usize,
973    /// Number of addresses concurrently dialed for a single outbound connection attempt.
974    pub(crate) dial_concurrency_factor: NonZeroU8,
975    /// How long a connection should be kept alive once it is idling.
976    pub(crate) idle_connection_timeout: Duration,
977    /// The configured override for substream protocol upgrades, if any.
978    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
979
980    /// The maximum number of inbound streams concurrently negotiating on a connection.
981    ///
982    /// See [`Connection::max_negotiating_inbound_streams`].
983    max_negotiating_inbound_streams: usize,
984}
985
986impl PoolConfig {
987    pub(crate) fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
988        Self {
989            executor,
990            task_command_buffer_size: 32,
991            per_connection_event_buffer_size: 7,
992            dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
993            idle_connection_timeout: Duration::ZERO,
994            substream_upgrade_protocol_override: None,
995            max_negotiating_inbound_streams: 128,
996        }
997    }
998
999    /// Sets the maximum number of events sent to a connection's background task
1000    /// that may be buffered, if the task cannot keep up with their consumption and
1001    /// delivery to the connection handler.
1002    ///
1003    /// When the buffer for a particular connection is full, `notify_handler` will no
1004    /// longer be able to deliver events to the associated [`Connection`],
1005    /// thus exerting back-pressure on the connection and peer API.
1006    pub(crate) fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1007        self.task_command_buffer_size = n.get() - 1;
1008        self
1009    }
1010
1011    /// Sets the maximum number of buffered connection events (beyond a guaranteed
1012    /// buffer of 1 event per connection).
1013    ///
1014    /// When the buffer is full, the background tasks of all connections will stall.
1015    /// In this way, the consumers of network events exert back-pressure on
1016    /// the network connection I/O.
1017    pub(crate) fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1018        self.per_connection_event_buffer_size = n;
1019        self
1020    }
1021
1022    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1023    pub(crate) fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1024        self.dial_concurrency_factor = factor;
1025        self
1026    }
1027
1028    /// Configures an override for the substream upgrade protocol to use.
1029    pub(crate) fn with_substream_upgrade_protocol_override(
1030        mut self,
1031        v: libp2p_core::upgrade::Version,
1032    ) -> Self {
1033        self.substream_upgrade_protocol_override = Some(v);
1034        self
1035    }
1036
1037    /// The maximum number of inbound streams concurrently negotiating on a connection.
1038    ///
1039    /// See [`Connection::max_negotiating_inbound_streams`].
1040    pub(crate) fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1041        self.max_negotiating_inbound_streams = v;
1042        self
1043    }
1044}