libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High-level network manager.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that
35//!     will be used in order to reach nodes on the network based on their
36//!     address. See the `transport` module for more information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state
38//!     machine that defines how the swarm should behave once it is connected
39//!     to a node.
40//!
41//! # Network Behaviour
42//!
43//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
44//! the swarm how it should behave. This includes which protocols are supported
45//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
46//! controls what happens on the network. Multiple types that implement
47//! `NetworkBehaviour` can be composed into a single behaviour.
48//!
//! # Connection Handler
50//!
51//! The [`ConnectionHandler`] trait defines how each active connection to a
52//! remote should behave: how to handle incoming substreams, which protocols
53//! are supported, when to open a new outbound substream, etc.
54//!
55
56#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
57
58mod connection;
59mod executor;
60mod stream;
61mod stream_protocol;
62#[cfg(test)]
63mod test;
64mod upgrade;
65
66pub mod behaviour;
67pub mod dial_opts;
68pub mod dummy;
69pub mod handler;
70mod listen_opts;
71mod translation;
72
73/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
74#[doc(hidden)]
75pub mod derive_prelude {
76    pub use crate::behaviour::AddressChange;
77    pub use crate::behaviour::ConnectionClosed;
78    pub use crate::behaviour::ConnectionEstablished;
79    pub use crate::behaviour::DialFailure;
80    pub use crate::behaviour::ExpiredListenAddr;
81    pub use crate::behaviour::ExternalAddrConfirmed;
82    pub use crate::behaviour::ExternalAddrExpired;
83    pub use crate::behaviour::FromSwarm;
84    pub use crate::behaviour::ListenFailure;
85    pub use crate::behaviour::ListenerClosed;
86    pub use crate::behaviour::ListenerError;
87    pub use crate::behaviour::NewExternalAddrCandidate;
88    pub use crate::behaviour::NewExternalAddrOfPeer;
89    pub use crate::behaviour::NewListenAddr;
90    pub use crate::behaviour::NewListener;
91    pub use crate::connection::ConnectionId;
92    pub use crate::ConnectionDenied;
93    pub use crate::ConnectionHandler;
94    pub use crate::ConnectionHandlerSelect;
95    pub use crate::DialError;
96    pub use crate::NetworkBehaviour;
97    pub use crate::THandler;
98    pub use crate::THandlerInEvent;
99    pub use crate::THandlerOutEvent;
100    pub use crate::ToSwarm;
101    pub use either::Either;
102    pub use futures::prelude as futures;
103    pub use libp2p_core::transport::{ListenerId, PortUse};
104    pub use libp2p_core::ConnectedPoint;
105    pub use libp2p_core::Endpoint;
106    pub use libp2p_core::Multiaddr;
107    pub use libp2p_identity::PeerId;
108}
109
110pub use behaviour::{
111    AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
112    ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
113    ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
114    NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
115};
116pub use connection::pool::ConnectionCounters;
117pub use connection::{ConnectionError, ConnectionId, SupportedProtocols};
118pub use executor::Executor;
119pub use handler::{
120    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
121    OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
122};
123#[cfg(feature = "macros")]
124pub use libp2p_swarm_derive::NetworkBehaviour;
125pub use listen_opts::ListenOpts;
126pub use stream::Stream;
127pub use stream_protocol::{InvalidProtocol, StreamProtocol};
128
129use crate::behaviour::ExternalAddrConfirmed;
130use crate::handler::UpgradeInfoSend;
131use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
132use connection::IncomingInfo;
133use connection::{
134    PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
135};
136use dial_opts::{DialOpts, PeerCondition};
137use futures::{prelude::*, stream::FusedStream};
138
139use libp2p_core::{
140    connection::ConnectedPoint,
141    muxing::StreamMuxerBox,
142    transport::{self, ListenerId, TransportError, TransportEvent},
143    Multiaddr, Transport,
144};
145use libp2p_identity::PeerId;
146
147use smallvec::SmallVec;
148use std::collections::{HashMap, HashSet, VecDeque};
149use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
150use std::time::Duration;
151use std::{
152    error, fmt, io,
153    pin::Pin,
154    task::{Context, Poll},
155};
156use tracing::Instrument;
157#[doc(hidden)]
158pub use translation::_address_translation;
159
/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
///
/// Shorthand for the behaviour's associated `ToSwarm` type.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;

/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
///
/// Shorthand for the behaviour's associated `ConnectionHandler` type.
pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;

/// Custom event that can be received by the [`ConnectionHandler`] of the
/// [`NetworkBehaviour`].
///
/// Shorthand for the handler's associated `FromBehaviour` type.
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;

/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
///
/// Shorthand for the handler's associated `ToBehaviour` type.
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
173
/// Event generated by the `Swarm`.
///
/// The type parameter is the event type carried by the
/// [`Behaviour`](SwarmEvent::Behaviour) variant. The enum is `#[non_exhaustive]`, so
/// matches outside this crate need a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum SwarmEvent<TBehaviourOutEvent> {
    /// Event generated by the `NetworkBehaviour`.
    Behaviour(TBehaviourOutEvent),
    /// A connection to the given peer has been opened.
    ConnectionEstablished {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer, including the one that has just been
        /// opened.
        num_established: NonZeroU32,
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed concurrently. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
        /// How long it took to establish this connection.
        established_in: std::time::Duration,
    },
    /// A connection with the given peer has been closed,
    /// possibly as a result of an error.
    ConnectionClosed {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of other remaining connections to this same peer.
        num_established: u32,
        /// Reason for the disconnection, if it was not a successful
        /// active close.
        cause: Option<ConnectionError>,
    },
    /// A new connection arrived on a listener and is in the process of protocol negotiation.
    ///
    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
    /// generated for this connection.
    IncomingConnection {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
        /// event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
    },
    /// An error happened on an inbound connection during its initial handshake.
    ///
    /// This can include, for example, an error during the handshake of the encryption layer, or
    /// the connection unexpectedly closed.
    IncomingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
        /// event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: ListenError,
    },
    /// An error happened on an outbound connection.
    OutgoingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// If known, [`PeerId`] of the peer we tried to reach.
        peer_id: Option<PeerId>,
        /// Error that has been encountered.
        error: DialError,
    },
    /// One of our listeners has reported a new local listening address.
    NewListenAddr {
        /// The listener that is listening on the new address.
        listener_id: ListenerId,
        /// The new address that is being listened on.
        address: Multiaddr,
    },
    /// One of our listeners has reported the expiration of a listening address.
    ExpiredListenAddr {
        /// The listener that is no longer listening on the address.
        listener_id: ListenerId,
        /// The expired address.
        address: Multiaddr,
    },
    /// One of the listeners gracefully closed.
    ListenerClosed {
        /// The listener that closed.
        listener_id: ListenerId,
        /// The addresses that the listener was listening on. These addresses are now considered
        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
        /// has been generated for each of them.
        addresses: Vec<Multiaddr>,
        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
        /// if the stream produced an error.
        reason: Result<(), io::Error>,
    },
    /// One of the listeners reported a non-fatal error.
    ListenerError {
        /// The listener that errored.
        listener_id: ListenerId,
        /// The listener error.
        error: io::Error,
    },
    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
    /// implementation.
    ///
    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
    /// reported if the dialing attempt succeeds, otherwise a
    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
    /// is reported.
    Dialing {
        /// Identity of the peer that we are connecting to, if known.
        peer_id: Option<PeerId>,

        /// Identifier of the connection.
        connection_id: ConnectionId,
    },
    /// We have discovered a new candidate for an external address for us.
    NewExternalAddrCandidate {
        /// The candidate address.
        address: Multiaddr,
    },
    /// An external address of the local node was confirmed.
    ExternalAddrConfirmed {
        /// The confirmed address.
        address: Multiaddr,
    },
    /// An external address of the local node expired, i.e. is no longer confirmed.
    ExternalAddrExpired {
        /// The expired address.
        address: Multiaddr,
    },
    /// We have discovered a new address of a peer.
    NewExternalAddrOfPeer {
        /// The peer the address belongs to.
        peer_id: PeerId,
        /// The newly discovered address.
        address: Multiaddr,
    },
}
309
310impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
311    /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour` variant, otherwise fail.
312    #[allow(clippy::result_large_err)]
313    pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
314        match self {
315            SwarmEvent::Behaviour(inner) => Ok(inner),
316            other => Err(other),
317        }
318    }
319}
320
/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
/// progress.
pub struct Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// [`Transport`] for dialing remote peers and listening for incoming connections.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,

    /// The nodes currently active.
    pool: Pool<THandler<TBehaviour>>,

    /// The local peer ID.
    local_peer_id: PeerId,

    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
    /// handlers.
    behaviour: TBehaviour,

    /// List of protocols that the behaviour says it supports.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// External addresses of the local node that have been confirmed.
    ///
    /// Updated by `add_external_address` and `remove_external_address`.
    confirmed_external_addr: HashSet<Multiaddr>,

    /// Multiaddresses that our listeners are listening on, keyed by listener.
    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,

    /// Pending event to be delivered to connection handlers
    /// (or dropped if the peer disconnected) before the `behaviour`
    /// can be polled again.
    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

    /// Queue of [`SwarmEvent`]s that have been produced but not yet handed out.
    ///
    /// `handle_pool_event` pushes connection errors here when the behaviour denies an
    /// established connection.
    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
}
357
// `Swarm` is declared `Unpin` for every behaviour type, independent of whether
// `TBehaviour` itself is `Unpin`.
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
359
360impl<TBehaviour> Swarm<TBehaviour>
361where
362    TBehaviour: NetworkBehaviour,
363{
364    /// Creates a new [`Swarm`] from the given [`Transport`], [`NetworkBehaviour`], [`PeerId`] and
365    /// [`Config`].
366    pub fn new(
367        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
368        behaviour: TBehaviour,
369        local_peer_id: PeerId,
370        config: Config,
371    ) -> Self {
372        tracing::info!(%local_peer_id);
373
374        Swarm {
375            local_peer_id,
376            transport,
377            pool: Pool::new(local_peer_id, config.pool_config),
378            behaviour,
379            supported_protocols: Default::default(),
380            confirmed_external_addr: Default::default(),
381            listened_addrs: HashMap::new(),
382            pending_handler_event: None,
383            pending_swarm_events: VecDeque::default(),
384        }
385    }
386
387    /// Returns information about the connections underlying the [`Swarm`].
388    pub fn network_info(&self) -> NetworkInfo {
389        let num_peers = self.pool.num_peers();
390        let connection_counters = self.pool.counters().clone();
391        NetworkInfo {
392            num_peers,
393            connection_counters,
394        }
395    }
396
397    /// Starts listening on the given address.
398    /// Returns an error if the address is not supported.
399    ///
400    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
401    /// Depending on the underlying transport, one listener may have multiple listening addresses.
402    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
403        let opts = ListenOpts::new(addr);
404        let id = opts.listener_id();
405        self.add_listener(opts)?;
406        Ok(id)
407    }
408
    /// Removes the listener with the given ID from the underlying transport.
    ///
    /// Returns `true` if there was a listener with this ID, `false`
    /// otherwise.
    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
        self.transport.remove_listener(listener_id)
    }
416
417    /// Dial a known or unknown peer.
418    ///
419    /// See also [`DialOpts`].
420    ///
421    /// ```
422    /// # use libp2p_swarm::Swarm;
423    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
424    /// # use libp2p_core::{Multiaddr, Transport};
425    /// # use libp2p_core::transport::dummy::DummyTransport;
426    /// # use libp2p_swarm::dummy;
427    /// # use libp2p_identity::PeerId;
428    /// #
429    /// # #[tokio::main]
430    /// # async fn main() {
431    /// let mut swarm = build_swarm();
432    ///
433    /// // Dial a known peer.
434    /// swarm.dial(PeerId::random());
435    ///
436    /// // Dial an unknown peer.
437    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
438    /// # }
439    ///
440    /// # fn build_swarm() -> Swarm<dummy::Behaviour> {
441    /// #     Swarm::new(DummyTransport::new().boxed(), dummy::Behaviour, PeerId::random(), libp2p_swarm::Config::with_tokio_executor())
442    /// # }
443    /// ```
444    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
445        let dial_opts = opts.into();
446
447        let peer_id = dial_opts.get_peer_id();
448        let condition = dial_opts.peer_condition();
449        let connection_id = dial_opts.connection_id();
450
451        let should_dial = match (condition, peer_id) {
452            (_, None) => true,
453            (PeerCondition::Always, _) => true,
454            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
455            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
456            (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
457                !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
458            }
459        };
460
461        if !should_dial {
462            let e = DialError::DialPeerConditionFalse(condition);
463
464            self.behaviour
465                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
466                    peer_id,
467                    error: &e,
468                    connection_id,
469                }));
470
471            return Err(e);
472        }
473
474        let addresses = {
475            let mut addresses_from_opts = dial_opts.get_addresses();
476
477            match self.behaviour.handle_pending_outbound_connection(
478                connection_id,
479                peer_id,
480                addresses_from_opts.as_slice(),
481                dial_opts.role_override(),
482            ) {
483                Ok(addresses) => {
484                    if dial_opts.extend_addresses_through_behaviour() {
485                        addresses_from_opts.extend(addresses)
486                    } else {
487                        let num_addresses = addresses.len();
488
489                        if num_addresses > 0 {
490                            tracing::debug!(
491                                connection=%connection_id,
492                                discarded_addresses_count=%num_addresses,
493                                "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
494                            )
495                        }
496                    }
497                }
498                Err(cause) => {
499                    let error = DialError::Denied { cause };
500
501                    self.behaviour
502                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
503                            peer_id,
504                            error: &error,
505                            connection_id,
506                        }));
507
508                    return Err(error);
509                }
510            }
511
512            let mut unique_addresses = HashSet::new();
513            addresses_from_opts.retain(|addr| {
514                !self.listened_addrs.values().flatten().any(|a| a == addr)
515                    && unique_addresses.insert(addr.clone())
516            });
517
518            if addresses_from_opts.is_empty() {
519                let error = DialError::NoAddresses;
520                self.behaviour
521                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
522                        peer_id,
523                        error: &error,
524                        connection_id,
525                    }));
526                return Err(error);
527            };
528
529            addresses_from_opts
530        };
531
532        let dials = addresses
533            .into_iter()
534            .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
535                Ok(address) => {
536                    let dial = self.transport.dial(
537                        address.clone(),
538                        transport::DialOpts {
539                            role: dial_opts.role_override(),
540                            port_use: dial_opts.port_use(),
541                        },
542                    );
543                    let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
544                    span.follows_from(tracing::Span::current());
545                    match dial {
546                        Ok(fut) => fut
547                            .map(|r| (address, r.map_err(TransportError::Other)))
548                            .instrument(span)
549                            .boxed(),
550                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
551                    }
552                }
553                Err(address) => futures::future::ready((
554                    address.clone(),
555                    Err(TransportError::MultiaddrNotSupported(address)),
556                ))
557                .boxed(),
558            })
559            .collect();
560
561        self.pool.add_outgoing(
562            dials,
563            peer_id,
564            dial_opts.role_override(),
565            dial_opts.port_use(),
566            dial_opts.dial_concurrency_override(),
567            connection_id,
568        );
569
570        Ok(())
571    }
572
573    /// Returns an iterator that produces the list of addresses we're listening on.
574    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
575        self.listened_addrs.values().flatten()
576    }
577
    /// Returns the [`PeerId`] of the local node, i.e. the one passed to [`Swarm::new`].
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
582
    /// Lists all **confirmed** external addresses of the local node.
    ///
    /// The addresses come from a hash set, so the iteration order is unspecified.
    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
        self.confirmed_external_addr.iter()
    }
587
588    fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
589        let addr = opts.address();
590        let listener_id = opts.listener_id();
591
592        if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
593            self.behaviour
594                .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
595                    listener_id,
596                    err: &e,
597                }));
598
599            return Err(e);
600        }
601
602        self.behaviour
603            .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
604                listener_id,
605            }));
606
607        Ok(())
608    }
609
610    /// Add a **confirmed** external address for the local node.
611    ///
612    /// This function should only be called with addresses that are guaranteed to be reachable.
613    /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::ExternalAddrConfirmed`].
614    pub fn add_external_address(&mut self, a: Multiaddr) {
615        self.behaviour
616            .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
617                addr: &a,
618            }));
619        self.confirmed_external_addr.insert(a);
620    }
621
622    /// Remove an external address for the local node.
623    ///
624    /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::ExternalAddrExpired`].
625    pub fn remove_external_address(&mut self, addr: &Multiaddr) {
626        self.behaviour
627            .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
628        self.confirmed_external_addr.remove(addr);
629    }
630
631    /// Add a new external address of a remote peer.
632    ///
633    /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::NewExternalAddrOfPeer`].
634    pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
635        self.behaviour
636            .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
637                peer_id,
638                addr: &addr,
639            }))
640    }
641
642    /// Disconnects a peer by its peer ID, closing all connections to said peer.
643    ///
644    /// Returns `Ok(())` if there was one or more established connections to the peer.
645    ///
646    /// Closing a connection via [`Swarm::disconnect_peer_id`] will poll [`ConnectionHandler::poll_close`] to completion.
647    /// Use this function if you want to close a connection _despite_ it still being in use by one or more handlers.
648    #[allow(clippy::result_unit_err)]
649    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
650        let was_connected = self.pool.is_connected(peer_id);
651        self.pool.disconnect(peer_id);
652
653        if was_connected {
654            Ok(())
655        } else {
656            Err(())
657        }
658    }
659
660    /// Attempt to gracefully close a connection.
661    ///
662    /// Closing a connection is asynchronous but this function will return immediately.
663    /// A [`SwarmEvent::ConnectionClosed`] event will be emitted once the connection is actually closed.
664    ///
665    /// # Returns
666    ///
667    /// - `true` if the connection was established and is now being closed.
668    /// - `false` if the connection was not found or is no longer established.
669    pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
670        if let Some(established) = self.pool.get_established(connection_id) {
671            established.start_close();
672            return true;
673        }
674
675        false
676    }
677
    /// Checks whether there is an established connection to a peer.
    ///
    /// Delegates to the connection pool.
    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
        self.pool.is_connected(*peer_id)
    }
682
    /// Returns an iterator over the currently connected peers, as reported by the
    /// connection pool.
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.pool.iter_connected()
    }
687
    /// Returns a shared reference to the [`NetworkBehaviour`] owned by this swarm.
    pub fn behaviour(&self) -> &TBehaviour {
        &self.behaviour
    }
692
    /// Returns a mutable reference to the [`NetworkBehaviour`] owned by this swarm.
    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }
697
698    fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
699        match event {
700            PoolEvent::ConnectionEstablished {
701                peer_id,
702                id,
703                endpoint,
704                connection,
705                concurrent_dial_errors,
706                established_in,
707            } => {
708                let handler = match endpoint.clone() {
709                    ConnectedPoint::Dialer {
710                        address,
711                        role_override,
712                        port_use,
713                    } => {
714                        match self.behaviour.handle_established_outbound_connection(
715                            id,
716                            peer_id,
717                            &address,
718                            role_override,
719                            port_use,
720                        ) {
721                            Ok(handler) => handler,
722                            Err(cause) => {
723                                let dial_error = DialError::Denied { cause };
724                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
725                                    DialFailure {
726                                        connection_id: id,
727                                        error: &dial_error,
728                                        peer_id: Some(peer_id),
729                                    },
730                                ));
731
732                                self.pending_swarm_events.push_back(
733                                    SwarmEvent::OutgoingConnectionError {
734                                        peer_id: Some(peer_id),
735                                        connection_id: id,
736                                        error: dial_error,
737                                    },
738                                );
739                                return;
740                            }
741                        }
742                    }
743                    ConnectedPoint::Listener {
744                        local_addr,
745                        send_back_addr,
746                    } => {
747                        match self.behaviour.handle_established_inbound_connection(
748                            id,
749                            peer_id,
750                            &local_addr,
751                            &send_back_addr,
752                        ) {
753                            Ok(handler) => handler,
754                            Err(cause) => {
755                                let listen_error = ListenError::Denied { cause };
756                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
757                                    ListenFailure {
758                                        local_addr: &local_addr,
759                                        send_back_addr: &send_back_addr,
760                                        error: &listen_error,
761                                        connection_id: id,
762                                        peer_id: Some(peer_id),
763                                    },
764                                ));
765
766                                self.pending_swarm_events.push_back(
767                                    SwarmEvent::IncomingConnectionError {
768                                        connection_id: id,
769                                        send_back_addr,
770                                        local_addr,
771                                        error: listen_error,
772                                    },
773                                );
774                                return;
775                            }
776                        }
777                    }
778                };
779
780                let supported_protocols = handler
781                    .listen_protocol()
782                    .upgrade()
783                    .protocol_info()
784                    .map(|p| p.as_ref().as_bytes().to_vec())
785                    .collect();
786                let other_established_connection_ids = self
787                    .pool
788                    .iter_established_connections_of_peer(&peer_id)
789                    .collect::<Vec<_>>();
790                let num_established = NonZeroU32::new(
791                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
792                )
793                .expect("n + 1 is always non-zero; qed");
794
795                self.pool
796                    .spawn_connection(id, peer_id, &endpoint, connection, handler);
797
798                tracing::debug!(
799                    peer=%peer_id,
800                    ?endpoint,
801                    total_peers=%num_established,
802                    "Connection established"
803                );
804                let failed_addresses = concurrent_dial_errors
805                    .as_ref()
806                    .map(|es| {
807                        es.iter()
808                            .map(|(a, _)| a)
809                            .cloned()
810                            .collect::<Vec<Multiaddr>>()
811                    })
812                    .unwrap_or_default();
813                self.behaviour
814                    .on_swarm_event(FromSwarm::ConnectionEstablished(
815                        behaviour::ConnectionEstablished {
816                            peer_id,
817                            connection_id: id,
818                            endpoint: &endpoint,
819                            failed_addresses: &failed_addresses,
820                            other_established: other_established_connection_ids.len(),
821                        },
822                    ));
823                self.supported_protocols = supported_protocols;
824                self.pending_swarm_events
825                    .push_back(SwarmEvent::ConnectionEstablished {
826                        peer_id,
827                        connection_id: id,
828                        num_established,
829                        endpoint,
830                        concurrent_dial_errors,
831                        established_in,
832                    });
833            }
834            PoolEvent::PendingOutboundConnectionError {
835                id: connection_id,
836                error,
837                peer,
838            } => {
839                let error = error.into();
840
841                self.behaviour
842                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
843                        peer_id: peer,
844                        error: &error,
845                        connection_id,
846                    }));
847
848                if let Some(peer) = peer {
849                    tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
850                } else {
851                    tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
852                }
853
854                self.pending_swarm_events
855                    .push_back(SwarmEvent::OutgoingConnectionError {
856                        peer_id: peer,
857                        connection_id,
858                        error,
859                    });
860            }
861            PoolEvent::PendingInboundConnectionError {
862                id,
863                send_back_addr,
864                local_addr,
865                error,
866            } => {
867                let error = error.into();
868
869                tracing::debug!("Incoming connection failed: {:?}", error);
870                self.behaviour
871                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
872                        local_addr: &local_addr,
873                        send_back_addr: &send_back_addr,
874                        error: &error,
875                        connection_id: id,
876                        peer_id: None,
877                    }));
878                self.pending_swarm_events
879                    .push_back(SwarmEvent::IncomingConnectionError {
880                        connection_id: id,
881                        local_addr,
882                        send_back_addr,
883                        error,
884                    });
885            }
886            PoolEvent::ConnectionClosed {
887                id,
888                connected,
889                error,
890                remaining_established_connection_ids,
891                ..
892            } => {
893                if let Some(error) = error.as_ref() {
894                    tracing::debug!(
895                        total_peers=%remaining_established_connection_ids.len(),
896                        "Connection closed with error {:?}: {:?}",
897                        error,
898                        connected,
899                    );
900                } else {
901                    tracing::debug!(
902                        total_peers=%remaining_established_connection_ids.len(),
903                        "Connection closed: {:?}",
904                        connected
905                    );
906                }
907                let peer_id = connected.peer_id;
908                let endpoint = connected.endpoint;
909                let num_established =
910                    u32::try_from(remaining_established_connection_ids.len()).unwrap();
911
912                self.behaviour
913                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
914                        peer_id,
915                        connection_id: id,
916                        endpoint: &endpoint,
917                        cause: error.as_ref(),
918                        remaining_established: num_established as usize,
919                    }));
920                self.pending_swarm_events
921                    .push_back(SwarmEvent::ConnectionClosed {
922                        peer_id,
923                        connection_id: id,
924                        endpoint,
925                        cause: error,
926                        num_established,
927                    });
928            }
929            PoolEvent::ConnectionEvent { peer_id, id, event } => {
930                self.behaviour
931                    .on_connection_handler_event(peer_id, id, event);
932            }
933            PoolEvent::AddressChange {
934                peer_id,
935                id,
936                new_endpoint,
937                old_endpoint,
938            } => {
939                self.behaviour
940                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
941                        peer_id,
942                        connection_id: id,
943                        old: &old_endpoint,
944                        new: &new_endpoint,
945                    }));
946            }
947        }
948    }
949
950    fn handle_transport_event(
951        &mut self,
952        event: TransportEvent<
953            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
954            io::Error,
955        >,
956    ) {
957        match event {
958            TransportEvent::Incoming {
959                listener_id: _,
960                upgrade,
961                local_addr,
962                send_back_addr,
963            } => {
964                let connection_id = ConnectionId::next();
965
966                match self.behaviour.handle_pending_inbound_connection(
967                    connection_id,
968                    &local_addr,
969                    &send_back_addr,
970                ) {
971                    Ok(()) => {}
972                    Err(cause) => {
973                        let listen_error = ListenError::Denied { cause };
974
975                        self.behaviour
976                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
977                                local_addr: &local_addr,
978                                send_back_addr: &send_back_addr,
979                                error: &listen_error,
980                                connection_id,
981                                peer_id: None,
982                            }));
983
984                        self.pending_swarm_events
985                            .push_back(SwarmEvent::IncomingConnectionError {
986                                connection_id,
987                                local_addr,
988                                send_back_addr,
989                                error: listen_error,
990                            });
991                        return;
992                    }
993                }
994
995                self.pool.add_incoming(
996                    upgrade,
997                    IncomingInfo {
998                        local_addr: &local_addr,
999                        send_back_addr: &send_back_addr,
1000                    },
1001                    connection_id,
1002                );
1003
1004                self.pending_swarm_events
1005                    .push_back(SwarmEvent::IncomingConnection {
1006                        connection_id,
1007                        local_addr,
1008                        send_back_addr,
1009                    })
1010            }
1011            TransportEvent::NewAddress {
1012                listener_id,
1013                listen_addr,
1014            } => {
1015                tracing::debug!(
1016                    listener=?listener_id,
1017                    address=%listen_addr,
1018                    "New listener address"
1019                );
1020                let addrs = self.listened_addrs.entry(listener_id).or_default();
1021                if !addrs.contains(&listen_addr) {
1022                    addrs.push(listen_addr.clone())
1023                }
1024                self.behaviour
1025                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1026                        listener_id,
1027                        addr: &listen_addr,
1028                    }));
1029                self.pending_swarm_events
1030                    .push_back(SwarmEvent::NewListenAddr {
1031                        listener_id,
1032                        address: listen_addr,
1033                    })
1034            }
1035            TransportEvent::AddressExpired {
1036                listener_id,
1037                listen_addr,
1038            } => {
1039                tracing::debug!(
1040                    listener=?listener_id,
1041                    address=%listen_addr,
1042                    "Expired listener address"
1043                );
1044                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1045                    addrs.retain(|a| a != &listen_addr);
1046                }
1047                self.behaviour
1048                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1049                        listener_id,
1050                        addr: &listen_addr,
1051                    }));
1052                self.pending_swarm_events
1053                    .push_back(SwarmEvent::ExpiredListenAddr {
1054                        listener_id,
1055                        address: listen_addr,
1056                    })
1057            }
1058            TransportEvent::ListenerClosed {
1059                listener_id,
1060                reason,
1061            } => {
1062                tracing::debug!(
1063                    listener=?listener_id,
1064                    ?reason,
1065                    "Listener closed"
1066                );
1067                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1068                for addr in addrs.iter() {
1069                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1070                        ExpiredListenAddr { listener_id, addr },
1071                    ));
1072                }
1073                self.behaviour
1074                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1075                        listener_id,
1076                        reason: reason.as_ref().copied(),
1077                    }));
1078                self.pending_swarm_events
1079                    .push_back(SwarmEvent::ListenerClosed {
1080                        listener_id,
1081                        addresses: addrs.to_vec(),
1082                        reason,
1083                    })
1084            }
1085            TransportEvent::ListenerError { listener_id, error } => {
1086                self.behaviour
1087                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1088                        listener_id,
1089                        err: &error,
1090                    }));
1091                self.pending_swarm_events
1092                    .push_back(SwarmEvent::ListenerError { listener_id, error })
1093            }
1094        }
1095    }
1096
1097    fn handle_behaviour_event(
1098        &mut self,
1099        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1100    ) {
1101        match event {
1102            ToSwarm::GenerateEvent(event) => {
1103                self.pending_swarm_events
1104                    .push_back(SwarmEvent::Behaviour(event));
1105            }
1106            ToSwarm::Dial { opts } => {
1107                let peer_id = opts.get_peer_id();
1108                let connection_id = opts.connection_id();
1109                if let Ok(()) = self.dial(opts) {
1110                    self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1111                        peer_id,
1112                        connection_id,
1113                    });
1114                }
1115            }
1116            ToSwarm::ListenOn { opts } => {
1117                // Error is dispatched internally, safe to ignore.
1118                let _ = self.add_listener(opts);
1119            }
1120            ToSwarm::RemoveListener { id } => {
1121                self.remove_listener(id);
1122            }
1123            ToSwarm::NotifyHandler {
1124                peer_id,
1125                handler,
1126                event,
1127            } => {
1128                assert!(self.pending_handler_event.is_none());
1129                let handler = match handler {
1130                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1131                    NotifyHandler::Any => {
1132                        let ids = self
1133                            .pool
1134                            .iter_established_connections_of_peer(&peer_id)
1135                            .collect();
1136                        PendingNotifyHandler::Any(ids)
1137                    }
1138                };
1139
1140                self.pending_handler_event = Some((peer_id, handler, event));
1141            }
1142            ToSwarm::NewExternalAddrCandidate(addr) => {
1143                self.behaviour
1144                    .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1145                        NewExternalAddrCandidate { addr: &addr },
1146                    ));
1147                self.pending_swarm_events
1148                    .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1149            }
1150            ToSwarm::ExternalAddrConfirmed(addr) => {
1151                self.add_external_address(addr.clone());
1152                self.pending_swarm_events
1153                    .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1154            }
1155            ToSwarm::ExternalAddrExpired(addr) => {
1156                self.remove_external_address(&addr);
1157                self.pending_swarm_events
1158                    .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1159            }
1160            ToSwarm::CloseConnection {
1161                peer_id,
1162                connection,
1163            } => match connection {
1164                CloseConnection::One(connection_id) => {
1165                    if let Some(conn) = self.pool.get_established(connection_id) {
1166                        conn.start_close();
1167                    }
1168                }
1169                CloseConnection::All => {
1170                    self.pool.disconnect(peer_id);
1171                }
1172            },
1173            ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1174                self.behaviour
1175                    .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1176                        peer_id,
1177                        addr: &address,
1178                    }));
1179                self.pending_swarm_events
1180                    .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1181            }
1182        }
1183    }
1184
1185    /// Internal function used by everything event-related.
1186    ///
1187    /// Polls the `Swarm` for the next event.
1188    #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1189    fn poll_next_event(
1190        mut self: Pin<&mut Self>,
1191        cx: &mut Context<'_>,
1192    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1193        // We use a `this` variable because the compiler can't mutably borrow multiple times
1194        // across a `Deref`.
1195        let this = &mut *self;
1196
1197        // This loop polls the components below in a prioritized order.
1198        //
1199        // 1. [`NetworkBehaviour`]
1200        // 2. Connection [`Pool`]
1201        // 3. [`ListenersStream`]
1202        //
1203        // (1) is polled before (2) to prioritize local work over work coming from a remote.
1204        //
1205        // (2) is polled before (3) to prioritize existing connections over upgrading new incoming connections.
1206        loop {
1207            if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1208                return Poll::Ready(swarm_event);
1209            }
1210
1211            match this.pending_handler_event.take() {
1212                // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the previous
1213                // iteration to the connection handler(s).
1214                Some((peer_id, handler, event)) => match handler {
1215                    PendingNotifyHandler::One(conn_id) => {
1216                        match this.pool.get_established(conn_id) {
1217                            Some(conn) => match notify_one(conn, event, cx) {
1218                                None => continue,
1219                                Some(event) => {
1220                                    this.pending_handler_event = Some((peer_id, handler, event));
1221                                }
1222                            },
1223                            None => continue,
1224                        }
1225                    }
1226                    PendingNotifyHandler::Any(ids) => {
1227                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1228                            None => continue,
1229                            Some((event, ids)) => {
1230                                let handler = PendingNotifyHandler::Any(ids);
1231                                this.pending_handler_event = Some((peer_id, handler, event));
1232                            }
1233                        }
1234                    }
1235                },
1236                // No pending event. Allow the [`NetworkBehaviour`] to make progress.
1237                None => match this.behaviour.poll(cx) {
1238                    Poll::Pending => {}
1239                    Poll::Ready(behaviour_event) => {
1240                        this.handle_behaviour_event(behaviour_event);
1241
1242                        continue;
1243                    }
1244                },
1245            }
1246
1247            // Poll the known peers.
1248            match this.pool.poll(cx) {
1249                Poll::Pending => {}
1250                Poll::Ready(pool_event) => {
1251                    this.handle_pool_event(pool_event);
1252                    continue;
1253                }
1254            }
1255
1256            // Poll the listener(s) for new connections.
1257            match Pin::new(&mut this.transport).poll(cx) {
1258                Poll::Pending => {}
1259                Poll::Ready(transport_event) => {
1260                    this.handle_transport_event(transport_event);
1261                    continue;
1262                }
1263            }
1264
1265            return Poll::Pending;
1266        }
1267    }
1268}
1269
/// Connection to notify of a pending event.
///
/// The connection IDs out of which to notify one of an event are captured at
/// the time the behaviour emits the event, in order not to forward the event to
/// a new connection which the behaviour may not have been aware of at the time
/// it issued the request for sending it.
enum PendingNotifyHandler {
    /// The event is destined for exactly this connection.
    One(ConnectionId),
    /// The event may be delivered to any one of the listed connections.
    Any(SmallVec<[ConnectionId; 10]>),
}
1280
1281/// Notify a single connection of an event.
1282///
1283/// Returns `Some` with the given event if the connection is not currently
1284/// ready to receive another event, in which case the current task is
1285/// scheduled to be woken up.
1286///
1287/// Returns `None` if the connection is closing or the event has been
1288/// successfully sent, in either case the event is consumed.
1289fn notify_one<THandlerInEvent>(
1290    conn: &mut EstablishedConnection<THandlerInEvent>,
1291    event: THandlerInEvent,
1292    cx: &mut Context<'_>,
1293) -> Option<THandlerInEvent> {
1294    match conn.poll_ready_notify_handler(cx) {
1295        Poll::Pending => Some(event),
1296        Poll::Ready(Err(())) => None, // connection is closing
1297        Poll::Ready(Ok(())) => {
1298            // Can now only fail if connection is closing.
1299            let _ = conn.notify_handler(event);
1300            None
1301        }
1302    }
1303}
1304
1305/// Notify any one of a given list of connections of a peer of an event.
1306///
1307/// Returns `Some` with the given event and a new list of connections if
1308/// none of the given connections was able to receive the event but at
1309/// least one of them is not closing, in which case the current task
1310/// is scheduled to be woken up. The returned connections are those which
1311/// may still become ready to receive another event.
1312///
1313/// Returns `None` if either all connections are closing or the event
1314/// was successfully sent to a handler, in either case the event is consumed.
1315fn notify_any<THandler, TBehaviour>(
1316    ids: SmallVec<[ConnectionId; 10]>,
1317    pool: &mut Pool<THandler>,
1318    event: THandlerInEvent<TBehaviour>,
1319    cx: &mut Context<'_>,
1320) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1321where
1322    TBehaviour: NetworkBehaviour,
1323    THandler: ConnectionHandler<
1324        FromBehaviour = THandlerInEvent<TBehaviour>,
1325        ToBehaviour = THandlerOutEvent<TBehaviour>,
1326    >,
1327{
1328    let mut pending = SmallVec::new();
1329    let mut event = Some(event); // (1)
1330    for id in ids.into_iter() {
1331        if let Some(conn) = pool.get_established(id) {
1332            match conn.poll_ready_notify_handler(cx) {
1333                Poll::Pending => pending.push(id),
1334                Poll::Ready(Err(())) => {} // connection is closing
1335                Poll::Ready(Ok(())) => {
1336                    let e = event.take().expect("by (1),(2)");
1337                    if let Err(e) = conn.notify_handler(e) {
1338                        event = Some(e) // (2)
1339                    } else {
1340                        break;
1341                    }
1342                }
1343            }
1344        }
1345    }
1346
1347    event.and_then(|e| {
1348        if !pending.is_empty() {
1349            Some((e, pending))
1350        } else {
1351            None
1352        }
1353    })
1354}
1355
1356/// Stream of events returned by [`Swarm`].
1357///
1358/// Includes events from the [`NetworkBehaviour`] as well as events about
1359/// connection and listener status. See [`SwarmEvent`] for details.
1360///
1361/// Note: This stream is infinite and it is guaranteed that
1362/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`.
1363impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1364where
1365    TBehaviour: NetworkBehaviour,
1366{
1367    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1368
1369    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1370        self.as_mut().poll_next_event(cx).map(Some)
1371    }
1372}
1373
/// The stream of swarm events never terminates, so we can implement fused for it.
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Always reports `false`.
    fn is_terminated(&self) -> bool {
        false
    }
}
1383
1384pub struct Config {
1385    pool_config: PoolConfig,
1386}
1387
1388impl Config {
1389    /// Creates a new [`Config`] from the given executor. The [`Swarm`] is obtained via
1390    /// [`Swarm::new`].
1391    pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1392        Self {
1393            pool_config: PoolConfig::new(Some(Box::new(executor))),
1394        }
1395    }
1396
1397    #[doc(hidden)]
1398    /// Used on connection benchmarks.
1399    pub fn without_executor() -> Self {
1400        Self {
1401            pool_config: PoolConfig::new(None),
1402        }
1403    }
1404
1405    /// Sets executor to the `wasm` executor.
1406    /// Background tasks will be executed by the browser on the next micro-tick.
1407    ///
1408    /// Spawning a task is similar too:
1409    /// ```typescript
1410    /// function spawn(task: () => Promise<void>) {
1411    ///     task()
1412    /// }
1413    /// ```
1414    #[cfg(feature = "wasm-bindgen")]
1415    pub fn with_wasm_executor() -> Self {
1416        Self::with_executor(crate::executor::WasmBindgenExecutor)
1417    }
1418
1419    /// Builds a new [`Config`] from the given `tokio` executor.
1420    #[cfg(all(
1421        feature = "tokio",
1422        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1423    ))]
1424    pub fn with_tokio_executor() -> Self {
1425        Self::with_executor(crate::executor::TokioExecutor)
1426    }
1427
1428    /// Builds a new [`Config`] from the given `async-std` executor.
1429    #[cfg(all(
1430        feature = "async-std",
1431        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1432    ))]
1433    pub fn with_async_std_executor() -> Self {
1434        Self::with_executor(crate::executor::AsyncStdExecutor)
1435    }
1436
1437    /// Configures the number of events from the [`NetworkBehaviour`] in
1438    /// destination to the [`ConnectionHandler`] that can be buffered before
1439    /// the [`Swarm`] has to wait. An individual buffer with this number of
1440    /// events exists for each individual connection.
1441    ///
1442    /// The ideal value depends on the executor used, the CPU speed, and the
1443    /// volume of events. If this value is too low, then the [`Swarm`] will
1444    /// be sleeping more often than necessary. Increasing this value increases
1445    /// the overall memory usage.
1446    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1447        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1448        self
1449    }
1450
1451    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1452    /// [`NetworkBehaviour`].
1453    ///
1454    /// Each connection has its own buffer.
1455    ///
1456    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1457    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1458    /// than necessary. Increasing this value increases the overall memory
1459    /// usage, and more importantly the latency between the moment when an
1460    /// event is emitted and the moment when it is received by the
1461    /// [`NetworkBehaviour`].
1462    pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1463        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1464        self
1465    }
1466
1467    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1468    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1469        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1470        self
1471    }
1472
1473    /// Configures an override for the substream upgrade protocol to use.
1474    ///
1475    /// The subtream upgrade protocol is the multistream-select protocol
1476    /// used for protocol negotiation on substreams. Since a listener
1477    /// supports all existing versions, the choice of upgrade protocol
1478    /// only effects the "dialer", i.e. the peer opening a substream.
1479    ///
1480    /// > **Note**: If configured, specific upgrade protocols for
1481    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1482    /// > are ignored.
1483    pub fn with_substream_upgrade_protocol_override(
1484        mut self,
1485        v: libp2p_core::upgrade::Version,
1486    ) -> Self {
1487        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1488        self
1489    }
1490
1491    /// The maximum number of inbound streams concurrently negotiating on a
1492    /// connection. New inbound streams exceeding the limit are dropped and thus
1493    /// reset.
1494    ///
1495    /// Note: This only enforces a limit on the number of concurrently
1496    /// negotiating inbound streams. The total number of inbound streams on a
1497    /// connection is the sum of negotiating and negotiated streams. A limit on
1498    /// the total number of streams can be enforced at the
1499    /// [`StreamMuxerBox`] level.
1500    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1501        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1502        self
1503    }
1504
1505    /// How long to keep a connection alive once it is idling.
1506    ///
1507    /// Defaults to 0.
1508    pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1509        self.pool_config.idle_connection_timeout = timeout;
1510        self
1511    }
1512}
1513
/// Possible errors when trying to establish or upgrade an outbound connection.
///
/// The [`fmt::Display`] implementation renders a human-readable message for
/// each variant; [`error::Error::source`] exposes the [`ConnectionDenied`]
/// cause of [`DialError::Denied`].
#[derive(Debug)]
pub enum DialError {
    /// The peer identity obtained on the connection matches the local peer.
    LocalPeerId { endpoint: ConnectedPoint },
    /// No addresses have been provided by [`NetworkBehaviour::handle_pending_outbound_connection`] and [`DialOpts`].
    NoAddresses,
    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
    /// the dial was aborted. The condition that failed is carried along.
    DialPeerConditionFalse(dial_opts::PeerCondition),
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The peer identity obtained on the connection did not match the one that was expected.
    ///
    /// `obtained` is the identity that was actually observed at `endpoint`.
    WrongPeerId {
        obtained: PeerId,
        endpoint: ConnectedPoint,
    },
    /// One of the [`NetworkBehaviour`]s rejected the outbound connection
    /// via [`NetworkBehaviour::handle_pending_outbound_connection`] or
    /// [`NetworkBehaviour::handle_established_outbound_connection`].
    Denied { cause: ConnectionDenied },
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    ///
    /// Each entry pairs an address with the transport error reported for it.
    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
1538
1539impl From<PendingOutboundConnectionError> for DialError {
1540    fn from(error: PendingOutboundConnectionError) -> Self {
1541        match error {
1542            PendingConnectionError::Aborted => DialError::Aborted,
1543            PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1544                DialError::WrongPeerId { obtained, endpoint }
1545            }
1546            PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1547            PendingConnectionError::Transport(e) => DialError::Transport(e),
1548        }
1549    }
1550}
1551
1552impl fmt::Display for DialError {
1553    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1554        match self {
1555            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1556            DialError::LocalPeerId { endpoint } => write!(
1557                f,
1558                "Dial error: tried to dial local peer id at {endpoint:?}."
1559            ),
1560            DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1561            DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1562            DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1563            DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1564            DialError::Aborted => write!(
1565                f,
1566                "Dial error: Pending connection attempt has been aborted."
1567            ),
1568            DialError::WrongPeerId { obtained, endpoint } => write!(
1569                f,
1570                "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1571            ),
1572            DialError::Transport(errors) => {
1573                write!(f, "Failed to negotiate transport protocol(s): [")?;
1574
1575                for (addr, error) in errors {
1576                    write!(f, "({addr}")?;
1577                    print_error_chain(f, error)?;
1578                    write!(f, ")")?;
1579                }
1580                write!(f, "]")?;
1581
1582                Ok(())
1583            }
1584            DialError::Denied { .. } => {
1585                write!(f, "Dial error")
1586            }
1587        }
1588    }
1589}
1590
/// Writes `e` followed by every error in its `source()` chain to `f`,
/// prefixing each message with `": "`.
///
/// Stops and returns the formatter error as soon as a write fails.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    write!(f, ": {e}")?;

    // Walk the chain iteratively instead of recursing.
    let mut next = e.source();
    while let Some(cause) = next {
        write!(f, ": {cause}")?;
        next = cause.source();
    }

    Ok(())
}
1600
1601impl error::Error for DialError {
1602    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1603        match self {
1604            DialError::LocalPeerId { .. } => None,
1605            DialError::NoAddresses => None,
1606            DialError::DialPeerConditionFalse(_) => None,
1607            DialError::Aborted => None,
1608            DialError::WrongPeerId { .. } => None,
1609            DialError::Transport(_) => None,
1610            DialError::Denied { cause } => Some(cause),
1611        }
1612    }
1613}
1614
/// Possible errors when upgrading an inbound connection.
///
/// [`error::Error::source`] exposes the underlying error for the
/// [`ListenError::Transport`] and [`ListenError::Denied`] variants.
#[derive(Debug)]
pub enum ListenError {
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The peer identity obtained on the connection did not match the one that was expected.
    ///
    /// `obtained` is the identity that was actually observed at `endpoint`.
    WrongPeerId {
        obtained: PeerId,
        endpoint: ConnectedPoint,
    },
    /// The connection was dropped because it resolved to our own [`PeerId`].
    LocalPeerId {
        endpoint: ConnectedPoint,
    },
    /// The connection was denied; `cause` carries the [`ConnectionDenied`]
    /// reason, which can be downcast to find out why.
    Denied {
        cause: ConnectionDenied,
    },
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    Transport(TransportError<io::Error>),
}
1635
1636impl From<PendingInboundConnectionError> for ListenError {
1637    fn from(error: PendingInboundConnectionError) -> Self {
1638        match error {
1639            PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1640            PendingInboundConnectionError::Aborted => ListenError::Aborted,
1641            PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1642                ListenError::WrongPeerId { obtained, endpoint }
1643            }
1644            PendingInboundConnectionError::LocalPeerId { endpoint } => {
1645                ListenError::LocalPeerId { endpoint }
1646            }
1647        }
1648    }
1649}
1650
1651impl fmt::Display for ListenError {
1652    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1653        match self {
1654            ListenError::Aborted => write!(
1655                f,
1656                "Listen error: Pending connection attempt has been aborted."
1657            ),
1658            ListenError::WrongPeerId { obtained, endpoint } => write!(
1659                f,
1660                "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1661            ),
1662            ListenError::Transport(_) => {
1663                write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1664            }
1665            ListenError::Denied { cause } => {
1666                write!(f, "Listen error: Denied: {cause}")
1667            }
1668            ListenError::LocalPeerId { endpoint } => {
1669                write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1670            }
1671        }
1672    }
1673}
1674
1675impl error::Error for ListenError {
1676    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1677        match self {
1678            ListenError::WrongPeerId { .. } => None,
1679            ListenError::Transport(err) => Some(err),
1680            ListenError::Aborted => None,
1681            ListenError::Denied { cause } => Some(cause),
1682            ListenError::LocalPeerId { .. } => None,
1683        }
1684    }
1685}
1686
/// A connection was denied.
///
/// To figure out which [`NetworkBehaviour`] denied the connection, use [`ConnectionDenied::downcast`].
#[derive(Debug)]
pub struct ConnectionDenied {
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}

impl ConnectionDenied {
    /// Wraps `cause` as the reason for denying a connection.
    ///
    /// Accepts anything convertible into a boxed, thread-safe error.
    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
        let inner = cause.into();
        Self { inner }
    }

    /// Attempt to downcast to a particular reason for why the connection was denied.
    ///
    /// Consumes `self`; on a type mismatch the untouched `ConnectionDenied`
    /// is handed back in the `Err` variant.
    pub fn downcast<E>(self) -> Result<E, Self>
    where
        E: error::Error + Send + Sync + 'static,
    {
        match self.inner.downcast::<E>() {
            Ok(reason) => Ok(*reason),
            Err(inner) => Err(Self { inner }),
        }
    }

    /// Attempt to borrow the reason for the denial as a particular error type.
    ///
    /// Returns `None` if the wrapped error is not an `E`.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: error::Error + Send + Sync + 'static,
    {
        let reason: &(dyn error::Error + Send + Sync + 'static) = &*self.inner;
        reason.downcast_ref::<E>()
    }
}

impl fmt::Display for ConnectionDenied {
    /// Prints a fixed message; the wrapped reason is reachable via `source()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("connection denied")
    }
}

impl error::Error for ConnectionDenied {
    /// Always returns the wrapped reason.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let cause: &(dyn error::Error + 'static) = &*self.inner;
        Some(cause)
    }
}
1735
/// Information about the connections obtained by [`Swarm::network_info()`].
///
/// Both fields are private; read them through [`NetworkInfo::num_peers`] and
/// [`NetworkInfo::connection_counters`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
    /// The total number of connected peers.
    num_peers: usize,
    /// Counters of ongoing network connections.
    connection_counters: ConnectionCounters,
}
1744
1745impl NetworkInfo {
1746    /// The number of connected peers, i.e. peers with whom at least
1747    /// one established connection exists.
1748    pub fn num_peers(&self) -> usize {
1749        self.num_peers
1750    }
1751
1752    /// Gets counters for ongoing network connections.
1753    pub fn connection_counters(&self) -> &ConnectionCounters {
1754        &self.connection_counters
1755    }
1756}
1757
1758#[cfg(test)]
1759mod tests {
1760    use super::*;
1761    use crate::test::{CallTraceBehaviour, MockBehaviour};
1762    use libp2p_core::multiaddr::multiaddr;
1763    use libp2p_core::transport::memory::MemoryTransportError;
1764    use libp2p_core::transport::{PortUse, TransportEvent};
1765    use libp2p_core::Endpoint;
1766    use libp2p_core::{multiaddr, transport, upgrade};
1767    use libp2p_identity as identity;
1768    use libp2p_plaintext as plaintext;
1769    use libp2p_yamux as yamux;
1770    use quickcheck::*;
1771
    // Test execution state of the disconnect tests.
    // The tests start in `Connecting` and, where they reconnect, move through
    // Connecting => Disconnecting => Connecting.
    enum State {
        Connecting,
        Disconnecting,
    }
1778
1779    fn new_test_swarm(
1780        config: Config,
1781    ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1782        let id_keys = identity::Keypair::generate_ed25519();
1783        let local_public_key = id_keys.public();
1784        let transport = transport::MemoryTransport::default()
1785            .upgrade(upgrade::Version::V1)
1786            .authenticate(plaintext::Config::new(&id_keys))
1787            .multiplex(yamux::Config::default())
1788            .boxed();
1789        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1790
1791        Swarm::new(
1792            transport,
1793            behaviour,
1794            local_public_key.into(),
1795            config.with_idle_connection_timeout(Duration::from_secs(5)),
1796        )
1797    }
1798
1799    fn swarms_connected<TBehaviour>(
1800        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1801        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1802        num_connections: usize,
1803    ) -> bool
1804    where
1805        TBehaviour: NetworkBehaviour,
1806        THandlerOutEvent<TBehaviour>: Clone,
1807    {
1808        swarm1
1809            .behaviour()
1810            .num_connections_to_peer(*swarm2.local_peer_id())
1811            == num_connections
1812            && swarm2
1813                .behaviour()
1814                .num_connections_to_peer(*swarm1.local_peer_id())
1815                == num_connections
1816            && swarm1.is_connected(swarm2.local_peer_id())
1817            && swarm2.is_connected(swarm1.local_peer_id())
1818    }
1819
1820    fn swarms_disconnected<TBehaviour>(
1821        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1822        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1823    ) -> bool
1824    where
1825        TBehaviour: NetworkBehaviour,
1826        THandlerOutEvent<TBehaviour>: Clone,
1827    {
1828        swarm1
1829            .behaviour()
1830            .num_connections_to_peer(*swarm2.local_peer_id())
1831            == 0
1832            && swarm2
1833                .behaviour()
1834                .num_connections_to_peer(*swarm1.local_peer_id())
1835                == 0
1836            && !swarm1.is_connected(swarm2.local_peer_id())
1837            && !swarm2.is_connected(swarm1.local_peer_id())
1838    }
1839
1840    /// Establishes multiple connections between two peers,
1841    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
1842    ///
1843    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
1844    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
1845    #[tokio::test]
1846    async fn test_swarm_disconnect() {
1847        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1848        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1849
1850        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1851        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1852
1853        swarm1.listen_on(addr1.clone()).unwrap();
1854        swarm2.listen_on(addr2.clone()).unwrap();
1855
1856        let swarm1_id = *swarm1.local_peer_id();
1857
1858        let mut reconnected = false;
1859        let num_connections = 10;
1860
1861        for _ in 0..num_connections {
1862            swarm1.dial(addr2.clone()).unwrap();
1863        }
1864        let mut state = State::Connecting;
1865
1866        future::poll_fn(move |cx| loop {
1867            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1868            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1869            match state {
1870                State::Connecting => {
1871                    if swarms_connected(&swarm1, &swarm2, num_connections) {
1872                        if reconnected {
1873                            return Poll::Ready(());
1874                        }
1875                        swarm2
1876                            .disconnect_peer_id(swarm1_id)
1877                            .expect("Error disconnecting");
1878                        state = State::Disconnecting;
1879                    }
1880                }
1881                State::Disconnecting => {
1882                    if swarms_disconnected(&swarm1, &swarm2) {
1883                        if reconnected {
1884                            return Poll::Ready(());
1885                        }
1886                        reconnected = true;
1887                        for _ in 0..num_connections {
1888                            swarm2.dial(addr1.clone()).unwrap();
1889                        }
1890                        state = State::Connecting;
1891                    }
1892                }
1893            }
1894
1895            if poll1.is_pending() && poll2.is_pending() {
1896                return Poll::Pending;
1897            }
1898        })
1899        .await
1900    }
1901
    /// Establishes multiple connections between two peers,
    /// after which one peer disconnects the other
    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
    ///
    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
    ///
    /// Flow: `swarm1` dials `swarm2`, `swarm2`'s behaviour requests closing all
    /// connections to `swarm1`, then `swarm2` dials `swarm1` again; the test
    /// finishes once the peers are fully connected for the second time.
    #[tokio::test]
    async fn test_behaviour_disconnect_all() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let mut reconnected = false;
        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        future::poll_fn(move |cx| loop {
            // Drive both swarms before inspecting their state.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        // Second time fully connected: the round trip is complete.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        // Queue a "close all connections to swarm1" action on
                        // swarm2's mock behaviour (presumably emitted from its
                        // next poll — confirm against `MockBehaviour`).
                        swarm2
                            .behaviour
                            .inner()
                            .next_action
                            .replace(ToSwarm::CloseConnection {
                                peer_id: swarm1_id,
                                connection: CloseConnection::All,
                            });
                        state = State::Disconnecting;
                        // Poll both swarms again right away.
                        continue;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        reconnected = true;
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                        // Poll both swarms again right away.
                        continue;
                    }
                }
            }

            // Only yield once neither swarm has further progress to make.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1968
    /// Establishes multiple connections between two peers,
    /// after which one peer closes a single connection
    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
    ///
    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
    ///
    /// The test finishes once both behaviours have recorded exactly one closed
    /// connection and `swarm2`'s recorded id matches the one that was closed.
    #[tokio::test]
    async fn test_behaviour_disconnect_one() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;
        // Id of the connection that swarm2 is asked to close, once chosen.
        let mut disconnected_conn_id = None;

        future::poll_fn(move |cx| loop {
            // Drive both swarms before inspecting their state.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        disconnected_conn_id = {
                            // Pick the connection recorded in the middle of
                            // swarm2's established-connection trace.
                            let conn_id =
                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
                            // Queue a close request for just that connection on
                            // swarm2's mock behaviour.
                            swarm2.behaviour.inner().next_action.replace(
                                ToSwarm::CloseConnection {
                                    peer_id: swarm1_id,
                                    connection: CloseConnection::One(conn_id),
                                },
                            );
                            Some(conn_id)
                        };
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    for s in &[&swarm1, &swarm2] {
                        // Every recorded close must still leave connections open.
                        assert!(s
                            .behaviour
                            .on_connection_closed
                            .iter()
                            .all(|(.., remaining_conns)| *remaining_conns > 0));
                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
                        // NOTE(review): exact semantics of `assert_connected`
                        // are defined by `CallTraceBehaviour` — confirm there.
                        s.behaviour.assert_connected(num_connections, 1);
                    }
                    // Done once both sides have observed exactly one close.
                    if [&swarm1, &swarm2]
                        .iter()
                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
                    {
                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
                        assert_eq!(Some(conn_id), disconnected_conn_id);
                        return Poll::Ready(());
                    }
                }
            }

            // Only yield once neither swarm has further progress to make.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
2043
2044    #[test]
2045    fn concurrent_dialing() {
2046        #[derive(Clone, Debug)]
2047        struct DialConcurrencyFactor(NonZeroU8);
2048
2049        impl Arbitrary for DialConcurrencyFactor {
2050            fn arbitrary(g: &mut Gen) -> Self {
2051                Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2052            }
2053        }
2054
2055        fn prop(concurrency_factor: DialConcurrencyFactor) {
2056            tokio::runtime::Runtime::new().unwrap().block_on(async {
2057                let mut swarm = new_test_swarm(
2058                    Config::with_tokio_executor()
2059                        .with_dial_concurrency_factor(concurrency_factor.0),
2060                );
2061
2062                // Listen on `concurrency_factor + 1` addresses.
2063                //
2064                // `+ 2` to ensure a subset of addresses is dialed by network_2.
2065                let num_listen_addrs = concurrency_factor.0.get() + 2;
2066                let mut listen_addresses = Vec::new();
2067                let mut transports = Vec::new();
2068                for _ in 0..num_listen_addrs {
2069                    let mut transport = transport::MemoryTransport::default().boxed();
2070                    transport
2071                        .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2072                        .unwrap();
2073
2074                    match transport.select_next_some().await {
2075                        TransportEvent::NewAddress { listen_addr, .. } => {
2076                            listen_addresses.push(listen_addr);
2077                        }
2078                        _ => panic!("Expected `NewListenAddr` event."),
2079                    }
2080
2081                    transports.push(transport);
2082                }
2083
2084                // Have swarm dial each listener and wait for each listener to receive the incoming
2085                // connections.
2086                swarm
2087                    .dial(
2088                        DialOpts::peer_id(PeerId::random())
2089                            .addresses(listen_addresses)
2090                            .build(),
2091                    )
2092                    .unwrap();
2093                for mut transport in transports.into_iter() {
2094                    match futures::future::select(transport.select_next_some(), swarm.next()).await
2095                    {
2096                        future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2097                        future::Either::Left(_) => {
2098                            panic!("Unexpected transport event.")
2099                        }
2100                        future::Either::Right((e, _)) => {
2101                            panic!("Expect swarm to not emit any event {e:?}")
2102                        }
2103                    }
2104                }
2105
2106                match swarm.next().await.unwrap() {
2107                    SwarmEvent::OutgoingConnectionError { .. } => {}
2108                    e => panic!("Unexpected swarm event {e:?}"),
2109                }
2110            })
2111        }
2112
2113        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2114    }
2115
2116    #[tokio::test]
2117    async fn invalid_peer_id() {
2118        // Checks whether dialing an address containing the wrong peer id raises an error
2119        // for the expected peer id instead of the obtained peer id.
2120
2121        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2122        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2123
2124        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2125
2126        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2127            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2128            Poll::Pending => Poll::Pending,
2129            _ => panic!("Was expecting the listen address to be reported"),
2130        })
2131        .await;
2132
2133        let other_id = PeerId::random();
2134        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2135
2136        swarm2.dial(other_addr.clone()).unwrap();
2137
2138        let (peer_id, error) = future::poll_fn(|cx| {
2139            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2140                swarm1.poll_next_unpin(cx)
2141            {}
2142
2143            match swarm2.poll_next_unpin(cx) {
2144                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2145                    peer_id, error, ..
2146                })) => Poll::Ready((peer_id, error)),
2147                Poll::Ready(x) => panic!("unexpected {x:?}"),
2148                Poll::Pending => Poll::Pending,
2149            }
2150        })
2151        .await;
2152        assert_eq!(peer_id.unwrap(), other_id);
2153        match error {
2154            DialError::WrongPeerId { obtained, endpoint } => {
2155                assert_eq!(obtained, *swarm1.local_peer_id());
2156                assert_eq!(
2157                    endpoint,
2158                    ConnectedPoint::Dialer {
2159                        address: other_addr,
2160                        role_override: Endpoint::Dialer,
2161                        port_use: PortUse::Reuse,
2162                    }
2163                );
2164            }
2165            x => panic!("wrong error {x:?}"),
2166        }
2167    }
2168
2169    #[tokio::test]
2170    async fn dial_self() {
2171        // Check whether dialing ourselves correctly fails.
2172        //
2173        // Dialing the same address we're listening should result in three events:
2174        //
2175        // - The incoming connection notification (before we know the incoming peer ID).
2176        // - The connection error for the dialing endpoint (once we've determined that it's our own ID).
2177        // - The connection error for the listening endpoint (once we've determined that it's our own ID).
2178        //
2179        // The last two can happen in any order.
2180
2181        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2182        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2183
2184        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2185            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2186            Poll::Pending => Poll::Pending,
2187            _ => panic!("Was expecting the listen address to be reported"),
2188        })
2189        .await;
2190
2191        swarm.listened_addrs.clear(); // This is a hack to actually execute the dial to ourselves which would otherwise be filtered.
2192
2193        swarm.dial(local_address.clone()).unwrap();
2194
2195        let mut got_dial_err = false;
2196        let mut got_inc_err = false;
2197        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2198            loop {
2199                match swarm.poll_next_unpin(cx) {
2200                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2201                        peer_id,
2202                        error: DialError::LocalPeerId { .. },
2203                        ..
2204                    })) => {
2205                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2206                        assert!(!got_dial_err);
2207                        got_dial_err = true;
2208                        if got_inc_err {
2209                            return Poll::Ready(Ok(()));
2210                        }
2211                    }
2212                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2213                        local_addr, ..
2214                    })) => {
2215                        assert!(!got_inc_err);
2216                        assert_eq!(local_addr, local_address);
2217                        got_inc_err = true;
2218                        if got_dial_err {
2219                            return Poll::Ready(Ok(()));
2220                        }
2221                    }
2222                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2223                        assert_eq!(local_addr, local_address);
2224                    }
2225                    Poll::Ready(ev) => {
2226                        panic!("Unexpected event: {ev:?}")
2227                    }
2228                    Poll::Pending => break Poll::Pending,
2229                }
2230            }
2231        })
2232        .await
2233        .unwrap();
2234    }
2235
2236    #[tokio::test]
2237    async fn dial_self_by_id() {
2238        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
2239        // place.
2240        let swarm = new_test_swarm(Config::with_tokio_executor());
2241        let peer_id = *swarm.local_peer_id();
2242        assert!(!swarm.is_connected(&peer_id));
2243    }
2244
2245    #[tokio::test]
2246    async fn multiple_addresses_err() {
2247        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
2248
2249        let target = PeerId::random();
2250
2251        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2252
2253        let addresses = HashSet::from([
2254            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2255            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2256            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2257            multiaddr![Udp(rand::random::<u16>())],
2258            multiaddr![Udp(rand::random::<u16>())],
2259            multiaddr![Udp(rand::random::<u16>())],
2260            multiaddr![Udp(rand::random::<u16>())],
2261            multiaddr![Udp(rand::random::<u16>())],
2262        ]);
2263
2264        swarm
2265            .dial(
2266                DialOpts::peer_id(target)
2267                    .addresses(addresses.iter().cloned().collect())
2268                    .build(),
2269            )
2270            .unwrap();
2271
2272        match swarm.next().await.unwrap() {
2273            SwarmEvent::OutgoingConnectionError {
2274                peer_id,
2275                // multiaddr,
2276                error: DialError::Transport(errors),
2277                ..
2278            } => {
2279                assert_eq!(target, peer_id.unwrap());
2280
2281                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2282                let expected_addresses = addresses
2283                    .into_iter()
2284                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2285                    .collect::<Vec<_>>();
2286
2287                assert_eq!(expected_addresses, failed_addresses);
2288            }
2289            e => panic!("Unexpected event: {e:?}"),
2290        }
2291    }
2292
2293    #[tokio::test]
2294    async fn aborting_pending_connection_surfaces_error() {
2295        let _ = tracing_subscriber::fmt()
2296            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2297            .try_init();
2298
2299        let mut dialer = new_test_swarm(Config::with_tokio_executor());
2300        let mut listener = new_test_swarm(Config::with_tokio_executor());
2301
2302        let listener_peer_id = *listener.local_peer_id();
2303        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2304        let listener_address = match listener.next().await.unwrap() {
2305            SwarmEvent::NewListenAddr { address, .. } => address,
2306            e => panic!("Unexpected network event: {e:?}"),
2307        };
2308
2309        dialer
2310            .dial(
2311                DialOpts::peer_id(listener_peer_id)
2312                    .addresses(vec![listener_address])
2313                    .build(),
2314            )
2315            .unwrap();
2316
2317        dialer
2318            .disconnect_peer_id(listener_peer_id)
2319            .expect_err("Expect peer to not yet be connected.");
2320
2321        match dialer.next().await.unwrap() {
2322            SwarmEvent::OutgoingConnectionError {
2323                error: DialError::Aborted,
2324                ..
2325            } => {}
2326            e => panic!("Unexpected swarm event {e:?}."),
2327        }
2328    }
2329
#[test]
fn dial_error_prints_sources() {
    // A fairly typical error shape for chained transports: a memory-transport failure
    // wrapped in an `io::Error`, wrapped in a `TransportError`, reported for one address.
    let address = "/ip4/127.0.0.1/tcp/80".parse().unwrap();
    let unreachable = io::Error::new(io::ErrorKind::Other, MemoryTransportError::Unreachable);
    let error = DialError::Transport(vec![(address, TransportError::Other(unreachable))]);

    let string = error.to_string();

    // Unfortunately, some "empty" errors in the chain lead to multiple colons without
    // any text in between, but that is the best we can do.
    assert_eq!(
        "Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]",
        string
    );
}
2346}