libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High-level network manager.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that
35//!     will be used in order to reach nodes on the network based on their
36//!     address. See the `transport` module for more information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state
38//!     machine that defines how the swarm should behave once it is connected
39//!     to a node.
40//!
41//! # Network Behaviour
42//!
43//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
44//! the swarm how it should behave. This includes which protocols are supported
45//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
46//! controls what happens on the network. Multiple types that implement
47//! `NetworkBehaviour` can be composed into a single behaviour.
48//!
49//! # Protocols Handler
50//!
51//! The [`ConnectionHandler`] trait defines how each active connection to a
52//! remote should behave: how to handle incoming substreams, which protocols
53//! are supported, when to open a new outbound substream, etc.
54//!
55
56#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
57
58mod connection;
59mod executor;
60mod stream;
61mod stream_protocol;
62#[cfg(test)]
63mod test;
64mod upgrade;
65
66pub mod behaviour;
67pub mod dial_opts;
68pub mod dummy;
69pub mod handler;
70#[deprecated(
71    note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead. To keep connections alive 'forever', use `Duration::from_secs(u64::MAX)`."
72)]
73pub mod keep_alive;
74mod listen_opts;
75
76/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
77#[doc(hidden)]
78pub mod derive_prelude {
79    pub use crate::behaviour::AddressChange;
80    pub use crate::behaviour::ConnectionClosed;
81    pub use crate::behaviour::ConnectionEstablished;
82    pub use crate::behaviour::DialFailure;
83    pub use crate::behaviour::ExpiredListenAddr;
84    pub use crate::behaviour::ExternalAddrConfirmed;
85    pub use crate::behaviour::ExternalAddrExpired;
86    pub use crate::behaviour::FromSwarm;
87    pub use crate::behaviour::ListenFailure;
88    pub use crate::behaviour::ListenerClosed;
89    pub use crate::behaviour::ListenerError;
90    pub use crate::behaviour::NewExternalAddrCandidate;
91    pub use crate::behaviour::NewListenAddr;
92    pub use crate::behaviour::NewListener;
93    pub use crate::connection::ConnectionId;
94    pub use crate::ConnectionDenied;
95    pub use crate::ConnectionHandler;
96    pub use crate::ConnectionHandlerSelect;
97    pub use crate::DialError;
98    pub use crate::NetworkBehaviour;
99    pub use crate::PollParameters;
100    pub use crate::THandler;
101    pub use crate::THandlerInEvent;
102    pub use crate::THandlerOutEvent;
103    pub use crate::ToSwarm;
104    pub use either::Either;
105    pub use futures::prelude as futures;
106    pub use libp2p_core::transport::ListenerId;
107    pub use libp2p_core::ConnectedPoint;
108    pub use libp2p_core::Endpoint;
109    pub use libp2p_core::Multiaddr;
110    pub use libp2p_identity::PeerId;
111}
112
113pub use behaviour::{
114    AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
115    ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
116    ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate, NewListenAddr,
117    NotifyHandler, PollParameters, ToSwarm,
118};
119pub use connection::pool::ConnectionCounters;
120pub use connection::{ConnectionError, ConnectionId, SupportedProtocols};
121pub use executor::Executor;
122pub use handler::{
123    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, KeepAlive, OneShotHandler,
124    OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
125};
126#[cfg(feature = "macros")]
127pub use libp2p_swarm_derive::NetworkBehaviour;
128pub use listen_opts::ListenOpts;
129pub use stream::Stream;
130pub use stream_protocol::{InvalidProtocol, StreamProtocol};
131
132use crate::behaviour::ExternalAddrConfirmed;
133use crate::handler::UpgradeInfoSend;
134use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
135use connection::IncomingInfo;
136use connection::{
137    PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
138};
139use dial_opts::{DialOpts, PeerCondition};
140use futures::{prelude::*, stream::FusedStream};
141use libp2p_core::{
142    connection::ConnectedPoint,
143    multiaddr,
144    muxing::StreamMuxerBox,
145    transport::{self, ListenerId, TransportError, TransportEvent},
146    Endpoint, Multiaddr, Transport,
147};
148use libp2p_identity::PeerId;
149use smallvec::SmallVec;
150use std::collections::{HashMap, HashSet};
151use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
152use std::time::Duration;
153use std::{
154    convert::TryFrom,
155    error, fmt, io,
156    pin::Pin,
157    task::{Context, Poll},
158};
159
160/// Substream for which a protocol has been chosen.
161///
162/// Implements the [`AsyncRead`] and [`AsyncWrite`] traits.
163#[deprecated(note = "The 'substream' terminology is deprecated. Use 'Stream' instead")]
164pub type NegotiatedSubstream = Stream;
165
166/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
167type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;
168
169/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
170/// supports.
171pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
172
173/// Custom event that can be received by the [`ConnectionHandler`] of the
174/// [`NetworkBehaviour`].
175pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;
176
177/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
178pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
179
180/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
181#[deprecated(
182    note = "Will be removed together with `ConnectionHandlerEvent::Close`. See <https://github.com/libp2p/rust-libp2p/issues/3591> for details."
183)]
184#[allow(deprecated)]
185pub type THandlerErr<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::Error;
186
187/// Event generated by the `Swarm`.
188#[derive(Debug)]
189pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
190    /// Event generated by the `NetworkBehaviour`.
191    Behaviour(TBehaviourOutEvent),
192    /// A connection to the given peer has been opened.
193    ConnectionEstablished {
194        /// Identity of the peer that we have connected to.
195        peer_id: PeerId,
196        /// Identifier of the connection.
197        connection_id: ConnectionId,
198        /// Endpoint of the connection that has been opened.
199        endpoint: ConnectedPoint,
200        /// Number of established connections to this peer, including the one that has just been
201        /// opened.
202        num_established: NonZeroU32,
203        /// [`Some`] when the new connection is an outgoing connection.
204        /// Addresses are dialed concurrently. Contains the addresses and errors
205        /// of dial attempts that failed before the one successful dial.
206        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
207        /// How long it took to establish this connection
208        established_in: std::time::Duration,
209    },
210    /// A connection with the given peer has been closed,
211    /// possibly as a result of an error.
212    ConnectionClosed {
213        /// Identity of the peer that we have connected to.
214        peer_id: PeerId,
215        /// Identifier of the connection.
216        connection_id: ConnectionId,
217        /// Endpoint of the connection that has been closed.
218        endpoint: ConnectedPoint,
219        /// Number of other remaining connections to this same peer.
220        num_established: u32,
221        /// Reason for the disconnection, if it was not a successful
222        /// active close.
223        cause: Option<ConnectionError<THandlerErr>>,
224    },
225    /// A new connection arrived on a listener and is in the process of protocol negotiation.
226    ///
227    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
228    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
229    /// generated for this connection.
230    IncomingConnection {
231        /// Identifier of the connection.
232        connection_id: ConnectionId,
233        /// Local connection address.
234        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
235        /// event.
236        local_addr: Multiaddr,
237        /// Address used to send back data to the remote.
238        send_back_addr: Multiaddr,
239    },
240    /// An error happened on an inbound connection during its initial handshake.
241    ///
242    /// This can include, for example, an error during the handshake of the encryption layer, or
243    /// the connection unexpectedly closed.
244    IncomingConnectionError {
245        /// Identifier of the connection.
246        connection_id: ConnectionId,
247        /// Local connection address.
248        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
249        /// event.
250        local_addr: Multiaddr,
251        /// Address used to send back data to the remote.
252        send_back_addr: Multiaddr,
253        /// The error that happened.
254        error: ListenError,
255    },
256    /// An error happened on an outbound connection.
257    OutgoingConnectionError {
258        /// Identifier of the connection.
259        connection_id: ConnectionId,
260        /// If known, [`PeerId`] of the peer we tried to reach.
261        peer_id: Option<PeerId>,
262        /// Error that has been encountered.
263        error: DialError,
264    },
265    /// One of our listeners has reported a new local listening address.
266    NewListenAddr {
267        /// The listener that is listening on the new address.
268        listener_id: ListenerId,
269        /// The new address that is being listened on.
270        address: Multiaddr,
271    },
272    /// One of our listeners has reported the expiration of a listening address.
273    ExpiredListenAddr {
274        /// The listener that is no longer listening on the address.
275        listener_id: ListenerId,
276        /// The expired address.
277        address: Multiaddr,
278    },
279    /// One of the listeners gracefully closed.
280    ListenerClosed {
281        /// The listener that closed.
282        listener_id: ListenerId,
283        /// The addresses that the listener was listening on. These addresses are now considered
284        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
285        /// has been generated for each of them.
286        addresses: Vec<Multiaddr>,
287        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
288        /// if the stream produced an error.
289        reason: Result<(), io::Error>,
290    },
291    /// One of the listeners reported a non-fatal error.
292    ListenerError {
293        /// The listener that errored.
294        listener_id: ListenerId,
295        /// The listener error.
296        error: io::Error,
297    },
298    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
299    /// implementation.
300    ///
301    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
302    /// reported if the dialing attempt succeeds, otherwise a
303    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
304    /// is reported.
305    Dialing {
306        /// Identity of the peer that we are connecting to.
307        peer_id: Option<PeerId>,
308
309        /// Identifier of the connection.
310        connection_id: ConnectionId,
311    },
312}
313
314impl<TBehaviourOutEvent, THandlerErr> SwarmEvent<TBehaviourOutEvent, THandlerErr> {
315    /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour` variant, otherwise fail.
316    #[allow(clippy::result_large_err)]
317    pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
318        match self {
319            SwarmEvent::Behaviour(inner) => Ok(inner),
320            other => Err(other),
321        }
322    }
323}
324
325/// Contains the state of the network, plus the way it should behave.
326///
327/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
328/// progress.
329pub struct Swarm<TBehaviour>
330where
331    TBehaviour: NetworkBehaviour,
332{
333    /// [`Transport`] for dialing remote peers and listening for incoming connection.
334    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
335
336    /// The nodes currently active.
337    pool: Pool<THandler<TBehaviour>>,
338
339    /// The local peer ID.
340    local_peer_id: PeerId,
341
342    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
343    /// handlers.
344    behaviour: TBehaviour,
345
346    /// List of protocols that the behaviour says it supports.
347    supported_protocols: SmallVec<[Vec<u8>; 16]>,
348
349    confirmed_external_addr: HashSet<Multiaddr>,
350
351    /// Multiaddresses that our listeners are listening on,
352    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
353
354    /// Pending event to be delivered to connection handlers
355    /// (or dropped if the peer disconnected) before the `behaviour`
356    /// can be polled again.
357    pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
358}
359
360impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
361
362impl<TBehaviour> Swarm<TBehaviour>
363where
364    TBehaviour: NetworkBehaviour,
365{
366    /// Creates a new [`Swarm`] from the given [`Transport`], [`NetworkBehaviour`], [`PeerId`] and
367    /// [`Config`].
368    pub fn new(
369        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
370        behaviour: TBehaviour,
371        local_peer_id: PeerId,
372        config: Config,
373    ) -> Self {
374        Swarm {
375            local_peer_id,
376            transport,
377            pool: Pool::new(local_peer_id, config.pool_config),
378            behaviour,
379            supported_protocols: Default::default(),
380            confirmed_external_addr: Default::default(),
381            listened_addrs: HashMap::new(),
382            pending_event: None,
383        }
384    }
385
386    /// Returns information about the connections underlying the [`Swarm`].
387    pub fn network_info(&self) -> NetworkInfo {
388        let num_peers = self.pool.num_peers();
389        let connection_counters = self.pool.counters().clone();
390        NetworkInfo {
391            num_peers,
392            connection_counters,
393        }
394    }
395
396    /// Starts listening on the given address.
397    /// Returns an error if the address is not supported.
398    ///
399    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
400    /// Depending on the underlying transport, one listener may have multiple listening addresses.
401    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
402        let opts = ListenOpts::new(addr);
403        let id = opts.listener_id();
404        self.add_listener(opts)?;
405        Ok(id)
406    }
407
408    /// Remove some listener.
409    ///
410    /// Returns `true` if there was a listener with this ID, `false`
411    /// otherwise.
412    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
413        self.transport.remove_listener(listener_id)
414    }
415
416    /// Dial a known or unknown peer.
417    ///
418    /// See also [`DialOpts`].
419    ///
420    /// ```
421    /// # use libp2p_swarm::SwarmBuilder;
422    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
423    /// # use libp2p_core::{Multiaddr, Transport};
424    /// # use libp2p_core::transport::dummy::DummyTransport;
425    /// # use libp2p_swarm::dummy;
426    /// # use libp2p_identity::PeerId;
427    /// #
428    /// # #[tokio::main]
429    /// # async fn main() {
430    /// let mut swarm = SwarmBuilder::with_tokio_executor(
431    ///     DummyTransport::new().boxed(),
432    ///     dummy::Behaviour,
433    ///     PeerId::random(),
434    /// ).build();
435    ///
436    /// // Dial a known peer.
437    /// swarm.dial(PeerId::random());
438    ///
439    /// // Dial an unknown peer.
440    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
441    /// # }
442    /// ```
443    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
444        let dial_opts = opts.into();
445
446        let peer_id = dial_opts.get_peer_id();
447        let condition = dial_opts.peer_condition();
448        let connection_id = dial_opts.connection_id();
449
450        let should_dial = match (condition, peer_id) {
451            (PeerCondition::Always, _) => true,
452            (PeerCondition::Disconnected, None) => true,
453            (PeerCondition::NotDialing, None) => true,
454            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
455            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
456        };
457
458        if !should_dial {
459            let e = DialError::DialPeerConditionFalse(condition);
460
461            self.behaviour
462                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
463                    peer_id,
464                    error: &e,
465                    connection_id,
466                }));
467
468            return Err(e);
469        }
470
471        let addresses = {
472            let mut addresses_from_opts = dial_opts.get_addresses();
473
474            match self.behaviour.handle_pending_outbound_connection(
475                connection_id,
476                peer_id,
477                addresses_from_opts.as_slice(),
478                dial_opts.role_override(),
479            ) {
480                Ok(addresses) => {
481                    if dial_opts.extend_addresses_through_behaviour() {
482                        addresses_from_opts.extend(addresses)
483                    } else {
484                        let num_addresses = addresses.len();
485
486                        if num_addresses > 0 {
487                            log::debug!("discarding {num_addresses} addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection {connection_id:?}")
488                        }
489                    }
490                }
491                Err(cause) => {
492                    let error = DialError::Denied { cause };
493
494                    self.behaviour
495                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
496                            peer_id,
497                            error: &error,
498                            connection_id,
499                        }));
500
501                    return Err(error);
502                }
503            }
504
505            let mut unique_addresses = HashSet::new();
506            addresses_from_opts.retain(|addr| {
507                !self.listened_addrs.values().flatten().any(|a| a == addr)
508                    && unique_addresses.insert(addr.clone())
509            });
510
511            if addresses_from_opts.is_empty() {
512                let error = DialError::NoAddresses;
513                self.behaviour
514                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
515                        peer_id,
516                        error: &error,
517                        connection_id,
518                    }));
519                return Err(error);
520            };
521
522            addresses_from_opts
523        };
524
525        let dials = addresses
526            .into_iter()
527            .map(|a| match p2p_addr(peer_id, a) {
528                Ok(address) => {
529                    let dial = match dial_opts.role_override() {
530                        Endpoint::Dialer => self.transport.dial(address.clone()),
531                        Endpoint::Listener => self.transport.dial_as_listener(address.clone()),
532                    };
533                    match dial {
534                        Ok(fut) => fut
535                            .map(|r| (address, r.map_err(TransportError::Other)))
536                            .boxed(),
537                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
538                    }
539                }
540                Err(address) => futures::future::ready((
541                    address.clone(),
542                    Err(TransportError::MultiaddrNotSupported(address)),
543                ))
544                .boxed(),
545            })
546            .collect();
547
548        self.pool.add_outgoing(
549            dials,
550            peer_id,
551            dial_opts.role_override(),
552            dial_opts.dial_concurrency_override(),
553            connection_id,
554        );
555
556        Ok(())
557    }
558
559    /// Returns an iterator that produces the list of addresses we're listening on.
560    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
561        self.listened_addrs.values().flatten()
562    }
563
564    /// Returns the peer ID of the swarm passed as parameter.
565    pub fn local_peer_id(&self) -> &PeerId {
566        &self.local_peer_id
567    }
568
569    /// List all **confirmed** external address for the local node.
570    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
571        self.confirmed_external_addr.iter()
572    }
573
574    fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
575        let addr = opts.address();
576        let listener_id = opts.listener_id();
577
578        if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
579            self.behaviour
580                .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
581                    listener_id,
582                    err: &e,
583                }));
584
585            return Err(e);
586        }
587
588        self.behaviour
589            .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
590                listener_id,
591            }));
592
593        Ok(())
594    }
595
596    /// Add a **confirmed** external address for the local node.
597    ///
598    /// This function should only be called with addresses that are guaranteed to be reachable.
599    /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::ExternalAddrConfirmed`].
600    pub fn add_external_address(&mut self, a: Multiaddr) {
601        self.behaviour
602            .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
603                addr: &a,
604            }));
605        self.confirmed_external_addr.insert(a);
606    }
607
608    /// Remove an external address for the local node.
609    ///
610    /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::ExternalAddrExpired`].
611    pub fn remove_external_address(&mut self, addr: &Multiaddr) {
612        self.behaviour
613            .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
614        self.confirmed_external_addr.remove(addr);
615    }
616
617    /// Disconnects a peer by its peer ID, closing all connections to said peer.
618    ///
619    /// Returns `Ok(())` if there was one or more established connections to the peer.
620    ///
621    /// Note: Closing a connection via [`Swarm::disconnect_peer_id`] does
622    /// not inform the corresponding [`ConnectionHandler`].
623    /// Closing a connection via a [`ConnectionHandler`] can be done either in a
624    /// collaborative manner across [`ConnectionHandler`]s
625    /// with [`ConnectionHandler::connection_keep_alive`] or directly with
626    /// [`ConnectionHandlerEvent::Close`].
627    #[allow(clippy::result_unit_err)]
628    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
629        let was_connected = self.pool.is_connected(peer_id);
630        self.pool.disconnect(peer_id);
631
632        if was_connected {
633            Ok(())
634        } else {
635            Err(())
636        }
637    }
638
639    /// Attempt to gracefully close a connection.
640    ///
641    /// Closing a connection is asynchronous but this function will return immediately.
642    /// A [`SwarmEvent::ConnectionClosed`] event will be emitted once the connection is actually closed.
643    ///
644    /// # Returns
645    ///
646    /// - `true` if the connection was established and is now being closed.
647    /// - `false` if the connection was not found or is no longer established.
648    pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
649        if let Some(established) = self.pool.get_established(connection_id) {
650            established.start_close();
651            return true;
652        }
653
654        false
655    }
656
657    /// Checks whether there is an established connection to a peer.
658    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
659        self.pool.is_connected(*peer_id)
660    }
661
662    /// Returns the currently connected peers.
663    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
664        self.pool.iter_connected()
665    }
666
667    /// Returns a reference to the provided [`NetworkBehaviour`].
668    pub fn behaviour(&self) -> &TBehaviour {
669        &self.behaviour
670    }
671
672    /// Returns a mutable reference to the provided [`NetworkBehaviour`].
673    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
674        &mut self.behaviour
675    }
676
677    #[allow(deprecated)]
678    fn handle_pool_event(
679        &mut self,
680        event: PoolEvent<THandler<TBehaviour>>,
681    ) -> Option<SwarmEvent<TBehaviour::ToSwarm, THandlerErr<TBehaviour>>> {
682        match event {
683            PoolEvent::ConnectionEstablished {
684                peer_id,
685                id,
686                endpoint,
687                connection,
688                concurrent_dial_errors,
689                established_in,
690            } => {
691                let handler = match endpoint.clone() {
692                    ConnectedPoint::Dialer {
693                        address,
694                        role_override,
695                    } => {
696                        match self.behaviour.handle_established_outbound_connection(
697                            id,
698                            peer_id,
699                            &address,
700                            role_override,
701                        ) {
702                            Ok(handler) => handler,
703                            Err(cause) => {
704                                let dial_error = DialError::Denied { cause };
705                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
706                                    DialFailure {
707                                        connection_id: id,
708                                        error: &dial_error,
709                                        peer_id: Some(peer_id),
710                                    },
711                                ));
712
713                                return Some(SwarmEvent::OutgoingConnectionError {
714                                    peer_id: Some(peer_id),
715                                    connection_id: id,
716                                    error: dial_error,
717                                });
718                            }
719                        }
720                    }
721                    ConnectedPoint::Listener {
722                        local_addr,
723                        send_back_addr,
724                    } => {
725                        match self.behaviour.handle_established_inbound_connection(
726                            id,
727                            peer_id,
728                            &local_addr,
729                            &send_back_addr,
730                        ) {
731                            Ok(handler) => handler,
732                            Err(cause) => {
733                                let listen_error = ListenError::Denied { cause };
734                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
735                                    ListenFailure {
736                                        local_addr: &local_addr,
737                                        send_back_addr: &send_back_addr,
738                                        error: &listen_error,
739                                        connection_id: id,
740                                    },
741                                ));
742
743                                return Some(SwarmEvent::IncomingConnectionError {
744                                    connection_id: id,
745                                    send_back_addr,
746                                    local_addr,
747                                    error: listen_error,
748                                });
749                            }
750                        }
751                    }
752                };
753
754                let supported_protocols = handler
755                    .listen_protocol()
756                    .upgrade()
757                    .protocol_info()
758                    .map(|p| p.as_ref().as_bytes().to_vec())
759                    .collect();
760                let other_established_connection_ids = self
761                    .pool
762                    .iter_established_connections_of_peer(&peer_id)
763                    .collect::<Vec<_>>();
764                let num_established = NonZeroU32::new(
765                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
766                )
767                .expect("n + 1 is always non-zero; qed");
768
769                self.pool
770                    .spawn_connection(id, peer_id, &endpoint, connection, handler);
771
772                log::debug!(
773                    "Connection established: {:?} {:?}; Total (peer): {}.",
774                    peer_id,
775                    endpoint,
776                    num_established,
777                );
778                let failed_addresses = concurrent_dial_errors
779                    .as_ref()
780                    .map(|es| {
781                        es.iter()
782                            .map(|(a, _)| a)
783                            .cloned()
784                            .collect::<Vec<Multiaddr>>()
785                    })
786                    .unwrap_or_default();
787                self.behaviour
788                    .on_swarm_event(FromSwarm::ConnectionEstablished(
789                        behaviour::ConnectionEstablished {
790                            peer_id,
791                            connection_id: id,
792                            endpoint: &endpoint,
793                            failed_addresses: &failed_addresses,
794                            other_established: other_established_connection_ids.len(),
795                        },
796                    ));
797                self.supported_protocols = supported_protocols;
798                return Some(SwarmEvent::ConnectionEstablished {
799                    peer_id,
800                    connection_id: id,
801                    num_established,
802                    endpoint,
803                    concurrent_dial_errors,
804                    established_in,
805                });
806            }
807            PoolEvent::PendingOutboundConnectionError {
808                id: connection_id,
809                error,
810                peer,
811            } => {
812                let error = error.into();
813
814                self.behaviour
815                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
816                        peer_id: peer,
817                        error: &error,
818                        connection_id,
819                    }));
820
821                if let Some(peer) = peer {
822                    log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
823                } else {
824                    log::debug!("Connection attempt to unknown peer failed with {:?}", error);
825                }
826
827                return Some(SwarmEvent::OutgoingConnectionError {
828                    peer_id: peer,
829                    connection_id,
830                    error,
831                });
832            }
833            PoolEvent::PendingInboundConnectionError {
834                id,
835                send_back_addr,
836                local_addr,
837                error,
838            } => {
839                let error = error.into();
840
841                log::debug!("Incoming connection failed: {:?}", error);
842                self.behaviour
843                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
844                        local_addr: &local_addr,
845                        send_back_addr: &send_back_addr,
846                        error: &error,
847                        connection_id: id,
848                    }));
849                return Some(SwarmEvent::IncomingConnectionError {
850                    connection_id: id,
851                    local_addr,
852                    send_back_addr,
853                    error,
854                });
855            }
856            PoolEvent::ConnectionClosed {
857                id,
858                connected,
859                error,
860                remaining_established_connection_ids,
861                handler,
862                ..
863            } => {
864                if let Some(error) = error.as_ref() {
865                    log::debug!(
866                        "Connection closed with error {:?}: {:?}; Total (peer): {}.",
867                        error,
868                        connected,
869                        remaining_established_connection_ids.len()
870                    );
871                } else {
872                    log::debug!(
873                        "Connection closed: {:?}; Total (peer): {}.",
874                        connected,
875                        remaining_established_connection_ids.len()
876                    );
877                }
878                let peer_id = connected.peer_id;
879                let endpoint = connected.endpoint;
880                let num_established =
881                    u32::try_from(remaining_established_connection_ids.len()).unwrap();
882
883                self.behaviour
884                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
885                        peer_id,
886                        connection_id: id,
887                        endpoint: &endpoint,
888                        handler,
889                        remaining_established: num_established as usize,
890                    }));
891                return Some(SwarmEvent::ConnectionClosed {
892                    peer_id,
893                    connection_id: id,
894                    endpoint,
895                    cause: error,
896                    num_established,
897                });
898            }
899            PoolEvent::ConnectionEvent { peer_id, id, event } => {
900                self.behaviour
901                    .on_connection_handler_event(peer_id, id, event);
902            }
903            PoolEvent::AddressChange {
904                peer_id,
905                id,
906                new_endpoint,
907                old_endpoint,
908            } => {
909                self.behaviour
910                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
911                        peer_id,
912                        connection_id: id,
913                        old: &old_endpoint,
914                        new: &new_endpoint,
915                    }));
916            }
917        }
918
919        None
920    }
921
922    #[allow(deprecated)]
923    fn handle_transport_event(
924        &mut self,
925        event: TransportEvent<
926            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
927            io::Error,
928        >,
929    ) -> Option<SwarmEvent<TBehaviour::ToSwarm, THandlerErr<TBehaviour>>> {
930        match event {
931            TransportEvent::Incoming {
932                listener_id: _,
933                upgrade,
934                local_addr,
935                send_back_addr,
936            } => {
937                let connection_id = ConnectionId::next();
938
939                match self.behaviour.handle_pending_inbound_connection(
940                    connection_id,
941                    &local_addr,
942                    &send_back_addr,
943                ) {
944                    Ok(()) => {}
945                    Err(cause) => {
946                        let listen_error = ListenError::Denied { cause };
947
948                        self.behaviour
949                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
950                                local_addr: &local_addr,
951                                send_back_addr: &send_back_addr,
952                                error: &listen_error,
953                                connection_id,
954                            }));
955
956                        return Some(SwarmEvent::IncomingConnectionError {
957                            connection_id,
958                            local_addr,
959                            send_back_addr,
960                            error: listen_error,
961                        });
962                    }
963                }
964
965                self.pool.add_incoming(
966                    upgrade,
967                    IncomingInfo {
968                        local_addr: &local_addr,
969                        send_back_addr: &send_back_addr,
970                    },
971                    connection_id,
972                );
973
974                Some(SwarmEvent::IncomingConnection {
975                    connection_id,
976                    local_addr,
977                    send_back_addr,
978                })
979            }
980            TransportEvent::NewAddress {
981                listener_id,
982                listen_addr,
983            } => {
984                log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
985                let addrs = self.listened_addrs.entry(listener_id).or_default();
986                if !addrs.contains(&listen_addr) {
987                    addrs.push(listen_addr.clone())
988                }
989                self.behaviour
990                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
991                        listener_id,
992                        addr: &listen_addr,
993                    }));
994                Some(SwarmEvent::NewListenAddr {
995                    listener_id,
996                    address: listen_addr,
997                })
998            }
999            TransportEvent::AddressExpired {
1000                listener_id,
1001                listen_addr,
1002            } => {
1003                log::debug!(
1004                    "Listener {:?}; Expired address {:?}.",
1005                    listener_id,
1006                    listen_addr
1007                );
1008                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1009                    addrs.retain(|a| a != &listen_addr);
1010                }
1011                self.behaviour
1012                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1013                        listener_id,
1014                        addr: &listen_addr,
1015                    }));
1016                Some(SwarmEvent::ExpiredListenAddr {
1017                    listener_id,
1018                    address: listen_addr,
1019                })
1020            }
1021            TransportEvent::ListenerClosed {
1022                listener_id,
1023                reason,
1024            } => {
1025                log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
1026                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1027                for addr in addrs.iter() {
1028                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1029                        ExpiredListenAddr { listener_id, addr },
1030                    ));
1031                }
1032                self.behaviour
1033                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1034                        listener_id,
1035                        reason: reason.as_ref().copied(),
1036                    }));
1037                Some(SwarmEvent::ListenerClosed {
1038                    listener_id,
1039                    addresses: addrs.to_vec(),
1040                    reason,
1041                })
1042            }
1043            TransportEvent::ListenerError { listener_id, error } => {
1044                self.behaviour
1045                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1046                        listener_id,
1047                        err: &error,
1048                    }));
1049                Some(SwarmEvent::ListenerError { listener_id, error })
1050            }
1051        }
1052    }
1053
1054    #[allow(deprecated)]
1055    fn handle_behaviour_event(
1056        &mut self,
1057        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1058    ) -> Option<SwarmEvent<TBehaviour::ToSwarm, THandlerErr<TBehaviour>>> {
1059        match event {
1060            ToSwarm::GenerateEvent(event) => return Some(SwarmEvent::Behaviour(event)),
1061            ToSwarm::Dial { opts } => {
1062                let peer_id = opts.get_peer_id();
1063                let connection_id = opts.connection_id();
1064                if let Ok(()) = self.dial(opts) {
1065                    return Some(SwarmEvent::Dialing {
1066                        peer_id,
1067                        connection_id,
1068                    });
1069                }
1070            }
1071            ToSwarm::ListenOn { opts } => {
1072                // Error is dispatched internally, safe to ignore.
1073                let _ = self.add_listener(opts);
1074            }
1075            ToSwarm::RemoveListener { id } => {
1076                self.remove_listener(id);
1077            }
1078            ToSwarm::NotifyHandler {
1079                peer_id,
1080                handler,
1081                event,
1082            } => {
1083                assert!(self.pending_event.is_none());
1084                let handler = match handler {
1085                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1086                    NotifyHandler::Any => {
1087                        let ids = self
1088                            .pool
1089                            .iter_established_connections_of_peer(&peer_id)
1090                            .collect();
1091                        PendingNotifyHandler::Any(ids)
1092                    }
1093                };
1094
1095                self.pending_event = Some((peer_id, handler, event));
1096            }
1097            ToSwarm::NewExternalAddrCandidate(addr) => {
1098                // Apply address translation to the candidate address.
1099                // For TCP without port-reuse, the observed address contains an ephemeral port which needs to be replaced by the port of a listen address.
1100                let translated_addresses = {
1101                    let mut addrs: Vec<_> = self
1102                        .listened_addrs
1103                        .values()
1104                        .flatten()
1105                        .filter_map(|server| self.transport.address_translation(server, &addr))
1106                        .collect();
1107
1108                    // remove duplicates
1109                    addrs.sort_unstable();
1110                    addrs.dedup();
1111                    addrs
1112                };
1113
1114                // If address translation yielded nothing, broacast the original candidate address.
1115                if translated_addresses.is_empty() {
1116                    self.behaviour
1117                        .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1118                            NewExternalAddrCandidate { addr: &addr },
1119                        ));
1120                } else {
1121                    for addr in translated_addresses {
1122                        self.behaviour
1123                            .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1124                                NewExternalAddrCandidate { addr: &addr },
1125                            ));
1126                    }
1127                }
1128            }
1129            ToSwarm::ExternalAddrConfirmed(addr) => {
1130                self.add_external_address(addr);
1131            }
1132            ToSwarm::ExternalAddrExpired(addr) => {
1133                self.remove_external_address(&addr);
1134            }
1135            ToSwarm::CloseConnection {
1136                peer_id,
1137                connection,
1138            } => match connection {
1139                CloseConnection::One(connection_id) => {
1140                    if let Some(conn) = self.pool.get_established(connection_id) {
1141                        conn.start_close();
1142                    }
1143                }
1144                CloseConnection::All => {
1145                    self.pool.disconnect(peer_id);
1146                }
1147            },
1148        }
1149
1150        None
1151    }
1152
1153    /// Internal function used by everything event-related.
1154    ///
1155    /// Polls the `Swarm` for the next event.
1156    #[allow(deprecated)]
1157    fn poll_next_event(
1158        mut self: Pin<&mut Self>,
1159        cx: &mut Context<'_>,
1160    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm, THandlerErr<TBehaviour>>> {
1161        // We use a `this` variable because the compiler can't mutably borrow multiple times
1162        // across a `Deref`.
1163        let this = &mut *self;
1164
1165        // This loop polls the components below in a prioritized order.
1166        //
1167        // 1. [`NetworkBehaviour`]
1168        // 2. Connection [`Pool`]
1169        // 3. [`ListenersStream`]
1170        //
1171        // (1) is polled before (2) to prioritize local work over work coming from a remote.
1172        //
1173        // (2) is polled before (3) to prioritize existing connections over upgrading new incoming connections.
1174        loop {
1175            match this.pending_event.take() {
1176                // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the previous
1177                // iteration to the connection handler(s).
1178                Some((peer_id, handler, event)) => match handler {
1179                    PendingNotifyHandler::One(conn_id) => {
1180                        match this.pool.get_established(conn_id) {
1181                            Some(conn) => match notify_one(conn, event, cx) {
1182                                None => continue,
1183                                Some(event) => {
1184                                    this.pending_event = Some((peer_id, handler, event));
1185                                }
1186                            },
1187                            None => continue,
1188                        }
1189                    }
1190                    PendingNotifyHandler::Any(ids) => {
1191                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1192                            None => continue,
1193                            Some((event, ids)) => {
1194                                let handler = PendingNotifyHandler::Any(ids);
1195                                this.pending_event = Some((peer_id, handler, event));
1196                            }
1197                        }
1198                    }
1199                },
1200                // No pending event. Allow the [`NetworkBehaviour`] to make progress.
1201                None => {
1202                    let behaviour_poll = {
1203                        let mut parameters = SwarmPollParameters {
1204                            supported_protocols: &this.supported_protocols,
1205                        };
1206                        this.behaviour.poll(cx, &mut parameters)
1207                    };
1208
1209                    match behaviour_poll {
1210                        Poll::Pending => {}
1211                        Poll::Ready(behaviour_event) => {
1212                            if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event)
1213                            {
1214                                return Poll::Ready(swarm_event);
1215                            }
1216
1217                            continue;
1218                        }
1219                    }
1220                }
1221            }
1222
1223            // Poll the known peers.
1224            match this.pool.poll(cx) {
1225                Poll::Pending => {}
1226                Poll::Ready(pool_event) => {
1227                    if let Some(swarm_event) = this.handle_pool_event(pool_event) {
1228                        return Poll::Ready(swarm_event);
1229                    }
1230
1231                    continue;
1232                }
1233            };
1234
1235            // Poll the listener(s) for new connections.
1236            match Pin::new(&mut this.transport).poll(cx) {
1237                Poll::Pending => {}
1238                Poll::Ready(transport_event) => {
1239                    if let Some(swarm_event) = this.handle_transport_event(transport_event) {
1240                        return Poll::Ready(swarm_event);
1241                    }
1242
1243                    continue;
1244                }
1245            }
1246
1247            return Poll::Pending;
1248        }
1249    }
1250}
1251
1252/// Connection to notify of a pending event.
1253///
1254/// The connection IDs out of which to notify one of an event are captured at
1255/// the time the behaviour emits the event, in order not to forward the event to
1256/// a new connection which the behaviour may not have been aware of at the time
1257/// it issued the request for sending it.
1258enum PendingNotifyHandler {
1259    One(ConnectionId),
1260    Any(SmallVec<[ConnectionId; 10]>),
1261}
1262
1263/// Notify a single connection of an event.
1264///
1265/// Returns `Some` with the given event if the connection is not currently
1266/// ready to receive another event, in which case the current task is
1267/// scheduled to be woken up.
1268///
1269/// Returns `None` if the connection is closing or the event has been
1270/// successfully sent, in either case the event is consumed.
1271fn notify_one<THandlerInEvent>(
1272    conn: &mut EstablishedConnection<THandlerInEvent>,
1273    event: THandlerInEvent,
1274    cx: &mut Context<'_>,
1275) -> Option<THandlerInEvent> {
1276    match conn.poll_ready_notify_handler(cx) {
1277        Poll::Pending => Some(event),
1278        Poll::Ready(Err(())) => None, // connection is closing
1279        Poll::Ready(Ok(())) => {
1280            // Can now only fail if connection is closing.
1281            let _ = conn.notify_handler(event);
1282            None
1283        }
1284    }
1285}
1286
1287/// Notify any one of a given list of connections of a peer of an event.
1288///
1289/// Returns `Some` with the given event and a new list of connections if
1290/// none of the given connections was able to receive the event but at
1291/// least one of them is not closing, in which case the current task
1292/// is scheduled to be woken up. The returned connections are those which
1293/// may still become ready to receive another event.
1294///
1295/// Returns `None` if either all connections are closing or the event
1296/// was successfully sent to a handler, in either case the event is consumed.
1297fn notify_any<THandler, TBehaviour>(
1298    ids: SmallVec<[ConnectionId; 10]>,
1299    pool: &mut Pool<THandler>,
1300    event: THandlerInEvent<TBehaviour>,
1301    cx: &mut Context<'_>,
1302) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1303where
1304    TBehaviour: NetworkBehaviour,
1305    THandler: ConnectionHandler<
1306        FromBehaviour = THandlerInEvent<TBehaviour>,
1307        ToBehaviour = THandlerOutEvent<TBehaviour>,
1308    >,
1309{
1310    let mut pending = SmallVec::new();
1311    let mut event = Some(event); // (1)
1312    for id in ids.into_iter() {
1313        if let Some(conn) = pool.get_established(id) {
1314            match conn.poll_ready_notify_handler(cx) {
1315                Poll::Pending => pending.push(id),
1316                Poll::Ready(Err(())) => {} // connection is closing
1317                Poll::Ready(Ok(())) => {
1318                    let e = event.take().expect("by (1),(2)");
1319                    if let Err(e) = conn.notify_handler(e) {
1320                        event = Some(e) // (2)
1321                    } else {
1322                        break;
1323                    }
1324                }
1325            }
1326        }
1327    }
1328
1329    event.and_then(|e| {
1330        if !pending.is_empty() {
1331            Some((e, pending))
1332        } else {
1333            None
1334        }
1335    })
1336}
1337
1338/// Stream of events returned by [`Swarm`].
1339///
1340/// Includes events from the [`NetworkBehaviour`] as well as events about
1341/// connection and listener status. See [`SwarmEvent`] for details.
1342///
1343/// Note: This stream is infinite and it is guaranteed that
1344/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`.
1345impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1346where
1347    TBehaviour: NetworkBehaviour,
1348{
1349    #[allow(deprecated)]
1350    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>, THandlerErr<TBehaviour>>;
1351
1352    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1353        self.as_mut().poll_next_event(cx).map(Some)
1354    }
1355}
1356
1357/// The stream of swarm events never terminates, so we can implement fused for it.
1358impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1359where
1360    TBehaviour: NetworkBehaviour,
1361{
1362    fn is_terminated(&self) -> bool {
1363        false
1364    }
1365}
1366
1367/// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to.
1368// TODO: #[derive(Debug)]
1369pub struct SwarmPollParameters<'a> {
1370    supported_protocols: &'a [Vec<u8>],
1371}
1372
1373impl<'a> PollParameters for SwarmPollParameters<'a> {
1374    type SupportedProtocolsIter = std::iter::Cloned<std::slice::Iter<'a, std::vec::Vec<u8>>>;
1375
1376    fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
1377        self.supported_protocols.iter().cloned()
1378    }
1379}
1380
1381pub struct Config {
1382    pool_config: PoolConfig,
1383}
1384
1385impl Config {
1386    /// Creates a new [`Config`] from the given executor. The [`Swarm`] is obtained via
1387    /// [`Swarm::new`].
1388    pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1389        Self {
1390            pool_config: PoolConfig::new(Some(Box::new(executor))),
1391        }
1392    }
1393
1394    /// Sets executor to the `wasm` executor.
1395    /// Background tasks will be executed by the browser on the next micro-tick.
1396    ///
1397    /// Spawning a task is similar too:
1398    /// ```typescript
1399    /// function spawn(task: () => Promise<void>) {
1400    ///     task()
1401    /// }
1402    /// ```
1403    #[cfg(feature = "wasm-bindgen")]
1404    pub fn with_wasm_executor() -> Self {
1405        Self::with_executor(crate::executor::WasmBindgenExecutor)
1406    }
1407
1408    /// Builds a new [`Config`] from the given `tokio` executor.
1409    #[cfg(all(
1410        feature = "tokio",
1411        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1412    ))]
1413    pub fn with_tokio_executor() -> Self {
1414        Self::with_executor(crate::executor::TokioExecutor)
1415    }
1416
1417    /// Builds a new [`Config`] from the given `async-std` executor.
1418    #[cfg(all(
1419        feature = "async-std",
1420        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1421    ))]
1422    pub fn with_async_std_executor() -> Self {
1423        Self::with_executor(crate::executor::AsyncStdExecutor)
1424    }
1425
1426    /// Configures the number of events from the [`NetworkBehaviour`] in
1427    /// destination to the [`ConnectionHandler`] that can be buffered before
1428    /// the [`Swarm`] has to wait. An individual buffer with this number of
1429    /// events exists for each individual connection.
1430    ///
1431    /// The ideal value depends on the executor used, the CPU speed, and the
1432    /// volume of events. If this value is too low, then the [`Swarm`] will
1433    /// be sleeping more often than necessary. Increasing this value increases
1434    /// the overall memory usage.
1435    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1436        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1437        self
1438    }
1439
1440    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1441    /// [`NetworkBehaviour`].
1442    ///
1443    /// Each connection has its own buffer.
1444    ///
1445    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1446    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1447    /// than necessary. Increasing this value increases the overall memory
1448    /// usage, and more importantly the latency between the moment when an
1449    /// event is emitted and the moment when it is received by the
1450    /// [`NetworkBehaviour`].
1451    pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1452        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1453        self
1454    }
1455
1456    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1457    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1458        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1459        self
1460    }
1461
1462    /// Configures an override for the substream upgrade protocol to use.
1463    ///
1464    /// The subtream upgrade protocol is the multistream-select protocol
1465    /// used for protocol negotiation on substreams. Since a listener
1466    /// supports all existing versions, the choice of upgrade protocol
1467    /// only effects the "dialer", i.e. the peer opening a substream.
1468    ///
1469    /// > **Note**: If configured, specific upgrade protocols for
1470    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1471    /// > are ignored.
1472    pub fn with_substream_upgrade_protocol_override(
1473        mut self,
1474        v: libp2p_core::upgrade::Version,
1475    ) -> Self {
1476        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1477        self
1478    }
1479
1480    /// The maximum number of inbound streams concurrently negotiating on a
1481    /// connection. New inbound streams exceeding the limit are dropped and thus
1482    /// reset.
1483    ///
1484    /// Note: This only enforces a limit on the number of concurrently
1485    /// negotiating inbound streams. The total number of inbound streams on a
1486    /// connection is the sum of negotiating and negotiated streams. A limit on
1487    /// the total number of streams can be enforced at the
1488    /// [`StreamMuxerBox`] level.
1489    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1490        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1491        self
1492    }
1493
1494    /// How long to keep a connection alive once it is idling.
1495    ///
1496    /// Defaults to 0.
1497    pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1498        self.pool_config.idle_connection_timeout = timeout;
1499        self
1500    }
1501}
1502
1503/// A [`SwarmBuilder`] provides an API for configuring and constructing a [`Swarm`].
1504#[deprecated(
1505    note = "Use the new `libp2p::SwarmBuilder` instead of `libp2p::swarm::SwarmBuilder` or create a `Swarm` directly via `Swarm::new`."
1506)]
1507pub struct SwarmBuilder<TBehaviour> {
1508    local_peer_id: PeerId,
1509    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1510    behaviour: TBehaviour,
1511    pool_config: PoolConfig,
1512}
1513
1514#[allow(deprecated)]
1515impl<TBehaviour> SwarmBuilder<TBehaviour>
1516where
1517    TBehaviour: NetworkBehaviour,
1518{
1519    /// Creates a new [`SwarmBuilder`] from the given transport, behaviour, local peer ID and
1520    /// executor. The `Swarm` with its underlying `Network` is obtained via
1521    /// [`SwarmBuilder::build`].
1522    pub fn with_executor(
1523        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1524        behaviour: TBehaviour,
1525        local_peer_id: PeerId,
1526        executor: impl Executor + Send + 'static,
1527    ) -> Self {
1528        Self {
1529            local_peer_id,
1530            transport,
1531            behaviour,
1532            pool_config: PoolConfig::new(Some(Box::new(executor))),
1533        }
1534    }
1535
1536    /// Sets executor to the `wasm` executor.
1537    /// Background tasks will be executed by the browser on the next micro-tick.
1538    ///
1539    /// Spawning a task is similar too:
1540    /// ```typescript
1541    /// function spawn(task: () => Promise<void>) {
1542    ///     task()
1543    /// }
1544    /// ```
1545    #[cfg(feature = "wasm-bindgen")]
1546    pub fn with_wasm_executor(
1547        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1548        behaviour: TBehaviour,
1549        local_peer_id: PeerId,
1550    ) -> Self {
1551        Self::with_executor(
1552            transport,
1553            behaviour,
1554            local_peer_id,
1555            crate::executor::WasmBindgenExecutor,
1556        )
1557    }
1558
1559    /// Builds a new [`SwarmBuilder`] from the given transport, behaviour, local peer ID and a
1560    /// `tokio` executor.
1561    #[cfg(all(
1562        feature = "tokio",
1563        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1564    ))]
1565    pub fn with_tokio_executor(
1566        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1567        behaviour: TBehaviour,
1568        local_peer_id: PeerId,
1569    ) -> Self {
1570        Self::with_executor(
1571            transport,
1572            behaviour,
1573            local_peer_id,
1574            crate::executor::TokioExecutor,
1575        )
1576    }
1577
1578    /// Builds a new [`SwarmBuilder`] from the given transport, behaviour, local peer ID and a
1579    /// `async-std` executor.
1580    #[cfg(all(
1581        feature = "async-std",
1582        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1583    ))]
1584    pub fn with_async_std_executor(
1585        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1586        behaviour: TBehaviour,
1587        local_peer_id: PeerId,
1588    ) -> Self {
1589        Self::with_executor(
1590            transport,
1591            behaviour,
1592            local_peer_id,
1593            crate::executor::AsyncStdExecutor,
1594        )
1595    }
1596
1597    /// Creates a new [`SwarmBuilder`] from the given transport, behaviour and local peer ID. The
1598    /// `Swarm` with its underlying `Network` is obtained via [`SwarmBuilder::build`].
1599    ///
1600    /// ## ⚠️  Performance warning
1601    /// All connections will be polled on the current task, thus quite bad performance
1602    /// characteristics should be expected. Whenever possible use an executor and
1603    /// [`SwarmBuilder::with_executor`].
1604    pub fn without_executor(
1605        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1606        behaviour: TBehaviour,
1607        local_peer_id: PeerId,
1608    ) -> Self {
1609        Self {
1610            local_peer_id,
1611            transport,
1612            behaviour,
1613            pool_config: PoolConfig::new(None),
1614        }
1615    }
1616
1617    /// Configures the number of events from the [`NetworkBehaviour`] in
1618    /// destination to the [`ConnectionHandler`] that can be buffered before
1619    /// the [`Swarm`] has to wait. An individual buffer with this number of
1620    /// events exists for each individual connection.
1621    ///
1622    /// The ideal value depends on the executor used, the CPU speed, and the
1623    /// volume of events. If this value is too low, then the [`Swarm`] will
1624    /// be sleeping more often than necessary. Increasing this value increases
1625    /// the overall memory usage.
1626    pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1627        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1628        self
1629    }
1630
1631    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1632    /// [`NetworkBehaviour`].
1633    ///
1634    /// Each connection has its own buffer.
1635    ///
1636    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1637    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1638    /// than necessary. Increasing this value increases the overall memory
1639    /// usage, and more importantly the latency between the moment when an
1640    /// event is emitted and the moment when it is received by the
1641    /// [`NetworkBehaviour`].
1642    pub fn per_connection_event_buffer_size(mut self, n: usize) -> Self {
1643        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1644        self
1645    }
1646
1647    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1648    pub fn dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1649        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1650        self
1651    }
1652
1653    /// Configures an override for the substream upgrade protocol to use.
1654    ///
1655    /// The subtream upgrade protocol is the multistream-select protocol
1656    /// used for protocol negotiation on substreams. Since a listener
1657    /// supports all existing versions, the choice of upgrade protocol
1658    /// only effects the "dialer", i.e. the peer opening a substream.
1659    ///
1660    /// > **Note**: If configured, specific upgrade protocols for
1661    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1662    /// > are ignored.
1663    pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
1664        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1665        self
1666    }
1667
1668    /// The maximum number of inbound streams concurrently negotiating on a
1669    /// connection. New inbound streams exceeding the limit are dropped and thus
1670    /// reset.
1671    ///
1672    /// Note: This only enforces a limit on the number of concurrently
1673    /// negotiating inbound streams. The total number of inbound streams on a
1674    /// connection is the sum of negotiating and negotiated streams. A limit on
1675    /// the total number of streams can be enforced at the [`StreamMuxerBox`] level.
1676    pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1677        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1678        self
1679    }
1680
1681    /// How long to keep a connection alive once it is idling.
1682    ///
1683    /// Defaults to 0.
1684    pub fn idle_connection_timeout(mut self, timeout: Duration) -> Self {
1685        self.pool_config.idle_connection_timeout = timeout;
1686        self
1687    }
1688
1689    /// Builds a `Swarm` with the current configuration.
1690    pub fn build(self) -> Swarm<TBehaviour> {
1691        log::info!("Local peer id: {}", self.local_peer_id);
1692        Swarm {
1693            local_peer_id: self.local_peer_id,
1694            transport: self.transport,
1695            pool: Pool::new(self.local_peer_id, self.pool_config),
1696            behaviour: self.behaviour,
1697            supported_protocols: Default::default(),
1698            confirmed_external_addr: Default::default(),
1699            listened_addrs: HashMap::new(),
1700            pending_event: None,
1701        }
1702    }
1703}
1704
1705/// Possible errors when trying to establish or upgrade an outbound connection.
1706#[derive(Debug)]
1707pub enum DialError {
1708    /// The peer identity obtained on the connection matches the local peer.
1709    LocalPeerId {
1710        endpoint: ConnectedPoint,
1711    },
1712    /// No addresses have been provided by [`NetworkBehaviour::handle_pending_outbound_connection`] and [`DialOpts`].
1713    NoAddresses,
1714    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
1715    /// the dial was aborted.
1716    DialPeerConditionFalse(dial_opts::PeerCondition),
1717    /// Pending connection attempt has been aborted.
1718    Aborted,
1719    /// The peer identity obtained on the connection did not match the one that was expected.
1720    WrongPeerId {
1721        obtained: PeerId,
1722        endpoint: ConnectedPoint,
1723    },
1724    Denied {
1725        cause: ConnectionDenied,
1726    },
1727    /// An error occurred while negotiating the transport protocol(s) on a connection.
1728    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1729}
1730
1731impl From<PendingOutboundConnectionError> for DialError {
1732    fn from(error: PendingOutboundConnectionError) -> Self {
1733        match error {
1734            PendingConnectionError::Aborted => DialError::Aborted,
1735            PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1736                DialError::WrongPeerId { obtained, endpoint }
1737            }
1738            PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1739            PendingConnectionError::Transport(e) => DialError::Transport(e),
1740        }
1741    }
1742}
1743
1744impl fmt::Display for DialError {
1745    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1746        match self {
1747            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1748            DialError::LocalPeerId { endpoint } => write!(
1749                f,
1750                "Dial error: tried to dial local peer id at {endpoint:?}."
1751            ),
1752            DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1753            DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1754            DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1755            DialError::Aborted => write!(
1756                f,
1757                "Dial error: Pending connection attempt has been aborted."
1758            ),
1759            DialError::WrongPeerId { obtained, endpoint } => write!(
1760                f,
1761                "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1762            ),
1763            DialError::Transport(errors) => {
1764                write!(f, "Failed to negotiate transport protocol(s): [")?;
1765
1766                for (addr, error) in errors {
1767                    write!(f, "({addr}")?;
1768                    print_error_chain(f, error)?;
1769                    write!(f, ")")?;
1770                }
1771                write!(f, "]")?;
1772
1773                Ok(())
1774            }
1775            DialError::Denied { .. } => {
1776                write!(f, "Dial error")
1777            }
1778        }
1779    }
1780}
1781
1782fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
1783    write!(f, ": {e}")?;
1784
1785    if let Some(source) = e.source() {
1786        print_error_chain(f, source)?;
1787    }
1788
1789    Ok(())
1790}
1791
1792impl error::Error for DialError {
1793    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1794        match self {
1795            DialError::LocalPeerId { .. } => None,
1796            DialError::NoAddresses => None,
1797            DialError::DialPeerConditionFalse(_) => None,
1798            DialError::Aborted => None,
1799            DialError::WrongPeerId { .. } => None,
1800            DialError::Transport(_) => None,
1801            DialError::Denied { cause } => Some(cause),
1802        }
1803    }
1804}
1805
1806/// Possible errors when upgrading an inbound connection.
1807#[derive(Debug)]
1808pub enum ListenError {
1809    /// Pending connection attempt has been aborted.
1810    Aborted,
1811    /// The peer identity obtained on the connection did not match the one that was expected.
1812    WrongPeerId {
1813        obtained: PeerId,
1814        endpoint: ConnectedPoint,
1815    },
1816    /// The connection was dropped because it resolved to our own [`PeerId`].
1817    LocalPeerId {
1818        endpoint: ConnectedPoint,
1819    },
1820    Denied {
1821        cause: ConnectionDenied,
1822    },
1823    /// An error occurred while negotiating the transport protocol(s) on a connection.
1824    Transport(TransportError<io::Error>),
1825}
1826
1827impl From<PendingInboundConnectionError> for ListenError {
1828    fn from(error: PendingInboundConnectionError) -> Self {
1829        match error {
1830            PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1831            PendingInboundConnectionError::Aborted => ListenError::Aborted,
1832            PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1833                ListenError::WrongPeerId { obtained, endpoint }
1834            }
1835            PendingInboundConnectionError::LocalPeerId { endpoint } => {
1836                ListenError::LocalPeerId { endpoint }
1837            }
1838        }
1839    }
1840}
1841
1842impl fmt::Display for ListenError {
1843    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1844        match self {
1845            ListenError::Aborted => write!(
1846                f,
1847                "Listen error: Pending connection attempt has been aborted."
1848            ),
1849            ListenError::WrongPeerId { obtained, endpoint } => write!(
1850                f,
1851                "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1852            ),
1853            ListenError::Transport(_) => {
1854                write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1855            }
1856            ListenError::Denied { cause } => {
1857                write!(f, "Listen error: Denied: {cause}")
1858            }
1859            ListenError::LocalPeerId { endpoint } => {
1860                write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1861            }
1862        }
1863    }
1864}
1865
1866impl error::Error for ListenError {
1867    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1868        match self {
1869            ListenError::WrongPeerId { .. } => None,
1870            ListenError::Transport(err) => Some(err),
1871            ListenError::Aborted => None,
1872            ListenError::Denied { cause } => Some(cause),
1873            ListenError::LocalPeerId { .. } => None,
1874        }
1875    }
1876}
1877
1878/// A connection was denied.
1879///
1880/// To figure out which [`NetworkBehaviour`] denied the connection, use [`ConnectionDenied::downcast`].
1881#[derive(Debug)]
1882pub struct ConnectionDenied {
1883    inner: Box<dyn error::Error + Send + Sync + 'static>,
1884}
1885
1886impl ConnectionDenied {
1887    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
1888        Self {
1889            inner: cause.into(),
1890        }
1891    }
1892
1893    /// Attempt to downcast to a particular reason for why the connection was denied.
1894    pub fn downcast<E>(self) -> Result<E, Self>
1895    where
1896        E: error::Error + Send + Sync + 'static,
1897    {
1898        let inner = self
1899            .inner
1900            .downcast::<E>()
1901            .map_err(|inner| ConnectionDenied { inner })?;
1902
1903        Ok(*inner)
1904    }
1905
1906    /// Attempt to downcast to a particular reason for why the connection was denied.
1907    pub fn downcast_ref<E>(&self) -> Option<&E>
1908    where
1909        E: error::Error + Send + Sync + 'static,
1910    {
1911        self.inner.downcast_ref::<E>()
1912    }
1913}
1914
1915impl fmt::Display for ConnectionDenied {
1916    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1917        write!(f, "connection denied")
1918    }
1919}
1920
1921impl error::Error for ConnectionDenied {
1922    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1923        Some(self.inner.as_ref())
1924    }
1925}
1926
1927/// Information about the connections obtained by [`Swarm::network_info()`].
1928#[derive(Clone, Debug)]
1929pub struct NetworkInfo {
1930    /// The total number of connected peers.
1931    num_peers: usize,
1932    /// Counters of ongoing network connections.
1933    connection_counters: ConnectionCounters,
1934}
1935
1936impl NetworkInfo {
1937    /// The number of connected peers, i.e. peers with whom at least
1938    /// one established connection exists.
1939    pub fn num_peers(&self) -> usize {
1940        self.num_peers
1941    }
1942
1943    /// Gets counters for ongoing network connections.
1944    pub fn connection_counters(&self) -> &ConnectionCounters {
1945        &self.connection_counters
1946    }
1947}
1948
1949/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
1950///
1951/// If the given address is already a `p2p` address for the given peer,
1952/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
1953///
1954/// If the given address is already a `p2p` address for a different peer
1955/// than the one given, the given `Multiaddr` is returned as an `Err`.
1956///
1957/// If the given address is not yet a `p2p` address for the given peer,
1958/// the `/p2p/<peer-id>` protocol is appended to the returned address.
1959fn p2p_addr(peer: Option<PeerId>, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
1960    let peer = match peer {
1961        Some(p) => p,
1962        None => return Ok(addr),
1963    };
1964
1965    if let Some(multiaddr::Protocol::P2p(peer_id)) = addr.iter().last() {
1966        if peer_id != peer {
1967            return Err(addr);
1968        }
1969
1970        return Ok(addr);
1971    }
1972
1973    Ok(addr.with(multiaddr::Protocol::P2p(peer)))
1974}
1975
1976#[cfg(test)]
1977mod tests {
1978    use super::*;
1979    use crate::dummy;
1980    use crate::test::{CallTraceBehaviour, MockBehaviour};
1981    use futures::future;
1982    use libp2p_core::multiaddr::multiaddr;
1983    use libp2p_core::transport::memory::MemoryTransportError;
1984    use libp2p_core::transport::TransportEvent;
1985    use libp2p_core::Endpoint;
1986    use libp2p_core::{multiaddr, transport, upgrade};
1987    use libp2p_identity as identity;
1988    use libp2p_plaintext as plaintext;
1989    use libp2p_yamux as yamux;
1990    use quickcheck::*;
1991
1992    // Test execution state.
1993    // Connection => Disconnecting => Connecting.
1994    enum State {
1995        Connecting,
1996        Disconnecting,
1997    }
1998
1999    fn new_test_swarm(
2000        config: Config,
2001    ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
2002        let id_keys = identity::Keypair::generate_ed25519();
2003        let local_public_key = id_keys.public();
2004        let transport = transport::MemoryTransport::default()
2005            .upgrade(upgrade::Version::V1)
2006            .authenticate(plaintext::Config::new(&id_keys))
2007            .multiplex(yamux::Config::default())
2008            .boxed();
2009        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
2010
2011        Swarm::new(
2012            transport,
2013            behaviour,
2014            local_public_key.into(),
2015            config.with_idle_connection_timeout(Duration::from_secs(5)),
2016        )
2017    }
2018
2019    fn swarms_connected<TBehaviour>(
2020        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
2021        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
2022        num_connections: usize,
2023    ) -> bool
2024    where
2025        TBehaviour: NetworkBehaviour,
2026        THandlerOutEvent<TBehaviour>: Clone,
2027    {
2028        swarm1
2029            .behaviour()
2030            .num_connections_to_peer(*swarm2.local_peer_id())
2031            == num_connections
2032            && swarm2
2033                .behaviour()
2034                .num_connections_to_peer(*swarm1.local_peer_id())
2035                == num_connections
2036            && swarm1.is_connected(swarm2.local_peer_id())
2037            && swarm2.is_connected(swarm1.local_peer_id())
2038    }
2039
2040    fn swarms_disconnected<TBehaviour: NetworkBehaviour>(
2041        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
2042        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
2043    ) -> bool
2044    where
2045        TBehaviour: NetworkBehaviour,
2046        THandlerOutEvent<TBehaviour>: Clone,
2047    {
2048        swarm1
2049            .behaviour()
2050            .num_connections_to_peer(*swarm2.local_peer_id())
2051            == 0
2052            && swarm2
2053                .behaviour()
2054                .num_connections_to_peer(*swarm1.local_peer_id())
2055                == 0
2056            && !swarm1.is_connected(swarm2.local_peer_id())
2057            && !swarm2.is_connected(swarm1.local_peer_id())
2058    }
2059
2060    /// Establishes multiple connections between two peers,
2061    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
2062    ///
2063    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
2064    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
2065    #[tokio::test]
2066    async fn test_swarm_disconnect() {
2067        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2068        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2069
2070        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2071        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2072
2073        swarm1.listen_on(addr1.clone()).unwrap();
2074        swarm2.listen_on(addr2.clone()).unwrap();
2075
2076        let swarm1_id = *swarm1.local_peer_id();
2077
2078        let mut reconnected = false;
2079        let num_connections = 10;
2080
2081        for _ in 0..num_connections {
2082            swarm1.dial(addr2.clone()).unwrap();
2083        }
2084        let mut state = State::Connecting;
2085
2086        future::poll_fn(move |cx| loop {
2087            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2088            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2089            match state {
2090                State::Connecting => {
2091                    if swarms_connected(&swarm1, &swarm2, num_connections) {
2092                        if reconnected {
2093                            return Poll::Ready(());
2094                        }
2095                        swarm2
2096                            .disconnect_peer_id(swarm1_id)
2097                            .expect("Error disconnecting");
2098                        state = State::Disconnecting;
2099                    }
2100                }
2101                State::Disconnecting => {
2102                    if swarms_disconnected(&swarm1, &swarm2) {
2103                        if reconnected {
2104                            return Poll::Ready(());
2105                        }
2106                        reconnected = true;
2107                        for _ in 0..num_connections {
2108                            swarm2.dial(addr1.clone()).unwrap();
2109                        }
2110                        state = State::Connecting;
2111                    }
2112                }
2113            }
2114
2115            if poll1.is_pending() && poll2.is_pending() {
2116                return Poll::Pending;
2117            }
2118        })
2119        .await
2120    }
2121
2122    /// Establishes multiple connections between two peers,
2123    /// after which one peer disconnects the other
2124    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
2125    ///
2126    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
2127    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
2128    #[tokio::test]
2129    async fn test_behaviour_disconnect_all() {
2130        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2131        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2132
2133        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2134        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2135
2136        swarm1.listen_on(addr1.clone()).unwrap();
2137        swarm2.listen_on(addr2.clone()).unwrap();
2138
2139        let swarm1_id = *swarm1.local_peer_id();
2140
2141        let mut reconnected = false;
2142        let num_connections = 10;
2143
2144        for _ in 0..num_connections {
2145            swarm1.dial(addr2.clone()).unwrap();
2146        }
2147        let mut state = State::Connecting;
2148
2149        future::poll_fn(move |cx| loop {
2150            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2151            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2152            match state {
2153                State::Connecting => {
2154                    if swarms_connected(&swarm1, &swarm2, num_connections) {
2155                        if reconnected {
2156                            return Poll::Ready(());
2157                        }
2158                        swarm2
2159                            .behaviour
2160                            .inner()
2161                            .next_action
2162                            .replace(ToSwarm::CloseConnection {
2163                                peer_id: swarm1_id,
2164                                connection: CloseConnection::All,
2165                            });
2166                        state = State::Disconnecting;
2167                        continue;
2168                    }
2169                }
2170                State::Disconnecting => {
2171                    if swarms_disconnected(&swarm1, &swarm2) {
2172                        reconnected = true;
2173                        for _ in 0..num_connections {
2174                            swarm2.dial(addr1.clone()).unwrap();
2175                        }
2176                        state = State::Connecting;
2177                        continue;
2178                    }
2179                }
2180            }
2181
2182            if poll1.is_pending() && poll2.is_pending() {
2183                return Poll::Pending;
2184            }
2185        })
2186        .await
2187    }
2188
2189    /// Establishes multiple connections between two peers,
2190    /// after which one peer closes a single connection
2191    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
2192    ///
2193    /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
2194    /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
2195    #[tokio::test]
2196    async fn test_behaviour_disconnect_one() {
2197        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2198        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2199
2200        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2201        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2202
2203        swarm1.listen_on(addr1).unwrap();
2204        swarm2.listen_on(addr2.clone()).unwrap();
2205
2206        let swarm1_id = *swarm1.local_peer_id();
2207
2208        let num_connections = 10;
2209
2210        for _ in 0..num_connections {
2211            swarm1.dial(addr2.clone()).unwrap();
2212        }
2213        let mut state = State::Connecting;
2214        let mut disconnected_conn_id = None;
2215
2216        future::poll_fn(move |cx| loop {
2217            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2218            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2219            match state {
2220                State::Connecting => {
2221                    if swarms_connected(&swarm1, &swarm2, num_connections) {
2222                        disconnected_conn_id = {
2223                            let conn_id =
2224                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
2225                            swarm2.behaviour.inner().next_action.replace(
2226                                ToSwarm::CloseConnection {
2227                                    peer_id: swarm1_id,
2228                                    connection: CloseConnection::One(conn_id),
2229                                },
2230                            );
2231                            Some(conn_id)
2232                        };
2233                        state = State::Disconnecting;
2234                    }
2235                }
2236                State::Disconnecting => {
2237                    for s in &[&swarm1, &swarm2] {
2238                        assert!(s
2239                            .behaviour
2240                            .on_connection_closed
2241                            .iter()
2242                            .all(|(.., remaining_conns)| *remaining_conns > 0));
2243                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2244                        s.behaviour.assert_connected(num_connections, 1);
2245                    }
2246                    if [&swarm1, &swarm2]
2247                        .iter()
2248                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
2249                    {
2250                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2251                        assert_eq!(Some(conn_id), disconnected_conn_id);
2252                        return Poll::Ready(());
2253                    }
2254                }
2255            }
2256
2257            if poll1.is_pending() && poll2.is_pending() {
2258                return Poll::Pending;
2259            }
2260        })
2261        .await
2262    }
2263
2264    #[test]
2265    fn concurrent_dialing() {
2266        #[derive(Clone, Debug)]
2267        struct DialConcurrencyFactor(NonZeroU8);
2268
2269        impl Arbitrary for DialConcurrencyFactor {
2270            fn arbitrary(g: &mut Gen) -> Self {
2271                Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2272            }
2273        }
2274
2275        fn prop(concurrency_factor: DialConcurrencyFactor) {
2276            tokio::runtime::Runtime::new().unwrap().block_on(async {
2277                let mut swarm = new_test_swarm(
2278                    Config::with_tokio_executor()
2279                        .with_dial_concurrency_factor(concurrency_factor.0),
2280                );
2281
2282                // Listen on `concurrency_factor + 1` addresses.
2283                //
2284                // `+ 2` to ensure a subset of addresses is dialed by network_2.
2285                let num_listen_addrs = concurrency_factor.0.get() + 2;
2286                let mut listen_addresses = Vec::new();
2287                let mut transports = Vec::new();
2288                for _ in 0..num_listen_addrs {
2289                    let mut transport = transport::MemoryTransport::default().boxed();
2290                    transport
2291                        .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2292                        .unwrap();
2293
2294                    match transport.select_next_some().await {
2295                        TransportEvent::NewAddress { listen_addr, .. } => {
2296                            listen_addresses.push(listen_addr);
2297                        }
2298                        _ => panic!("Expected `NewListenAddr` event."),
2299                    }
2300
2301                    transports.push(transport);
2302                }
2303
2304                // Have swarm dial each listener and wait for each listener to receive the incoming
2305                // connections.
2306                swarm
2307                    .dial(
2308                        DialOpts::peer_id(PeerId::random())
2309                            .addresses(listen_addresses)
2310                            .build(),
2311                    )
2312                    .unwrap();
2313                for mut transport in transports.into_iter() {
2314                    match futures::future::select(transport.select_next_some(), swarm.next()).await
2315                    {
2316                        future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2317                        future::Either::Left(_) => {
2318                            panic!("Unexpected transport event.")
2319                        }
2320                        future::Either::Right((e, _)) => {
2321                            panic!("Expect swarm to not emit any event {e:?}")
2322                        }
2323                    }
2324                }
2325
2326                match swarm.next().await.unwrap() {
2327                    SwarmEvent::OutgoingConnectionError { .. } => {}
2328                    e => panic!("Unexpected swarm event {e:?}"),
2329                }
2330            })
2331        }
2332
2333        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2334    }
2335
2336    #[tokio::test]
2337    async fn invalid_peer_id() {
2338        // Checks whether dialing an address containing the wrong peer id raises an error
2339        // for the expected peer id instead of the obtained peer id.
2340
2341        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2342        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2343
2344        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2345
2346        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2347            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2348            Poll::Pending => Poll::Pending,
2349            _ => panic!("Was expecting the listen address to be reported"),
2350        })
2351        .await;
2352
2353        let other_id = PeerId::random();
2354        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2355
2356        swarm2.dial(other_addr.clone()).unwrap();
2357
2358        let (peer_id, error) = future::poll_fn(|cx| {
2359            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2360                swarm1.poll_next_unpin(cx)
2361            {}
2362
2363            match swarm2.poll_next_unpin(cx) {
2364                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2365                    peer_id, error, ..
2366                })) => Poll::Ready((peer_id, error)),
2367                Poll::Ready(x) => panic!("unexpected {x:?}"),
2368                Poll::Pending => Poll::Pending,
2369            }
2370        })
2371        .await;
2372        assert_eq!(peer_id.unwrap(), other_id);
2373        match error {
2374            DialError::WrongPeerId { obtained, endpoint } => {
2375                assert_eq!(obtained, *swarm1.local_peer_id());
2376                assert_eq!(
2377                    endpoint,
2378                    ConnectedPoint::Dialer {
2379                        address: other_addr,
2380                        role_override: Endpoint::Dialer,
2381                    }
2382                );
2383            }
2384            x => panic!("wrong error {x:?}"),
2385        }
2386    }
2387
2388    #[tokio::test]
2389    async fn dial_self() {
2390        // Check whether dialing ourselves correctly fails.
2391        //
2392        // Dialing the same address we're listening should result in three events:
2393        //
2394        // - The incoming connection notification (before we know the incoming peer ID).
2395        // - The connection error for the dialing endpoint (once we've determined that it's our own ID).
2396        // - The connection error for the listening endpoint (once we've determined that it's our own ID).
2397        //
2398        // The last two can happen in any order.
2399
2400        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2401        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2402
2403        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2404            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2405            Poll::Pending => Poll::Pending,
2406            _ => panic!("Was expecting the listen address to be reported"),
2407        })
2408        .await;
2409
2410        swarm.listened_addrs.clear(); // This is a hack to actually execute the dial to ourselves which would otherwise be filtered.
2411
2412        swarm.dial(local_address.clone()).unwrap();
2413
2414        let mut got_dial_err = false;
2415        let mut got_inc_err = false;
2416        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2417            loop {
2418                match swarm.poll_next_unpin(cx) {
2419                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2420                        peer_id,
2421                        error: DialError::LocalPeerId { .. },
2422                        ..
2423                    })) => {
2424                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2425                        assert!(!got_dial_err);
2426                        got_dial_err = true;
2427                        if got_inc_err {
2428                            return Poll::Ready(Ok(()));
2429                        }
2430                    }
2431                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2432                        local_addr, ..
2433                    })) => {
2434                        assert!(!got_inc_err);
2435                        assert_eq!(local_addr, local_address);
2436                        got_inc_err = true;
2437                        if got_dial_err {
2438                            return Poll::Ready(Ok(()));
2439                        }
2440                    }
2441                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2442                        assert_eq!(local_addr, local_address);
2443                    }
2444                    Poll::Ready(ev) => {
2445                        panic!("Unexpected event: {ev:?}")
2446                    }
2447                    Poll::Pending => break Poll::Pending,
2448                }
2449            }
2450        })
2451        .await
2452        .unwrap();
2453    }
2454
2455    #[tokio::test]
2456    async fn dial_self_by_id() {
2457        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
2458        // place.
2459        let swarm = new_test_swarm(Config::with_tokio_executor());
2460        let peer_id = *swarm.local_peer_id();
2461        assert!(!swarm.is_connected(&peer_id));
2462    }
2463
2464    #[tokio::test]
2465    async fn multiple_addresses_err() {
2466        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
2467
2468        let target = PeerId::random();
2469
2470        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2471
2472        let addresses = HashSet::from([
2473            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2474            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2475            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2476            multiaddr![Udp(rand::random::<u16>())],
2477            multiaddr![Udp(rand::random::<u16>())],
2478            multiaddr![Udp(rand::random::<u16>())],
2479            multiaddr![Udp(rand::random::<u16>())],
2480            multiaddr![Udp(rand::random::<u16>())],
2481        ]);
2482
2483        swarm
2484            .dial(
2485                DialOpts::peer_id(target)
2486                    .addresses(addresses.iter().cloned().collect())
2487                    .build(),
2488            )
2489            .unwrap();
2490
2491        match swarm.next().await.unwrap() {
2492            SwarmEvent::OutgoingConnectionError {
2493                peer_id,
2494                // multiaddr,
2495                error: DialError::Transport(errors),
2496                ..
2497            } => {
2498                assert_eq!(target, peer_id.unwrap());
2499
2500                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2501                let expected_addresses = addresses
2502                    .into_iter()
2503                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2504                    .collect::<Vec<_>>();
2505
2506                assert_eq!(expected_addresses, failed_addresses);
2507            }
2508            e => panic!("Unexpected event: {e:?}"),
2509        }
2510    }
2511
2512    #[tokio::test]
2513    async fn aborting_pending_connection_surfaces_error() {
2514        let _ = env_logger::try_init();
2515
2516        let mut dialer = new_test_swarm(Config::with_tokio_executor());
2517        let mut listener = new_test_swarm(Config::with_tokio_executor());
2518
2519        let listener_peer_id = *listener.local_peer_id();
2520        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2521        let listener_address = match listener.next().await.unwrap() {
2522            SwarmEvent::NewListenAddr { address, .. } => address,
2523            e => panic!("Unexpected network event: {e:?}"),
2524        };
2525
2526        dialer
2527            .dial(
2528                DialOpts::peer_id(listener_peer_id)
2529                    .addresses(vec![listener_address])
2530                    .build(),
2531            )
2532            .unwrap();
2533
2534        dialer
2535            .disconnect_peer_id(listener_peer_id)
2536            .expect_err("Expect peer to not yet be connected.");
2537
2538        match dialer.next().await.unwrap() {
2539            SwarmEvent::OutgoingConnectionError {
2540                error: DialError::Aborted,
2541                ..
2542            } => {}
2543            e => panic!("Unexpected swarm event {e:?}."),
2544        }
2545    }
2546
2547    #[test]
2548    fn dial_error_prints_sources() {
2549        // This constitutes a fairly typical error for chained transports.
2550        let error = DialError::Transport(vec![(
2551            "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2552            TransportError::Other(io::Error::new(
2553                io::ErrorKind::Other,
2554                MemoryTransportError::Unreachable,
2555            )),
2556        )]);
2557
2558        let string = format!("{error}");
2559
2560        // Unfortunately, we have some "empty" errors that lead to multiple colons without text but that is the best we can do.
2561        assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2562    }
2563}