litep2p/transport/manager/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    addresses::PublicAddresses,
23    codec::ProtocolCodec,
24    crypto::ed25519::Keypair,
25    error::{AddressError, DialError, Error},
26    executor::Executor,
27    protocol::{InnerTransportEvent, TransportService},
28    transport::{
29        manager::{
30            address::{AddressRecord, AddressStore},
31            handle::InnerTransportManagerCommand,
32            types::{PeerContext, PeerState},
33        },
34        Endpoint, Transport, TransportEvent,
35    },
36    types::{protocol::ProtocolName, ConnectionId},
37    BandwidthSink, PeerId,
38};
39
40use futures::{Stream, StreamExt};
41use indexmap::IndexMap;
42use multiaddr::{Multiaddr, Protocol};
43use multihash::Multihash;
44use parking_lot::RwLock;
45use tokio::sync::mpsc::{channel, Receiver, Sender};
46
47use std::{
48    collections::{hash_map::Entry, HashMap, HashSet},
49    pin::Pin,
50    sync::{
51        atomic::{AtomicUsize, Ordering},
52        Arc,
53    },
54    task::{Context, Poll},
55    time::Duration,
56};
57
58pub use handle::{TransportHandle, TransportManagerHandle};
59pub use types::SupportedTransport;
60
61mod address;
62pub mod limits;
63mod types;
64
65pub(crate) mod handle;
66
67// TODO: store `Multiaddr` in `Arc`
68// TODO: limit number of peers and addresses
69// TODO: rename constants
70// TODO: add lots of documentation
71
72/// Logging target for the file.
73const LOG_TARGET: &str = "litep2p::transport-manager";
74
75/// Score for a working address.
76const SCORE_CONNECT_SUCCESS: i32 = 100i32;
77
78/// Score for a non-working address.
79const SCORE_CONNECT_FAILURE: i32 = -100i32;
80
81/// The connection established result.
82#[derive(Debug, Clone, Copy, Eq, PartialEq)]
83enum ConnectionEstablishedResult {
84    /// Accept connection and inform `Litep2p` about the connection.
85    Accept,
86
87    /// Reject connection.
88    Reject,
89}
90
91/// [`crate::transport::manager::TransportManager`] events.
92pub enum TransportManagerEvent {
93    /// Connection closed to remote peer.
94    ConnectionClosed {
95        /// Peer ID.
96        peer: PeerId,
97
98        /// Connection ID.
99        connection: ConnectionId,
100    },
101}
102
103// Protocol context.
104#[derive(Debug, Clone)]
105pub struct ProtocolContext {
106    /// Codec used by the protocol.
107    pub codec: ProtocolCodec,
108
109    /// TX channel for sending events to protocol.
110    pub tx: Sender<InnerTransportEvent>,
111
112    /// Fallback names for the protocol.
113    pub fallback_names: Vec<ProtocolName>,
114}
115
116impl ProtocolContext {
117    /// Create new [`ProtocolContext`].
118    fn new(
119        codec: ProtocolCodec,
120        tx: Sender<InnerTransportEvent>,
121        fallback_names: Vec<ProtocolName>,
122    ) -> Self {
123        Self {
124            tx,
125            codec,
126            fallback_names,
127        }
128    }
129}
130
131/// Transport context for enabled transports.
132struct TransportContext {
133    /// Polling index.
134    index: usize,
135
136    /// Registered transports.
137    transports: IndexMap<SupportedTransport, Box<dyn Transport<Item = TransportEvent>>>,
138}
139
140impl TransportContext {
141    /// Create new [`TransportContext`].
142    pub fn new() -> Self {
143        Self {
144            index: 0usize,
145            transports: IndexMap::new(),
146        }
147    }
148
149    /// Get an iterator of supported transports.
150    pub fn keys(&self) -> impl Iterator<Item = &SupportedTransport> {
151        self.transports.keys()
152    }
153
154    /// Get mutable access to transport.
155    pub fn get_mut(
156        &mut self,
157        key: &SupportedTransport,
158    ) -> Option<&mut Box<dyn Transport<Item = TransportEvent>>> {
159        self.transports.get_mut(key)
160    }
161
162    /// Register `transport` to `TransportContext`.
163    pub fn register_transport(
164        &mut self,
165        name: SupportedTransport,
166        transport: Box<dyn Transport<Item = TransportEvent>>,
167    ) {
168        assert!(self.transports.insert(name, transport).is_none());
169    }
170}
171
172impl Stream for TransportContext {
173    type Item = (SupportedTransport, TransportEvent);
174
175    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176        let len = match self.transports.len() {
177            0 => return Poll::Ready(None),
178            len => len,
179        };
180        let start_index = self.index;
181
182        loop {
183            let index = self.index % len;
184            self.index += 1;
185
186            let (key, stream) = self.transports.get_index_mut(index).expect("transport to exist");
187            match stream.poll_next_unpin(cx) {
188                Poll::Pending => {}
189                Poll::Ready(None) => return Poll::Ready(None),
190                Poll::Ready(Some(event)) => return Poll::Ready(Some((*key, event))),
191            }
192
193            if self.index == start_index + len {
194                break Poll::Pending;
195            }
196        }
197    }
198}
199
200/// Litep2p connection manager.
201pub struct TransportManager {
202    /// Local peer ID.
203    local_peer_id: PeerId,
204
205    /// Keypair.
206    keypair: Keypair,
207
208    /// Bandwidth sink.
209    bandwidth_sink: BandwidthSink,
210
211    /// Maximum parallel dial attempts per peer.
212    max_parallel_dials: usize,
213
214    /// Installed protocols.
215    protocols: HashMap<ProtocolName, ProtocolContext>,
216
217    /// All names (main and fallback(s)) of the installed protocols.
218    protocol_names: HashSet<ProtocolName>,
219
220    /// Listen addresses.
221    listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
222
223    /// Listen addresses.
224    public_addresses: PublicAddresses,
225
226    /// Next connection ID.
227    next_connection_id: Arc<AtomicUsize>,
228
229    /// Next substream ID.
230    next_substream_id: Arc<AtomicUsize>,
231
232    /// Installed transports.
233    transports: TransportContext,
234
235    /// Peers
236    peers: Arc<RwLock<HashMap<PeerId, PeerContext>>>,
237
238    /// Handle to [`crate::transport::manager::TransportManager`].
239    transport_manager_handle: TransportManagerHandle,
240
241    /// RX channel for receiving events from installed transports.
242    event_rx: Receiver<TransportManagerEvent>,
243
244    /// RX channel for receiving commands from installed protocols.
245    cmd_rx: Receiver<InnerTransportManagerCommand>,
246
247    /// TX channel for transport events that is given to installed transports.
248    event_tx: Sender<TransportManagerEvent>,
249
250    /// Pending connections.
251    pending_connections: HashMap<ConnectionId, PeerId>,
252
253    /// Connection limits.
254    connection_limits: limits::ConnectionLimits,
255
256    /// Opening connections errors.
257    opening_errors: HashMap<ConnectionId, Vec<(Multiaddr, DialError)>>,
258}
259
260impl TransportManager {
261    /// Create new [`crate::transport::manager::TransportManager`].
262    // TODO: don't return handle here
263    pub fn new(
264        keypair: Keypair,
265        supported_transports: HashSet<SupportedTransport>,
266        bandwidth_sink: BandwidthSink,
267        max_parallel_dials: usize,
268        connection_limits_config: limits::ConnectionLimitsConfig,
269    ) -> (Self, TransportManagerHandle) {
270        let local_peer_id = PeerId::from_public_key(&keypair.public().into());
271        let peers = Arc::new(RwLock::new(HashMap::new()));
272        let (cmd_tx, cmd_rx) = channel(256);
273        let (event_tx, event_rx) = channel(256);
274        let listen_addresses = Arc::new(RwLock::new(HashSet::new()));
275        let public_addresses = PublicAddresses::new(local_peer_id);
276        let handle = TransportManagerHandle::new(
277            local_peer_id,
278            peers.clone(),
279            cmd_tx,
280            supported_transports,
281            listen_addresses.clone(),
282            public_addresses.clone(),
283        );
284
285        (
286            Self {
287                peers,
288                cmd_rx,
289                keypair,
290                event_tx,
291                event_rx,
292                local_peer_id,
293                bandwidth_sink,
294                listen_addresses,
295                public_addresses,
296                max_parallel_dials,
297                protocols: HashMap::new(),
298                transports: TransportContext::new(),
299                protocol_names: HashSet::new(),
300                transport_manager_handle: handle.clone(),
301                pending_connections: HashMap::new(),
302                next_substream_id: Arc::new(AtomicUsize::new(0usize)),
303                next_connection_id: Arc::new(AtomicUsize::new(0usize)),
304                connection_limits: limits::ConnectionLimits::new(connection_limits_config),
305                opening_errors: HashMap::new(),
306            },
307            handle,
308        )
309    }
310
311    /// Get iterator to installed protocols.
312    pub fn protocols(&self) -> impl Iterator<Item = &ProtocolName> {
313        self.protocols.keys()
314    }
315
316    /// Get iterator to installed transports
317    pub fn installed_transports(&self) -> impl Iterator<Item = &SupportedTransport> {
318        self.transports.keys()
319    }
320
321    /// Get next connection ID.
322    fn next_connection_id(&mut self) -> ConnectionId {
323        let connection_id = self.next_connection_id.fetch_add(1usize, Ordering::Relaxed);
324
325        ConnectionId::from(connection_id)
326    }
327
328    /// Register protocol to the [`crate::transport::manager::TransportManager`].
329    ///
330    /// This allocates new context for the protocol and returns a handle
331    /// which the protocol can use the interact with the transport subsystem.
332    pub fn register_protocol(
333        &mut self,
334        protocol: ProtocolName,
335        fallback_names: Vec<ProtocolName>,
336        codec: ProtocolCodec,
337        keep_alive_timeout: Duration,
338    ) -> TransportService {
339        assert!(!self.protocol_names.contains(&protocol));
340
341        for fallback in &fallback_names {
342            if self.protocol_names.contains(fallback) {
343                panic!("duplicate fallback protocol given: {fallback:?}");
344            }
345        }
346
347        let (service, sender) = TransportService::new(
348            self.local_peer_id,
349            protocol.clone(),
350            fallback_names.clone(),
351            self.next_substream_id.clone(),
352            self.transport_manager_handle.clone(),
353            keep_alive_timeout,
354        );
355
356        self.protocols.insert(
357            protocol.clone(),
358            ProtocolContext::new(codec, sender, fallback_names.clone()),
359        );
360        self.protocol_names.insert(protocol);
361        self.protocol_names.extend(fallback_names);
362
363        service
364    }
365
366    /// Acquire `TransportHandle`.
367    pub fn transport_handle(&self, executor: Arc<dyn Executor>) -> TransportHandle {
368        TransportHandle {
369            tx: self.event_tx.clone(),
370            executor,
371            keypair: self.keypair.clone(),
372            protocols: self.protocols.clone(),
373            bandwidth_sink: self.bandwidth_sink.clone(),
374            next_substream_id: self.next_substream_id.clone(),
375            next_connection_id: self.next_connection_id.clone(),
376        }
377    }
378
379    /// Register transport to `TransportManager`.
380    pub(crate) fn register_transport(
381        &mut self,
382        name: SupportedTransport,
383        transport: Box<dyn Transport<Item = TransportEvent>>,
384    ) {
385        tracing::debug!(target: LOG_TARGET, transport = ?name, "register transport");
386
387        self.transports.register_transport(name, transport);
388        self.transport_manager_handle.register_transport(name);
389    }
390
391    /// Get the list of public addresses of the node.
392    pub(crate) fn public_addresses(&self) -> PublicAddresses {
393        self.public_addresses.clone()
394    }
395
396    /// Register local listen address.
397    pub fn register_listen_address(&mut self, address: Multiaddr) {
398        assert!(!address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))));
399
400        let mut listen_addresses = self.listen_addresses.write();
401
402        listen_addresses.insert(address.clone());
403        listen_addresses.insert(address.with(Protocol::P2p(
404            Multihash::from_bytes(&self.local_peer_id.to_bytes()).unwrap(),
405        )));
406    }
407
408    /// Add one or more known addresses for `peer`.
409    pub fn add_known_address(
410        &mut self,
411        peer: PeerId,
412        address: impl Iterator<Item = Multiaddr>,
413    ) -> usize {
414        self.transport_manager_handle.add_known_address(&peer, address)
415    }
416
417    /// Dial peer using `PeerId`.
418    ///
419    /// Returns an error if the peer is unknown or the peer is already connected.
420    pub async fn dial(&mut self, peer: PeerId) -> crate::Result<()> {
421        // Don't alter the peer state if there's no capacity to dial.
422        let available_capacity = self.connection_limits.on_dial_address()?;
423        // The available capacity is the maximum number of connections that can be established,
424        // so we limit the number of parallel dials to the minimum of these values.
425        let limit = available_capacity.min(self.max_parallel_dials);
426
427        if peer == self.local_peer_id {
428            return Err(Error::TriedToDialSelf);
429        }
430        let mut peers = self.peers.write();
431
432        // if the peer is disconnected, return its context
433        //
434        // otherwise set the state back what it was and return dial status to caller
435        let PeerContext {
436            state,
437            secondary_connection,
438            mut addresses,
439        } = match peers.remove(&peer) {
440            None => return Err(Error::PeerDoesntExist(peer)),
441            Some(
442                context @ PeerContext {
443                    state: PeerState::Connected { .. },
444                    ..
445                },
446            ) => {
447                peers.insert(peer, context);
448                return Err(Error::AlreadyConnected);
449            }
450            Some(
451                context @ PeerContext {
452                    state: PeerState::Dialing { .. } | PeerState::Opening { .. },
453                    ..
454                },
455            ) => {
456                peers.insert(peer, context);
457                return Ok(());
458            }
459            Some(context) => context,
460        };
461
462        if let PeerState::Disconnected {
463            dial_record: Some(_),
464        } = &state
465        {
466            tracing::debug!(
467                target: LOG_TARGET,
468                ?peer,
469                "peer is already being dialed",
470            );
471
472            peers.insert(
473                peer,
474                PeerContext {
475                    state,
476                    secondary_connection,
477                    addresses,
478                },
479            );
480
481            return Ok(());
482        }
483
484        let mut records: HashMap<_, _> = addresses
485            .take(limit)
486            .into_iter()
487            .map(|record| (record.address().clone(), record))
488            .collect();
489
490        if records.is_empty() {
491            return Err(Error::NoAddressAvailable(peer));
492        }
493
494        let locked_addresses = self.listen_addresses.read();
495        for record in records.values() {
496            if locked_addresses.contains(record.as_ref()) {
497                tracing::warn!(
498                    target: LOG_TARGET,
499                    ?peer,
500                    ?record,
501                    "tried to dial self",
502                );
503
504                debug_assert!(false);
505                return Err(Error::TriedToDialSelf);
506            }
507        }
508        drop(locked_addresses);
509
510        // set connection id for the address record and put peer into `Opening` state
511        let connection_id =
512            ConnectionId::from(self.next_connection_id.fetch_add(1usize, Ordering::Relaxed));
513
514        tracing::debug!(
515            target: LOG_TARGET,
516            ?connection_id,
517            addresses = ?records,
518            "dial remote peer",
519        );
520
521        let mut transports = HashSet::new();
522        #[cfg(feature = "websocket")]
523        let mut websocket = Vec::new();
524        #[cfg(feature = "quic")]
525        let mut quic = Vec::new();
526        let mut tcp = Vec::new();
527
528        for (address, record) in &mut records {
529            record.set_connection_id(connection_id);
530
531            #[cfg(feature = "quic")]
532            if address.iter().any(|p| std::matches!(&p, Protocol::QuicV1)) {
533                quic.push(address.clone());
534                transports.insert(SupportedTransport::Quic);
535                continue;
536            }
537
538            #[cfg(feature = "websocket")]
539            if address.iter().any(|p| std::matches!(&p, Protocol::Ws(_) | Protocol::Wss(_))) {
540                websocket.push(address.clone());
541                transports.insert(SupportedTransport::WebSocket);
542                continue;
543            }
544
545            tcp.push(address.clone());
546            transports.insert(SupportedTransport::Tcp);
547        }
548
549        peers.insert(
550            peer,
551            PeerContext {
552                state: PeerState::Opening {
553                    records,
554                    connection_id,
555                    transports,
556                },
557                secondary_connection,
558                addresses,
559            },
560        );
561
562        if !tcp.is_empty() {
563            self.transports
564                .get_mut(&SupportedTransport::Tcp)
565                .expect("transport to be supported")
566                .open(connection_id, tcp)?;
567        }
568
569        #[cfg(feature = "quic")]
570        if !quic.is_empty() {
571            self.transports
572                .get_mut(&SupportedTransport::Quic)
573                .expect("transport to be supported")
574                .open(connection_id, quic)?;
575        }
576
577        #[cfg(feature = "websocket")]
578        if !websocket.is_empty() {
579            self.transports
580                .get_mut(&SupportedTransport::WebSocket)
581                .expect("transport to be supported")
582                .open(connection_id, websocket)?;
583        }
584
585        self.pending_connections.insert(connection_id, peer);
586
587        Ok(())
588    }
589
590    /// Dial peer using `Multiaddr`.
591    ///
592    /// Returns an error if address it not valid.
593    pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
594        self.connection_limits.on_dial_address()?;
595
596        let mut record = AddressRecord::from_multiaddr(address)
597            .ok_or(Error::AddressError(AddressError::PeerIdMissing))?;
598
599        if self.listen_addresses.read().contains(record.as_ref()) {
600            return Err(Error::TriedToDialSelf);
601        }
602
603        tracing::debug!(target: LOG_TARGET, address = ?record.address(), "dial address");
604
605        let mut protocol_stack = record.as_ref().iter();
606        match protocol_stack
607            .next()
608            .ok_or_else(|| Error::TransportNotSupported(record.address().clone()))?
609        {
610            Protocol::Ip4(_) | Protocol::Ip6(_) => {}
611            Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) => {}
612            transport => {
613                tracing::error!(
614                    target: LOG_TARGET,
615                    ?transport,
616                    "invalid transport, expected `ip4`/`ip6`"
617                );
618                return Err(Error::TransportNotSupported(record.address().clone()));
619            }
620        };
621
622        let supported_transport = match protocol_stack
623            .next()
624            .ok_or_else(|| Error::TransportNotSupported(record.address().clone()))?
625        {
626            Protocol::Tcp(_) => match protocol_stack.next() {
627                #[cfg(feature = "websocket")]
628                Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) => SupportedTransport::WebSocket,
629                Some(Protocol::P2p(_)) => SupportedTransport::Tcp,
630                _ => return Err(Error::TransportNotSupported(record.address().clone())),
631            },
632            #[cfg(feature = "quic")]
633            Protocol::Udp(_) => match protocol_stack
634                .next()
635                .ok_or_else(|| Error::TransportNotSupported(record.address().clone()))?
636            {
637                Protocol::QuicV1 => SupportedTransport::Quic,
638                _ => {
639                    tracing::debug!(target: LOG_TARGET, address = ?record.address(), "expected `quic-v1`");
640                    return Err(Error::TransportNotSupported(record.address().clone()));
641                }
642            },
643            protocol => {
644                tracing::error!(
645                    target: LOG_TARGET,
646                    ?protocol,
647                    "invalid protocol"
648                );
649
650                return Err(Error::TransportNotSupported(record.address().clone()));
651            }
652        };
653
654        // when constructing `AddressRecord`, `PeerId` was verified to be part of the address
655        let remote_peer_id =
656            PeerId::try_from_multiaddr(record.address()).expect("`PeerId` to exist");
657
658        // set connection id for the address record and put peer into `Dialing` state
659        let connection_id = self.next_connection_id();
660        record.set_connection_id(connection_id);
661
662        {
663            let mut peers = self.peers.write();
664
665            match peers.entry(remote_peer_id) {
666                Entry::Occupied(occupied) => {
667                    let context = occupied.into_mut();
668
669                    // For a better address tacking, see:
670                    // https://github.com/paritytech/litep2p/issues/180
671                    //
672                    // TODO: context.addresses.insert(record.clone());
673
674                    tracing::debug!(
675                        target: LOG_TARGET,
676                        peer = ?remote_peer_id,
677                        state = ?context.state,
678                        "peer state exists",
679                    );
680
681                    match context.state {
682                        PeerState::Connected { .. } => {
683                            return Err(Error::AlreadyConnected);
684                        }
685                        PeerState::Dialing { .. } | PeerState::Opening { .. } => {
686                            return Ok(());
687                        }
688                        PeerState::Disconnected {
689                            dial_record: Some(_),
690                        } => {
691                            tracing::debug!(
692                                target: LOG_TARGET,
693                                peer = ?remote_peer_id,
694                                state = ?context.state,
695                                "peer is already being dialed from a disconnected state"
696                            );
697                            return Ok(());
698                        }
699                        PeerState::Disconnected { dial_record: None } => {
700                            context.state = PeerState::Dialing {
701                                record: record.clone(),
702                            };
703                        }
704                    }
705                }
706                Entry::Vacant(vacant) => {
707                    vacant.insert(PeerContext {
708                        state: PeerState::Dialing {
709                            record: record.clone(),
710                        },
711                        addresses: AddressStore::new(),
712                        secondary_connection: None,
713                    });
714                }
715            };
716        }
717
718        self.transports
719            .get_mut(&supported_transport)
720            .ok_or(Error::TransportNotSupported(record.address().clone()))?
721            .dial(connection_id, record.address().clone())?;
722        self.pending_connections.insert(connection_id, remote_peer_id);
723
724        Ok(())
725    }
726
727    /// Handle dial failure.
728    fn on_dial_failure(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
729        let peer = self.pending_connections.remove(&connection_id).ok_or_else(|| {
730            tracing::error!(
731                target: LOG_TARGET,
732                ?connection_id,
733                "dial failed for a connection that doesn't exist",
734            );
735            Error::InvalidState
736        })?;
737
738        let mut peers = self.peers.write();
739        let context = peers.get_mut(&peer).ok_or_else(|| {
740            tracing::error!(
741                target: LOG_TARGET,
742                ?peer,
743                ?connection_id,
744                "dial failed for a peer that doesn't exist",
745            );
746            debug_assert!(false);
747
748            Error::InvalidState
749        })?;
750
751        match std::mem::replace(
752            &mut context.state,
753            PeerState::Disconnected { dial_record: None },
754        ) {
755            PeerState::Dialing { ref mut record } => {
756                debug_assert_eq!(record.connection_id(), &Some(connection_id));
757                if record.connection_id() != &Some(connection_id) {
758                    tracing::warn!(
759                        target: LOG_TARGET,
760                        ?peer,
761                        ?connection_id,
762                        ?record,
763                        "unknown dial failure for a dialing peer",
764                    );
765
766                    context.state = PeerState::Dialing {
767                        record: record.clone(),
768                    };
769                    debug_assert!(false);
770                    return Ok(());
771                }
772
773                record.update_score(SCORE_CONNECT_FAILURE);
774                context.addresses.insert(record.clone());
775
776                context.state = PeerState::Disconnected { dial_record: None };
777                Ok(())
778            }
779            PeerState::Opening { .. } => {
780                todo!();
781            }
782            PeerState::Connected {
783                record,
784                dial_record: Some(mut dial_record),
785            } => {
786                if dial_record.connection_id() != &Some(connection_id) {
787                    tracing::warn!(
788                        target: LOG_TARGET,
789                        ?peer,
790                        ?connection_id,
791                        ?record,
792                        "unknown dial failure for a connected peer",
793                    );
794
795                    context.state = PeerState::Connected {
796                        record,
797                        dial_record: Some(dial_record),
798                    };
799                    debug_assert!(false);
800                    return Ok(());
801                }
802
803                dial_record.update_score(SCORE_CONNECT_FAILURE);
804                context.addresses.insert(dial_record);
805
806                context.state = PeerState::Connected {
807                    record,
808                    dial_record: None,
809                };
810                Ok(())
811            }
812            PeerState::Disconnected {
813                dial_record: Some(mut dial_record),
814            } => {
815                tracing::debug!(
816                    target: LOG_TARGET,
817                    ?connection_id,
818                    ?dial_record,
819                    "dial failed for a disconnected peer",
820                );
821
822                if dial_record.connection_id() != &Some(connection_id) {
823                    tracing::warn!(
824                        target: LOG_TARGET,
825                        ?peer,
826                        ?connection_id,
827                        ?dial_record,
828                        "unknown dial failure for a disconnected peer",
829                    );
830
831                    context.state = PeerState::Disconnected {
832                        dial_record: Some(dial_record),
833                    };
834                    debug_assert!(false);
835                    return Ok(());
836                }
837
838                dial_record.update_score(SCORE_CONNECT_FAILURE);
839                context.addresses.insert(dial_record);
840
841                Ok(())
842            }
843            state => {
844                tracing::warn!(
845                    target: LOG_TARGET,
846                    ?peer,
847                    ?connection_id,
848                    ?state,
849                    "invalid state for dial failure",
850                );
851                context.state = state;
852
853                debug_assert!(false);
854                Ok(())
855            }
856        }
857    }
858
859    fn on_pending_incoming_connection(&mut self) -> crate::Result<()> {
860        self.connection_limits.on_incoming()?;
861        Ok(())
862    }
863
864    /// Handle closed connection.
865    fn on_connection_closed(
866        &mut self,
867        peer: PeerId,
868        connection_id: ConnectionId,
869    ) -> crate::Result<Option<TransportEvent>> {
870        self.connection_limits.on_connection_closed(connection_id);
871
872        let mut peers = self.peers.write();
873        let Some(context) = peers.get_mut(&peer) else {
874            tracing::warn!(
875                target: LOG_TARGET,
876                ?peer,
877                ?connection_id,
878                "cannot handle closed connection: peer doesn't exist",
879            );
880            debug_assert!(false);
881            return Err(Error::PeerDoesntExist(peer));
882        };
883
884        tracing::trace!(
885            target: LOG_TARGET,
886            ?peer,
887            ?connection_id,
888            "connection closed",
889        );
890
891        match std::mem::replace(
892            &mut context.state,
893            PeerState::Disconnected { dial_record: None },
894        ) {
895            PeerState::Connected {
896                record,
897                dial_record: actual_dial_record,
898            } => match record.connection_id() == &Some(connection_id) {
899                // primary connection was closed
900                //
901                // if secondary connection exists, switch to using it while keeping peer in
902                // `Connected` state and if there's only one connection, set peer
903                // state to `Disconnected`
904                true => match context.secondary_connection.take() {
905                    None => {
906                        context.addresses.insert(record);
907                        context.state = PeerState::Disconnected {
908                            dial_record: actual_dial_record,
909                        };
910
911                        Ok(Some(TransportEvent::ConnectionClosed {
912                            peer,
913                            connection_id,
914                        }))
915                    }
916                    Some(secondary_connection) => {
917                        context.addresses.insert(record);
918                        context.state = PeerState::Connected {
919                            record: secondary_connection,
920                            dial_record: actual_dial_record,
921                        };
922
923                        Ok(None)
924                    }
925                },
926                // secondary connection was closed
927                false => match context.secondary_connection.take() {
928                    Some(secondary_connection) => {
929                        if secondary_connection.connection_id() != &Some(connection_id) {
930                            tracing::debug!(
931                                target: LOG_TARGET,
932                                ?peer,
933                                ?connection_id,
934                                "unknown connection was closed, potentially ignored tertiary connection",
935                            );
936
937                            context.secondary_connection = Some(secondary_connection);
938                            context.state = PeerState::Connected {
939                                record,
940                                dial_record: actual_dial_record,
941                            };
942
943                            return Ok(None);
944                        }
945
946                        tracing::trace!(
947                            target: LOG_TARGET,
948                            ?peer,
949                            ?connection_id,
950                            "secondary connection closed",
951                        );
952
953                        context.addresses.insert(secondary_connection);
954                        context.state = PeerState::Connected {
955                            record,
956                            dial_record: actual_dial_record,
957                        };
958                        Ok(None)
959                    }
960                    None => {
961                        tracing::warn!(
962                            target: LOG_TARGET,
963                            ?peer,
964                            ?connection_id,
965                            "non-primary connection was closed but secondary connection doesn't exist",
966                        );
967
968                        debug_assert!(false);
969                        Err(Error::InvalidState)
970                    }
971                },
972            },
973            PeerState::Disconnected { dial_record } => match context.secondary_connection.take() {
974                Some(record) => {
975                    tracing::warn!(
976                        target: LOG_TARGET,
977                        ?peer,
978                        ?connection_id,
979                        ?record,
980                        ?dial_record,
981                        "peer is disconnected but secondary connection exists",
982                    );
983
984                    debug_assert!(false);
985                    context.state = PeerState::Disconnected { dial_record };
986                    Err(Error::InvalidState)
987                }
988                None => {
989                    context.state = PeerState::Disconnected { dial_record };
990
991                    Ok(Some(TransportEvent::ConnectionClosed {
992                        peer,
993                        connection_id,
994                    }))
995                }
996            },
997            state => {
998                tracing::warn!(target: LOG_TARGET, ?peer, ?connection_id, ?state, "invalid state for a closed connection");
999                debug_assert!(false);
1000                Err(Error::InvalidState)
1001            }
1002        }
1003    }
1004
1005    fn on_connection_established(
1006        &mut self,
1007        peer: PeerId,
1008        endpoint: &Endpoint,
1009    ) -> crate::Result<ConnectionEstablishedResult> {
1010        if let Some(dialed_peer) = self.pending_connections.remove(&endpoint.connection_id()) {
1011            if dialed_peer != peer {
1012                tracing::warn!(
1013                    target: LOG_TARGET,
1014                    ?dialed_peer,
1015                    ?peer,
1016                    ?endpoint,
1017                    "peer ids do not match but transport was supposed to reject connection"
1018                );
1019                debug_assert!(false);
1020                return Err(Error::InvalidState);
1021            }
1022        };
1023
1024        // Reject the connection if exceeded limits.
1025        if let Err(error) = self
1026            .connection_limits
1027            .on_connection_established(endpoint.connection_id(), endpoint.is_listener())
1028        {
1029            tracing::debug!(
1030                target: LOG_TARGET,
1031                ?peer,
1032                ?endpoint,
1033                ?error,
1034                "connection limit exceeded, rejecting connection",
1035            );
1036            return Ok(ConnectionEstablishedResult::Reject);
1037        }
1038
1039        let mut peers = self.peers.write();
1040        match peers.get_mut(&peer) {
1041            Some(context) => match context.state {
1042                PeerState::Connected {
1043                    ref mut dial_record,
1044                    ..
1045                } => match context.secondary_connection {
1046                    Some(_) => {
1047                        tracing::debug!(
1048                            target: LOG_TARGET,
1049                            ?peer,
1050                            connection_id = ?endpoint.connection_id(),
1051                            ?endpoint,
1052                            "secondary connection already exists, ignoring connection",
1053                        );
1054
1055                        // insert address into the store only if we're the dialer
1056                        //
1057                        // if we're the listener, remote might have dialed with an ephemeral port
1058                        // which it might not be listening, making this address useless
1059                        if endpoint.is_listener() {
1060                            context.addresses.insert(AddressRecord::new(
1061                                &peer,
1062                                endpoint.address().clone(),
1063                                SCORE_CONNECT_SUCCESS,
1064                                None,
1065                            ))
1066                        }
1067
1068                        return Ok(ConnectionEstablishedResult::Reject);
1069                    }
1070                    None => match dial_record.take() {
1071                        Some(record)
1072                            if record.connection_id() == &Some(endpoint.connection_id()) =>
1073                        {
1074                            tracing::debug!(
1075                                target: LOG_TARGET,
1076                                ?peer,
1077                                connection_id = ?endpoint.connection_id(),
1078                                address = ?endpoint.address(),
1079                                "dialed connection opened as secondary connection",
1080                            );
1081
1082                            context.secondary_connection = Some(AddressRecord::new(
1083                                &peer,
1084                                endpoint.address().clone(),
1085                                SCORE_CONNECT_SUCCESS,
1086                                Some(endpoint.connection_id()),
1087                            ));
1088                        }
1089                        None => {
1090                            tracing::debug!(
1091                                target: LOG_TARGET,
1092                                ?peer,
1093                                connection_id = ?endpoint.connection_id(),
1094                                address = ?endpoint.address(),
1095                                "secondary connection",
1096                            );
1097
1098                            context.secondary_connection = Some(AddressRecord::new(
1099                                &peer,
1100                                endpoint.address().clone(),
1101                                SCORE_CONNECT_SUCCESS,
1102                                Some(endpoint.connection_id()),
1103                            ));
1104                        }
1105                        Some(record) => {
1106                            tracing::warn!(
1107                                target: LOG_TARGET,
1108                                ?peer,
1109                                connection_id = ?endpoint.connection_id(),
1110                                address = ?endpoint.address(),
1111                                dial_record = ?record,
1112                                "unknown connection opened as secondary connection, discarding",
1113                            );
1114
1115                            // Preserve the dial record.
1116                            *dial_record = Some(record);
1117
1118                            return Ok(ConnectionEstablishedResult::Reject);
1119                        }
1120                    },
1121                },
1122                PeerState::Dialing { ref record, .. } => {
1123                    match record.connection_id() == &Some(endpoint.connection_id()) {
1124                        true => {
1125                            tracing::trace!(
1126                                target: LOG_TARGET,
1127                                ?peer,
1128                                connection_id = ?endpoint.connection_id(),
1129                                ?endpoint,
1130                                ?record,
1131                                "connection opened to remote",
1132                            );
1133
1134                            context.state = PeerState::Connected {
1135                                record: record.clone(),
1136                                dial_record: None,
1137                            };
1138                        }
1139                        false => {
1140                            tracing::trace!(
1141                                target: LOG_TARGET,
1142                                ?peer,
1143                                connection_id = ?endpoint.connection_id(),
1144                                ?endpoint,
1145                                "connection opened by remote while local node was dialing",
1146                            );
1147
1148                            context.state = PeerState::Connected {
1149                                record: AddressRecord::new(
1150                                    &peer,
1151                                    endpoint.address().clone(),
1152                                    SCORE_CONNECT_SUCCESS,
1153                                    Some(endpoint.connection_id()),
1154                                ),
1155                                dial_record: Some(record.clone()),
1156                            };
1157                        }
1158                    }
1159                }
1160                PeerState::Opening {
1161                    ref mut records,
1162                    connection_id,
1163                    ref transports,
1164                } => {
1165                    debug_assert!(std::matches!(endpoint, &Endpoint::Listener { .. }));
1166
1167                    tracing::trace!(
1168                        target: LOG_TARGET,
1169                        ?peer,
1170                        dial_connection_id = ?connection_id,
1171                        dial_records = ?records,
1172                        dial_transports = ?transports,
1173                        listener_endpoint = ?endpoint,
1174                        "inbound connection while opening an outbound connection",
1175                    );
1176
1177                    // cancel all pending dials
1178                    transports.iter().for_each(|transport| {
1179                        self.transports
1180                            .get_mut(transport)
1181                            .expect("transport to exist")
1182                            .cancel(connection_id);
1183                    });
1184
1185                    // since an inbound connection was removed, the outbound connection can be
1186                    // removed from pending dials
1187                    //
1188                    // all records have the same `ConnectionId` so it doesn't matter which of them
1189                    // is used to remove the pending dial
1190                    self.pending_connections.remove(
1191                        &records
1192                            .iter()
1193                            .next()
1194                            .expect("record to exist")
1195                            .1
1196                            .connection_id()
1197                            .expect("`ConnectionId` to exist"),
1198                    );
1199
1200                    let record = match records.remove(endpoint.address()) {
1201                        Some(mut record) => {
1202                            record.update_score(SCORE_CONNECT_SUCCESS);
1203                            record.set_connection_id(endpoint.connection_id());
1204                            record
1205                        }
1206                        None => AddressRecord::new(
1207                            &peer,
1208                            endpoint.address().clone(),
1209                            SCORE_CONNECT_SUCCESS,
1210                            Some(endpoint.connection_id()),
1211                        ),
1212                    };
1213                    context.addresses.extend(records.iter().map(|(_, record)| record));
1214
1215                    context.state = PeerState::Connected {
1216                        record,
1217                        dial_record: None,
1218                    };
1219                }
1220                PeerState::Disconnected {
1221                    ref mut dial_record,
1222                } => {
1223                    tracing::trace!(
1224                        target: LOG_TARGET,
1225                        ?peer,
1226                        connection_id = ?endpoint.connection_id(),
1227                        ?endpoint,
1228                        ?dial_record,
1229                        "connection opened by remote or delayed dial succeeded",
1230                    );
1231
1232                    let (record, dial_record) = match dial_record.take() {
1233                        Some(mut dial_record) =>
1234                            if dial_record.address() == endpoint.address() {
1235                                dial_record.set_connection_id(endpoint.connection_id());
1236                                (dial_record, None)
1237                            } else {
1238                                (
1239                                    AddressRecord::new(
1240                                        &peer,
1241                                        endpoint.address().clone(),
1242                                        SCORE_CONNECT_SUCCESS,
1243                                        Some(endpoint.connection_id()),
1244                                    ),
1245                                    Some(dial_record),
1246                                )
1247                            },
1248                        None => (
1249                            AddressRecord::new(
1250                                &peer,
1251                                endpoint.address().clone(),
1252                                SCORE_CONNECT_SUCCESS,
1253                                Some(endpoint.connection_id()),
1254                            ),
1255                            None,
1256                        ),
1257                    };
1258
1259                    context.state = PeerState::Connected {
1260                        record,
1261                        dial_record,
1262                    };
1263                }
1264            },
1265            None => {
1266                peers.insert(
1267                    peer,
1268                    PeerContext {
1269                        state: PeerState::Connected {
1270                            record: AddressRecord::new(
1271                                &peer,
1272                                endpoint.address().clone(),
1273                                SCORE_CONNECT_SUCCESS,
1274                                Some(endpoint.connection_id()),
1275                            ),
1276                            dial_record: None,
1277                        },
1278                        addresses: AddressStore::new(),
1279                        secondary_connection: None,
1280                    },
1281                );
1282            }
1283        }
1284
1285        Ok(ConnectionEstablishedResult::Accept)
1286    }
1287
1288    fn on_connection_opened(
1289        &mut self,
1290        transport: SupportedTransport,
1291        connection_id: ConnectionId,
1292        address: Multiaddr,
1293    ) -> crate::Result<()> {
1294        let Some(peer) = self.pending_connections.remove(&connection_id) else {
1295            tracing::warn!(
1296                target: LOG_TARGET,
1297                ?connection_id,
1298                ?transport,
1299                ?address,
1300                "connection opened but dial record doesn't exist",
1301            );
1302
1303            debug_assert!(false);
1304            return Err(Error::InvalidState);
1305        };
1306
1307        let mut peers = self.peers.write();
1308        let context = peers.get_mut(&peer).ok_or_else(|| {
1309            tracing::warn!(
1310                target: LOG_TARGET,
1311                ?peer,
1312                ?connection_id,
1313                "connection opened but peer doesn't exist",
1314            );
1315
1316            debug_assert!(false);
1317            Error::InvalidState
1318        })?;
1319
1320        match std::mem::replace(
1321            &mut context.state,
1322            PeerState::Disconnected { dial_record: None },
1323        ) {
1324            PeerState::Opening {
1325                mut records,
1326                connection_id,
1327                transports,
1328            } => {
1329                tracing::trace!(
1330                    target: LOG_TARGET,
1331                    ?peer,
1332                    ?connection_id,
1333                    ?address,
1334                    ?transport,
1335                    "connection opened to peer",
1336                );
1337
1338                // cancel open attempts for other transports as connection already exists
1339                for transport in transports.iter() {
1340                    self.transports
1341                        .get_mut(transport)
1342                        .expect("transport to exist")
1343                        .cancel(connection_id);
1344                }
1345
1346                // set peer state to `Dialing` to signal that the connection is fully opening
1347                //
1348                // set the succeeded `AddressRecord` as the one that is used for dialing and move
1349                // all other address records back to `AddressStore`. and ask
1350                // transport to negotiate the
1351                let mut dial_record = records.remove(&address).expect("address to exist");
1352                dial_record.update_score(SCORE_CONNECT_SUCCESS);
1353
1354                // negotiate the connection
1355                match self
1356                    .transports
1357                    .get_mut(&transport)
1358                    .expect("transport to exist")
1359                    .negotiate(connection_id)
1360                {
1361                    Ok(()) => {
1362                        tracing::trace!(
1363                            target: LOG_TARGET,
1364                            ?peer,
1365                            ?connection_id,
1366                            ?dial_record,
1367                            ?transport,
1368                            "negotiation started"
1369                        );
1370
1371                        self.pending_connections.insert(connection_id, peer);
1372
1373                        context.state = PeerState::Dialing {
1374                            record: dial_record,
1375                        };
1376
1377                        for (_, record) in records {
1378                            context.addresses.insert(record);
1379                        }
1380
1381                        Ok(())
1382                    }
1383                    Err(error) => {
1384                        tracing::warn!(
1385                            target: LOG_TARGET,
1386                            ?peer,
1387                            ?connection_id,
1388                            ?error,
1389                            "failed to negotiate connection",
1390                        );
1391                        context.state = PeerState::Disconnected { dial_record: None };
1392
1393                        debug_assert!(false);
1394                        Err(Error::InvalidState)
1395                    }
1396                }
1397            }
1398            state => {
1399                tracing::warn!(
1400                    target: LOG_TARGET,
1401                    ?peer,
1402                    ?connection_id,
1403                    ?state,
1404                    "connection opened but `PeerState` is not `Opening`",
1405                );
1406                context.state = state;
1407
1408                debug_assert!(false);
1409                Err(Error::InvalidState)
1410            }
1411        }
1412    }
1413
1414    /// Handle open failure for dialing attempt for `transport`
1415    fn on_open_failure(
1416        &mut self,
1417        transport: SupportedTransport,
1418        connection_id: ConnectionId,
1419    ) -> crate::Result<Option<PeerId>> {
1420        let Some(peer) = self.pending_connections.remove(&connection_id) else {
1421            tracing::warn!(
1422                target: LOG_TARGET,
1423                ?connection_id,
1424                "open failure but dial record doesn't exist",
1425            );
1426            return Err(Error::InvalidState);
1427        };
1428
1429        let mut peers = self.peers.write();
1430        let context = peers.get_mut(&peer).ok_or_else(|| {
1431            tracing::warn!(
1432                target: LOG_TARGET,
1433                ?peer,
1434                ?connection_id,
1435                "open failure but peer doesn't exist",
1436            );
1437
1438            debug_assert!(false);
1439            Error::InvalidState
1440        })?;
1441
1442        match std::mem::replace(
1443            &mut context.state,
1444            PeerState::Disconnected { dial_record: None },
1445        ) {
1446            PeerState::Opening {
1447                records,
1448                connection_id,
1449                mut transports,
1450            } => {
1451                tracing::trace!(
1452                    target: LOG_TARGET,
1453                    ?peer,
1454                    ?connection_id,
1455                    ?transport,
1456                    "open failure for peer",
1457                );
1458                transports.remove(&transport);
1459
1460                if transports.is_empty() {
1461                    for (_, mut record) in records {
1462                        record.update_score(SCORE_CONNECT_FAILURE);
1463                        context.addresses.insert(record);
1464                    }
1465
1466                    tracing::trace!(
1467                        target: LOG_TARGET,
1468                        ?peer,
1469                        ?connection_id,
1470                        "open failure for last transport",
1471                    );
1472
1473                    return Ok(Some(peer));
1474                }
1475
1476                self.pending_connections.insert(connection_id, peer);
1477                context.state = PeerState::Opening {
1478                    records,
1479                    connection_id,
1480                    transports,
1481                };
1482
1483                Ok(None)
1484            }
1485            state => {
1486                tracing::warn!(
1487                    target: LOG_TARGET,
1488                    ?peer,
1489                    ?connection_id,
1490                    ?state,
1491                    "open failure but `PeerState` is not `Opening`",
1492                );
1493                context.state = state;
1494
1495                debug_assert!(false);
1496                Err(Error::InvalidState)
1497            }
1498        }
1499    }
1500
1501    /// Poll next event from [`crate::transport::manager::TransportManager`].
1502    pub async fn next(&mut self) -> Option<TransportEvent> {
1503        loop {
1504            tokio::select! {
1505                event = self.event_rx.recv() => match event? {
1506                    TransportManagerEvent::ConnectionClosed {
1507                        peer,
1508                        connection: connection_id,
1509                    } => match self.on_connection_closed(peer, connection_id) {
1510                        Ok(None) => {}
1511                        Ok(Some(event)) => return Some(event),
1512                        Err(error) => tracing::error!(
1513                            target: LOG_TARGET,
1514                            ?error,
1515                            "failed to handle closed connection",
1516                        ),
1517                    }
1518                },
1519                command = self.cmd_rx.recv() => match command? {
1520                    InnerTransportManagerCommand::DialPeer { peer } => {
1521                        if let Err(error) = self.dial(peer).await {
1522                            tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer")
1523                        }
1524                    }
1525                    InnerTransportManagerCommand::DialAddress { address } => {
1526                        if let Err(error) = self.dial_address(address).await {
1527                            tracing::debug!(target: LOG_TARGET, ?error, "failed to dial peer")
1528                        }
1529                    }
1530                },
1531                event = self.transports.next() => {
1532                    let (transport, event) = event?;
1533
1534                    match event {
1535                        TransportEvent::DialFailure { connection_id, address, error } => {
1536                            tracing::debug!(
1537                                target: LOG_TARGET,
1538                                ?connection_id,
1539                                ?address,
1540                                ?error,
1541                                "failed to dial peer",
1542                            );
1543
1544                            if let Ok(()) = self.on_dial_failure(connection_id) {
1545                                match address.iter().last() {
1546                                    Some(Protocol::P2p(hash)) => match PeerId::from_multihash(hash) {
1547                                        Ok(peer) => {
1548                                            tracing::trace!(
1549                                                target: LOG_TARGET,
1550                                                ?connection_id,
1551                                                ?error,
1552                                                ?address,
1553                                                num_protocols = self.protocols.len(),
1554                                                "dial failure, notify protocols",
1555                                            );
1556
1557                                            for (protocol, context) in &self.protocols {
1558                                                tracing::trace!(
1559                                                    target: LOG_TARGET,
1560                                                    ?connection_id,
1561                                                    ?error,
1562                                                    ?address,
1563                                                    ?protocol,
1564                                                    "dial failure, notify protocol",
1565                                                );
1566                                                match context.tx.try_send(InnerTransportEvent::DialFailure {
1567                                                    peer,
1568                                                    address: address.clone(),
1569                                                }) {
1570                                                    Ok(()) => {}
1571                                                    Err(_) => {
1572                                                        tracing::trace!(
1573                                                            target: LOG_TARGET,
1574                                                            ?connection_id,
1575                                                            ?error,
1576                                                            ?address,
1577                                                            ?protocol,
1578                                                            "dial failure, channel to protocol clogged, use await",
1579                                                        );
1580                                                        let _ = context
1581                                                            .tx
1582                                                            .send(InnerTransportEvent::DialFailure {
1583                                                                peer,
1584                                                                address: address.clone(),
1585                                                            })
1586                                                            .await;
1587                                                    }
1588                                                }
1589                                            }
1590
1591                                            tracing::trace!(
1592                                                target: LOG_TARGET,
1593                                                ?connection_id,
1594                                                ?error,
1595                                                ?address,
1596                                                "all protocols notified",
1597                                            );
1598                                        }
1599                                        Err(error) => {
1600                                            tracing::warn!(
1601                                                target: LOG_TARGET,
1602                                                ?address,
1603                                                ?connection_id,
1604                                                ?error,
1605                                                "failed to parse `PeerId` from `Multiaddr`",
1606                                            );
1607                                            debug_assert!(false);
1608                                        }
1609                                    },
1610                                    _ => {
1611                                        tracing::warn!(target: LOG_TARGET, ?address, ?connection_id, "address doesn't contain `PeerId`");
1612                                        debug_assert!(false);
1613                                    }
1614                                }
1615
1616                                return Some(TransportEvent::DialFailure {
1617                                    connection_id,
1618                                    address,
1619                                    error,
1620                                })
1621                            }
1622                        }
1623                        TransportEvent::ConnectionEstablished { peer, endpoint } => {
1624                            self.opening_errors.remove(&endpoint.connection_id());
1625                            match self.on_connection_established(peer, &endpoint) {
1626                                Err(error) => {
1627                                    tracing::debug!(
1628                                        target: LOG_TARGET,
1629                                        ?peer,
1630                                        ?endpoint,
1631                                        ?error,
1632                                        "failed to handle established connection",
1633                                    );
1634
1635                                    let _ = self
1636                                        .transports
1637                                        .get_mut(&transport)
1638                                        .expect("transport to exist")
1639                                        .reject(endpoint.connection_id());
1640                                }
1641                                Ok(ConnectionEstablishedResult::Accept) => {
1642                                    tracing::trace!(
1643                                        target: LOG_TARGET,
1644                                        ?peer,
1645                                        ?endpoint,
1646                                        "accept connection",
1647                                    );
1648
1649                                    let _ = self
1650                                        .transports
1651                                        .get_mut(&transport)
1652                                        .expect("transport to exist")
1653                                        .accept(endpoint.connection_id());
1654
1655                                    return Some(TransportEvent::ConnectionEstablished {
1656                                        peer,
1657                                        endpoint,
1658                                    });
1659                                }
1660                                Ok(ConnectionEstablishedResult::Reject) => {
1661                                    tracing::trace!(
1662                                        target: LOG_TARGET,
1663                                        ?peer,
1664                                        ?endpoint,
1665                                        "reject connection",
1666                                    );
1667
1668                                    let _ = self
1669                                        .transports
1670                                        .get_mut(&transport)
1671                                        .expect("transport to exist")
1672                                        .reject(endpoint.connection_id());
1673                                }
1674                            }
1675                        }
1676                        TransportEvent::ConnectionOpened { connection_id, address } => {
1677                            self.opening_errors.remove(&connection_id);
1678
1679                            if let Err(error) = self.on_connection_opened(transport, connection_id, address) {
1680                                tracing::debug!(
1681                                    target: LOG_TARGET,
1682                                    ?connection_id,
1683                                    ?error,
1684                                    "failed to handle opened connection",
1685                                );
1686                            }
1687                        }
1688                        TransportEvent::OpenFailure { connection_id, errors } => {
1689                            match self.on_open_failure(transport, connection_id) {
1690                                Err(error) => tracing::debug!(
1691                                    target: LOG_TARGET,
1692                                    ?connection_id,
1693                                    ?error,
1694                                    "failed to handle opened connection",
1695                                ),
1696                                Ok(Some(peer)) => {
1697                                    tracing::trace!(
1698                                        target: LOG_TARGET,
1699                                        ?peer,
1700                                        ?connection_id,
1701                                        num_protocols = self.protocols.len(),
1702                                        "inform protocols about open failure",
1703                                    );
1704
1705                                    for (protocol, context) in &self.protocols {
1706                                        let _ = match context
1707                                            .tx
1708                                            .try_send(InnerTransportEvent::DialFailure {
1709                                                peer,
1710                                                address: Multiaddr::empty(),
1711                                            }) {
1712                                            Ok(_) => Ok(()),
1713                                            Err(_) => {
1714                                                tracing::trace!(
1715                                                    target: LOG_TARGET,
1716                                                    ?peer,
1717                                                    %protocol,
1718                                                    ?connection_id,
1719                                                    "call to protocol would, block try sending in a blocking way",
1720                                                );
1721
1722                                                context
1723                                                    .tx
1724                                                    .send(InnerTransportEvent::DialFailure {
1725                                                        peer,
1726                                                        address: Multiaddr::empty(),
1727                                                    })
1728                                                    .await
1729                                            }
1730                                        };
1731                                    }
1732
1733                                    let mut grouped_errors = self.opening_errors.remove(&connection_id).unwrap_or_default();
1734                                    grouped_errors.extend(errors);
1735                                    return Some(TransportEvent::OpenFailure { connection_id, errors: grouped_errors });
1736                                }
1737                                Ok(None) => {
1738                                    tracing::trace!(
1739                                        target: LOG_TARGET,
1740                                        ?connection_id,
1741                                        "open failure, but not the last transport",
1742                                    );
1743
1744                                    self.opening_errors.entry(connection_id).or_default().extend(errors);
1745                                }
1746                            }
1747                        },
1748                        TransportEvent::PendingInboundConnection { connection_id } => {
1749                            if self.on_pending_incoming_connection().is_ok() {
1750                                tracing::trace!(
1751                                    target: LOG_TARGET,
1752                                    ?connection_id,
1753                                    "accept pending incoming connection",
1754                                );
1755
1756                                let _ = self
1757                                    .transports
1758                                    .get_mut(&transport)
1759                                    .expect("transport to exist")
1760                                    .accept_pending(connection_id);
1761                            } else {
1762                                tracing::debug!(
1763                                    target: LOG_TARGET,
1764                                    ?connection_id,
1765                                    "reject pending incoming connection",
1766                                );
1767
1768                                let _ = self
1769                                    .transports
1770                                    .get_mut(&transport)
1771                                    .expect("transport to exist")
1772                                    .reject_pending(connection_id);
1773                            }
1774                        },
1775                        event => panic!("event not supported: {event:?}"),
1776                    }
1777                },
1778            }
1779        }
1780    }
1781}
1782
1783#[cfg(test)]
1784mod tests {
1785    use limits::ConnectionLimitsConfig;
1786
1787    use multihash::Multihash;
1788
1789    use super::*;
1790    use crate::{
1791        crypto::ed25519::Keypair,
1792        executor::DefaultExecutor,
1793        transport::{dummy::DummyTransport, KEEP_ALIVE_TIMEOUT},
1794    };
1795    #[cfg(feature = "websocket")]
1796    use std::borrow::Cow;
1797    use std::{
1798        net::{Ipv4Addr, Ipv6Addr},
1799        sync::Arc,
1800    };
1801
1802    /// Setup TCP address and connection id.
1803    fn setup_dial_addr(peer: PeerId, connection_id: u16) -> (Multiaddr, ConnectionId) {
1804        let dial_address = Multiaddr::empty()
1805            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
1806            .with(Protocol::Tcp(8888 + connection_id))
1807            .with(Protocol::P2p(
1808                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
1809            ));
1810        let connection_id = ConnectionId::from(connection_id as usize);
1811
1812        (dial_address, connection_id)
1813    }
1814
1815    #[test]
1816    #[should_panic]
1817    #[cfg(debug_assertions)]
1818    fn duplicate_protocol() {
1819        let sink = BandwidthSink::new();
1820        let (mut manager, _handle) = TransportManager::new(
1821            Keypair::generate(),
1822            HashSet::new(),
1823            sink,
1824            8usize,
1825            ConnectionLimitsConfig::default(),
1826        );
1827
1828        manager.register_protocol(
1829            ProtocolName::from("/notif/1"),
1830            Vec::new(),
1831            ProtocolCodec::UnsignedVarint(None),
1832            KEEP_ALIVE_TIMEOUT,
1833        );
1834        manager.register_protocol(
1835            ProtocolName::from("/notif/1"),
1836            Vec::new(),
1837            ProtocolCodec::UnsignedVarint(None),
1838            KEEP_ALIVE_TIMEOUT,
1839        );
1840    }
1841
1842    #[test]
1843    #[should_panic]
1844    #[cfg(debug_assertions)]
1845    fn fallback_protocol_as_duplicate_main_protocol() {
1846        let sink = BandwidthSink::new();
1847        let (mut manager, _handle) = TransportManager::new(
1848            Keypair::generate(),
1849            HashSet::new(),
1850            sink,
1851            8usize,
1852            ConnectionLimitsConfig::default(),
1853        );
1854
1855        manager.register_protocol(
1856            ProtocolName::from("/notif/1"),
1857            Vec::new(),
1858            ProtocolCodec::UnsignedVarint(None),
1859            KEEP_ALIVE_TIMEOUT,
1860        );
1861        manager.register_protocol(
1862            ProtocolName::from("/notif/2"),
1863            vec![
1864                ProtocolName::from("/notif/2/new"),
1865                ProtocolName::from("/notif/1"),
1866            ],
1867            ProtocolCodec::UnsignedVarint(None),
1868            KEEP_ALIVE_TIMEOUT,
1869        );
1870    }
1871
1872    #[test]
1873    #[should_panic]
1874    #[cfg(debug_assertions)]
1875    fn duplicate_fallback_protocol() {
1876        let sink = BandwidthSink::new();
1877        let (mut manager, _handle) = TransportManager::new(
1878            Keypair::generate(),
1879            HashSet::new(),
1880            sink,
1881            8usize,
1882            ConnectionLimitsConfig::default(),
1883        );
1884
1885        manager.register_protocol(
1886            ProtocolName::from("/notif/1"),
1887            vec![
1888                ProtocolName::from("/notif/1/new"),
1889                ProtocolName::from("/notif/1"),
1890            ],
1891            ProtocolCodec::UnsignedVarint(None),
1892            KEEP_ALIVE_TIMEOUT,
1893        );
1894        manager.register_protocol(
1895            ProtocolName::from("/notif/2"),
1896            vec![
1897                ProtocolName::from("/notif/2/new"),
1898                ProtocolName::from("/notif/1/new"),
1899            ],
1900            ProtocolCodec::UnsignedVarint(None),
1901            KEEP_ALIVE_TIMEOUT,
1902        );
1903    }
1904
1905    #[test]
1906    #[should_panic]
1907    #[cfg(debug_assertions)]
1908    fn duplicate_transport() {
1909        let sink = BandwidthSink::new();
1910        let (mut manager, _handle) = TransportManager::new(
1911            Keypair::generate(),
1912            HashSet::new(),
1913            sink,
1914            8usize,
1915            ConnectionLimitsConfig::default(),
1916        );
1917
1918        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
1919        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
1920    }
1921
1922    #[tokio::test]
1923    async fn tried_to_self_using_peer_id() {
1924        let keypair = Keypair::generate();
1925        let local_peer_id = PeerId::from_public_key(&keypair.public().into());
1926        let sink = BandwidthSink::new();
1927        let (mut manager, _handle) = TransportManager::new(
1928            keypair,
1929            HashSet::new(),
1930            sink,
1931            8usize,
1932            ConnectionLimitsConfig::default(),
1933        );
1934
1935        assert!(manager.dial(local_peer_id).await.is_err());
1936    }
1937
1938    #[tokio::test]
1939    async fn try_to_dial_over_disabled_transport() {
1940        let (mut manager, _handle) = TransportManager::new(
1941            Keypair::generate(),
1942            HashSet::new(),
1943            BandwidthSink::new(),
1944            8usize,
1945            ConnectionLimitsConfig::default(),
1946        );
1947        let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
1948        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
1949
1950        let address = Multiaddr::empty()
1951            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
1952            .with(Protocol::Udp(8888))
1953            .with(Protocol::QuicV1)
1954            .with(Protocol::P2p(
1955                Multihash::from_bytes(&PeerId::random().to_bytes()).unwrap(),
1956            ));
1957
1958        assert!(std::matches!(
1959            manager.dial_address(address).await,
1960            Err(Error::TransportNotSupported(_))
1961        ));
1962    }
1963
1964    #[tokio::test]
1965    async fn successful_dial_reported_to_transport_manager() {
1966        let _ = tracing_subscriber::fmt()
1967            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1968            .try_init();
1969
1970        let (mut manager, _handle) = TransportManager::new(
1971            Keypair::generate(),
1972            HashSet::new(),
1973            BandwidthSink::new(),
1974            8usize,
1975            ConnectionLimitsConfig::default(),
1976        );
1977        let peer = PeerId::random();
1978        let dial_address = Multiaddr::empty()
1979            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
1980            .with(Protocol::Tcp(8888))
1981            .with(Protocol::P2p(
1982                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
1983            ));
1984
1985        let transport = Box::new({
1986            let mut transport = DummyTransport::new();
1987            transport.inject_event(TransportEvent::ConnectionEstablished {
1988                peer,
1989                endpoint: Endpoint::dialer(dial_address.clone(), ConnectionId::from(0usize)),
1990            });
1991            transport
1992        });
1993        manager.register_transport(SupportedTransport::Tcp, transport);
1994
1995        assert!(manager.dial_address(dial_address.clone()).await.is_ok());
1996        assert!(!manager.pending_connections.is_empty());
1997
1998        {
1999            let peers = manager.peers.read();
2000
2001            match peers.get(&peer) {
2002                Some(PeerContext {
2003                    state: PeerState::Dialing { .. },
2004                    ..
2005                }) => {}
2006                state => panic!("invalid state for peer: {state:?}"),
2007            }
2008        }
2009
2010        match manager.next().await.unwrap() {
2011            TransportEvent::ConnectionEstablished {
2012                peer: event_peer,
2013                endpoint: event_endpoint,
2014                ..
2015            } => {
2016                assert_eq!(peer, event_peer);
2017                assert_eq!(
2018                    event_endpoint,
2019                    Endpoint::dialer(dial_address.clone(), ConnectionId::from(0usize))
2020                )
2021            }
2022            event => panic!("invalid event: {event:?}"),
2023        }
2024    }
2025
2026    #[tokio::test]
2027    async fn try_to_dial_same_peer_twice() {
2028        let _ = tracing_subscriber::fmt()
2029            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2030            .try_init();
2031
2032        let (mut manager, _handle) = TransportManager::new(
2033            Keypair::generate(),
2034            HashSet::new(),
2035            BandwidthSink::new(),
2036            8usize,
2037            ConnectionLimitsConfig::default(),
2038        );
2039        let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2040        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2041
2042        let peer = PeerId::random();
2043        let dial_address = Multiaddr::empty()
2044            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2045            .with(Protocol::Tcp(8888))
2046            .with(Protocol::P2p(
2047                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2048            ));
2049
2050        assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2051        assert_eq!(manager.pending_connections.len(), 1);
2052
2053        assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2054        assert_eq!(manager.pending_connections.len(), 1);
2055    }
2056
2057    #[tokio::test]
2058    async fn try_to_dial_same_peer_twice_diffrent_address() {
2059        let _ = tracing_subscriber::fmt()
2060            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2061            .try_init();
2062
2063        let (mut manager, _handle) = TransportManager::new(
2064            Keypair::generate(),
2065            HashSet::new(),
2066            BandwidthSink::new(),
2067            8usize,
2068            ConnectionLimitsConfig::default(),
2069        );
2070        let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2071        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2072
2073        let peer = PeerId::random();
2074
2075        assert!(manager
2076            .dial_address(
2077                Multiaddr::empty()
2078                    .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2079                    .with(Protocol::Tcp(8888))
2080                    .with(Protocol::P2p(
2081                        Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2082                    ))
2083            )
2084            .await
2085            .is_ok());
2086        assert_eq!(manager.pending_connections.len(), 1);
2087
2088        assert!(manager
2089            .dial_address(
2090                Multiaddr::empty()
2091                    .with(Protocol::Ip6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))
2092                    .with(Protocol::Tcp(8888))
2093                    .with(Protocol::P2p(
2094                        Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2095                    ))
2096            )
2097            .await
2098            .is_ok());
2099        assert_eq!(manager.pending_connections.len(), 1);
2100    }
2101
2102    #[tokio::test]
2103    async fn dial_non_existent_peer() {
2104        let _ = tracing_subscriber::fmt()
2105            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2106            .try_init();
2107
2108        let (mut manager, _handle) = TransportManager::new(
2109            Keypair::generate(),
2110            HashSet::new(),
2111            BandwidthSink::new(),
2112            8usize,
2113            ConnectionLimitsConfig::default(),
2114        );
2115        let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2116        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2117
2118        assert!(manager.dial(PeerId::random()).await.is_err());
2119    }
2120
2121    #[tokio::test]
2122    async fn dial_non_peer_with_no_known_addresses() {
2123        let _ = tracing_subscriber::fmt()
2124            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2125            .try_init();
2126
2127        let (mut manager, _handle) = TransportManager::new(
2128            Keypair::generate(),
2129            HashSet::new(),
2130            BandwidthSink::new(),
2131            8usize,
2132            ConnectionLimitsConfig::default(),
2133        );
2134        let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2135        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2136
2137        let peer = PeerId::random();
2138        manager.peers.write().insert(
2139            peer,
2140            PeerContext {
2141                state: PeerState::Disconnected { dial_record: None },
2142                addresses: AddressStore::new(),
2143                secondary_connection: None,
2144            },
2145        );
2146
2147        assert!(manager.dial(peer).await.is_err());
2148    }
2149
2150    #[tokio::test]
2151    async fn check_supported_transport_when_adding_known_address() {
2152        let _ = tracing_subscriber::fmt()
2153            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2154            .try_init();
2155
2156        let mut transports = HashSet::new();
2157        transports.insert(SupportedTransport::Tcp);
2158        #[cfg(feature = "quic")]
2159        transports.insert(SupportedTransport::Quic);
2160
2161        let (_manager, handle) = TransportManager::new(
2162            Keypair::generate(),
2163            transports,
2164            BandwidthSink::new(),
2165            8usize,
2166            ConnectionLimitsConfig::default(),
2167        );
2168
2169        // ipv6
2170        let address = Multiaddr::empty()
2171            .with(Protocol::Ip6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))
2172            .with(Protocol::Tcp(8888))
2173            .with(Protocol::P2p(
2174                Multihash::from_bytes(&PeerId::random().to_bytes()).unwrap(),
2175            ));
2176        assert!(handle.supported_transport(&address));
2177
2178        // ipv4
2179        let address = Multiaddr::empty()
2180            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2181            .with(Protocol::Tcp(8888))
2182            .with(Protocol::P2p(
2183                Multihash::from_bytes(&PeerId::random().to_bytes()).unwrap(),
2184            ));
2185        assert!(handle.supported_transport(&address));
2186
2187        // quic
2188        let address = Multiaddr::empty()
2189            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2190            .with(Protocol::Udp(8888))
2191            .with(Protocol::QuicV1)
2192            .with(Protocol::P2p(
2193                Multihash::from_bytes(&PeerId::random().to_bytes()).unwrap(),
2194            ));
2195        #[cfg(feature = "quic")]
2196        assert!(handle.supported_transport(&address));
2197        #[cfg(not(feature = "quic"))]
2198        assert!(!handle.supported_transport(&address));
2199
2200        // websocket
2201        let address = Multiaddr::empty()
2202            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2203            .with(Protocol::Tcp(8888))
2204            .with(Protocol::Ws(std::borrow::Cow::Owned("/".to_string())));
2205        assert!(!handle.supported_transport(&address));
2206
2207        // websocket secure
2208        let address = Multiaddr::empty()
2209            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2210            .with(Protocol::Tcp(8888))
2211            .with(Protocol::Wss(std::borrow::Cow::Owned("/".to_string())));
2212        assert!(!handle.supported_transport(&address));
2213    }
2214
2215    // local node tried to dial a node and it failed but in the mean
2216    // time the remote node dialed local node and that succeeded.
2217    #[tokio::test]
2218    async fn on_dial_failure_already_connected() {
2219        let _ = tracing_subscriber::fmt()
2220            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2221            .try_init();
2222
2223        let (mut manager, _handle) = TransportManager::new(
2224            Keypair::generate(),
2225            HashSet::new(),
2226            BandwidthSink::new(),
2227            8usize,
2228            ConnectionLimitsConfig::default(),
2229        );
2230        let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2231        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2232
2233        let peer = PeerId::random();
2234        let dial_address = Multiaddr::empty()
2235            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2236            .with(Protocol::Tcp(8888))
2237            .with(Protocol::P2p(
2238                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2239            ));
2240        let connect_address = Multiaddr::empty()
2241            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2242            .with(Protocol::Tcp(8888))
2243            .with(Protocol::P2p(
2244                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2245            ));
2246        assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2247        assert_eq!(manager.pending_connections.len(), 1);
2248
2249        match &manager.peers.read().get(&peer).unwrap().state {
2250            PeerState::Dialing { record } => {
2251                assert_eq!(record.address(), &dial_address);
2252            }
2253            state => panic!("invalid state for peer: {state:?}"),
2254        }
2255
2256        // remote peer connected to local node from a different address that was dialed
2257        manager
2258            .on_connection_established(
2259                peer,
2260                &Endpoint::dialer(connect_address, ConnectionId::from(1usize)),
2261            )
2262            .unwrap();
2263
2264        // dialing the peer failed
2265        manager.on_dial_failure(ConnectionId::from(0usize)).unwrap();
2266
2267        let peers = manager.peers.read();
2268        let peer = peers.get(&peer).unwrap();
2269
2270        match &peer.state {
2271            PeerState::Connected { dial_record, .. } => {
2272                assert!(dial_record.is_none());
2273                assert!(peer.addresses.contains(&dial_address));
2274            }
2275            state => panic!("invalid state: {state:?}"),
2276        }
2277    }
2278
2279    // local node tried to dial a node and it failed but in the mean
2280    // time the remote node dialed local node and that succeeded.
2281    //
2282    // while the dial was still in progresss, the remote node disconnected after which
2283    // the dial failure was reported.
2284    #[tokio::test]
2285    async fn on_dial_failure_already_connected_and_disconnected() {
2286        let _ = tracing_subscriber::fmt()
2287            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2288            .try_init();
2289
2290        let (mut manager, _handle) = TransportManager::new(
2291            Keypair::generate(),
2292            HashSet::new(),
2293            BandwidthSink::new(),
2294            8usize,
2295            ConnectionLimitsConfig::default(),
2296        );
2297        let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2298        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2299
2300        let peer = PeerId::random();
2301        let dial_address = Multiaddr::empty()
2302            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2303            .with(Protocol::Tcp(8888))
2304            .with(Protocol::P2p(
2305                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2306            ));
2307        let connect_address = Multiaddr::empty()
2308            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2309            .with(Protocol::Tcp(8888))
2310            .with(Protocol::P2p(
2311                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2312            ));
2313        assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2314        assert_eq!(manager.pending_connections.len(), 1);
2315
2316        match &manager.peers.read().get(&peer).unwrap().state {
2317            PeerState::Dialing { record } => {
2318                assert_eq!(record.address(), &dial_address);
2319            }
2320            state => panic!("invalid state for peer: {state:?}"),
2321        }
2322
2323        // remote peer connected to local node from a different address that was dialed
2324        manager
2325            .on_connection_established(
2326                peer,
2327                &Endpoint::listener(connect_address, ConnectionId::from(1usize)),
2328            )
2329            .unwrap();
2330
2331        // connection to remote was closed while the dial was still in progress
2332        manager.on_connection_closed(peer, ConnectionId::from(1usize)).unwrap();
2333
2334        // verify that the peer state is `Disconnected`
2335        {
2336            let peers = manager.peers.read();
2337            let peer = peers.get(&peer).unwrap();
2338
2339            match &peer.state {
2340                PeerState::Disconnected {
2341                    dial_record: Some(dial_record),
2342                    ..
2343                } => {
2344                    assert_eq!(dial_record.address(), &dial_address);
2345                }
2346                state => panic!("invalid state: {state:?}"),
2347            }
2348        }
2349
2350        // dialing the peer failed
2351        manager.on_dial_failure(ConnectionId::from(0usize)).unwrap();
2352
2353        let peers = manager.peers.read();
2354        let peer = peers.get(&peer).unwrap();
2355
2356        match &peer.state {
2357            PeerState::Disconnected {
2358                dial_record: None, ..
2359            } => {
2360                assert!(peer.addresses.contains(&dial_address));
2361            }
2362            state => panic!("invalid state: {state:?}"),
2363        }
2364    }
2365
2366    // local node tried to dial a node and it failed but in the mean
2367    // time the remote node dialed local node and that succeeded.
2368    //
2369    // while the dial was still in progresss, the remote node disconnected after which
2370    // the dial failure was reported.
2371    #[tokio::test]
2372    async fn on_dial_success_while_connected_and_disconnected() {
2373        let _ = tracing_subscriber::fmt()
2374            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2375            .try_init();
2376
2377        let (mut manager, _handle) = TransportManager::new(
2378            Keypair::generate(),
2379            HashSet::new(),
2380            BandwidthSink::new(),
2381            8usize,
2382            ConnectionLimitsConfig::default(),
2383        );
2384        let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2385        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2386
2387        let peer = PeerId::random();
2388        let dial_address = Multiaddr::empty()
2389            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2390            .with(Protocol::Tcp(8888))
2391            .with(Protocol::P2p(
2392                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2393            ));
2394        let connect_address = Multiaddr::empty()
2395            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2396            .with(Protocol::Tcp(8888))
2397            .with(Protocol::P2p(
2398                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2399            ));
2400        assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2401        assert_eq!(manager.pending_connections.len(), 1);
2402
2403        match &manager.peers.read().get(&peer).unwrap().state {
2404            PeerState::Dialing { record } => {
2405                assert_eq!(record.address(), &dial_address);
2406            }
2407            state => panic!("invalid state for peer: {state:?}"),
2408        }
2409
2410        // remote peer connected to local node from a different address that was dialed
2411        manager
2412            .on_connection_established(
2413                peer,
2414                &Endpoint::listener(connect_address, ConnectionId::from(1usize)),
2415            )
2416            .unwrap();
2417
2418        // connection to remote was closed while the dial was still in progress
2419        manager.on_connection_closed(peer, ConnectionId::from(1usize)).unwrap();
2420
2421        // verify that the peer state is `Disconnected`
2422        {
2423            let peers = manager.peers.read();
2424            let peer = peers.get(&peer).unwrap();
2425
2426            match &peer.state {
2427                PeerState::Disconnected {
2428                    dial_record: Some(dial_record),
2429                    ..
2430                } => {
2431                    assert_eq!(dial_record.address(), &dial_address);
2432                }
2433                state => panic!("invalid state: {state:?}"),
2434            }
2435        }
2436
2437        // the original dial succeeded
2438        manager
2439            .on_connection_established(
2440                peer,
2441                &Endpoint::dialer(dial_address, ConnectionId::from(0usize)),
2442            )
2443            .unwrap();
2444
2445        let peers = manager.peers.read();
2446        let peer = peers.get(&peer).unwrap();
2447
2448        match &peer.state {
2449            PeerState::Connected {
2450                dial_record: None, ..
2451            } => {}
2452            state => panic!("invalid state: {state:?}"),
2453        }
2454    }
2455
2456    #[tokio::test]
2457    async fn secondary_connection_is_tracked() {
2458        let _ = tracing_subscriber::fmt()
2459            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2460            .try_init();
2461
2462        let (mut manager, _handle) = TransportManager::new(
2463            Keypair::generate(),
2464            HashSet::new(),
2465            BandwidthSink::new(),
2466            8usize,
2467            ConnectionLimitsConfig::default(),
2468        );
2469        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2470
2471        let peer = PeerId::random();
2472        let address1 = Multiaddr::empty()
2473            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2474            .with(Protocol::Tcp(8888))
2475            .with(Protocol::P2p(
2476                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2477            ));
2478        let address2 = Multiaddr::empty()
2479            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2480            .with(Protocol::Tcp(8888))
2481            .with(Protocol::P2p(
2482                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2483            ));
2484        let address3 = Multiaddr::empty()
2485            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 10, 64)))
2486            .with(Protocol::Tcp(9999))
2487            .with(Protocol::P2p(
2488                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2489            ));
2490
2491        // remote peer connected to local node
2492        let established_result = manager
2493            .on_connection_established(
2494                peer,
2495                &Endpoint::listener(address1, ConnectionId::from(0usize)),
2496            )
2497            .unwrap();
2498        assert_eq!(established_result, ConnectionEstablishedResult::Accept);
2499
2500        // verify that the peer state is `Connected` with no secondary connection
2501        {
2502            let peers = manager.peers.read();
2503            let peer = peers.get(&peer).unwrap();
2504
2505            match &peer.state {
2506                PeerState::Connected {
2507                    dial_record: None, ..
2508                } => {
2509                    assert!(peer.secondary_connection.is_none());
2510                }
2511                state => panic!("invalid state: {state:?}"),
2512            }
2513        }
2514
2515        // second connection is established, verify that the secondary connection is tracked
2516        let established_result = manager
2517            .on_connection_established(
2518                peer,
2519                &Endpoint::listener(address2.clone(), ConnectionId::from(1usize)),
2520            )
2521            .unwrap();
2522        assert_eq!(established_result, ConnectionEstablishedResult::Accept);
2523
2524        let peers = manager.peers.read();
2525        let context = peers.get(&peer).unwrap();
2526
2527        match &context.state {
2528            PeerState::Connected {
2529                dial_record: None, ..
2530            } => {
2531                let seconary_connection = context.secondary_connection.as_ref().unwrap();
2532                assert_eq!(seconary_connection.address(), &address2);
2533                assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2534            }
2535            state => panic!("invalid state: {state:?}"),
2536        }
2537        drop(peers);
2538
2539        // tertiary connection is ignored
2540        let established_result = manager
2541            .on_connection_established(
2542                peer,
2543                &Endpoint::listener(address3.clone(), ConnectionId::from(2usize)),
2544            )
2545            .unwrap();
2546        assert_eq!(established_result, ConnectionEstablishedResult::Reject);
2547
2548        let peers = manager.peers.read();
2549        let peer = peers.get(&peer).unwrap();
2550
2551        match &peer.state {
2552            PeerState::Connected {
2553                dial_record: None, ..
2554            } => {
2555                let seconary_connection = peer.secondary_connection.as_ref().unwrap();
2556                assert_eq!(seconary_connection.address(), &address2);
2557                assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2558                assert!(peer.addresses.contains(&address3));
2559            }
2560            state => panic!("invalid state: {state:?}"),
2561        }
2562    }
2563
2564    #[tokio::test]
2565    async fn secondary_connection_with_different_dial_endpoint_is_rejected() {
2566        let _ = tracing_subscriber::fmt()
2567            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2568            .try_init();
2569
2570        let (mut manager, _handle) = TransportManager::new(
2571            Keypair::generate(),
2572            HashSet::new(),
2573            BandwidthSink::new(),
2574            8usize,
2575            ConnectionLimitsConfig::default(),
2576        );
2577        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2578
2579        let peer = PeerId::random();
2580        let address1 = Multiaddr::empty()
2581            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2582            .with(Protocol::Tcp(8888))
2583            .with(Protocol::P2p(
2584                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2585            ));
2586        let address2 = Multiaddr::empty()
2587            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2588            .with(Protocol::Tcp(8888))
2589            .with(Protocol::P2p(
2590                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2591            ));
2592
2593        // remote peer connected to local node
2594        let established_result = manager
2595            .on_connection_established(
2596                peer,
2597                &Endpoint::listener(address1, ConnectionId::from(0usize)),
2598            )
2599            .unwrap();
2600        assert_eq!(established_result, ConnectionEstablishedResult::Accept);
2601
2602        // verify that the peer state is `Connected` with no secondary connection
2603        {
2604            let peers = manager.peers.read();
2605            let peer = peers.get(&peer).unwrap();
2606
2607            match &peer.state {
2608                PeerState::Connected {
2609                    dial_record: None, ..
2610                } => {
2611                    assert!(peer.secondary_connection.is_none());
2612                }
2613                state => panic!("invalid state: {state:?}"),
2614            }
2615        }
2616
2617        // Add a dial record for the peer.
2618        {
2619            let mut peers = manager.peers.write();
2620            let peer_context = peers.get_mut(&peer).unwrap();
2621
2622            let record = match &peer_context.state {
2623                PeerState::Connected { record, .. } => record.clone(),
2624                state => panic!("invalid state: {state:?}"),
2625            };
2626
2627            let dial_record = Some(AddressRecord::new(
2628                &peer,
2629                address2.clone(),
2630                0,
2631                Some(ConnectionId::from(0usize)),
2632            ));
2633
2634            peer_context.state = PeerState::Connected {
2635                record,
2636                dial_record,
2637            };
2638        }
2639
2640        // second connection is from a different endpoint should fail.
2641        let established_result = manager
2642            .on_connection_established(
2643                peer,
2644                &Endpoint::listener(address2.clone(), ConnectionId::from(1usize)),
2645            )
2646            .unwrap();
2647        assert_eq!(established_result, ConnectionEstablishedResult::Reject);
2648
2649        // Multiple secondary connections should also fail.
2650        let established_result = manager
2651            .on_connection_established(
2652                peer,
2653                &Endpoint::listener(address2.clone(), ConnectionId::from(1usize)),
2654            )
2655            .unwrap();
2656        assert_eq!(established_result, ConnectionEstablishedResult::Reject);
2657
2658        // Accept the proper connection ID.
2659        let established_result = manager
2660            .on_connection_established(
2661                peer,
2662                &Endpoint::listener(address2.clone(), ConnectionId::from(0usize)),
2663            )
2664            .unwrap();
2665        assert_eq!(established_result, ConnectionEstablishedResult::Accept);
2666    }
2667
2668    #[tokio::test]
2669    async fn secondary_connection_closed() {
2670        let _ = tracing_subscriber::fmt()
2671            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2672            .try_init();
2673
2674        let (mut manager, _handle) = TransportManager::new(
2675            Keypair::generate(),
2676            HashSet::new(),
2677            BandwidthSink::new(),
2678            8usize,
2679            ConnectionLimitsConfig::default(),
2680        );
2681        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2682
2683        let peer = PeerId::random();
2684        let address1 = Multiaddr::empty()
2685            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2686            .with(Protocol::Tcp(8888))
2687            .with(Protocol::P2p(
2688                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2689            ));
2690        let address2 = Multiaddr::empty()
2691            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2692            .with(Protocol::Tcp(8888))
2693            .with(Protocol::P2p(
2694                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2695            ));
2696
2697        // remote peer connected to local node
2698        let emit_event = manager
2699            .on_connection_established(
2700                peer,
2701                &Endpoint::listener(address1, ConnectionId::from(0usize)),
2702            )
2703            .unwrap();
2704        assert!(std::matches!(
2705            emit_event,
2706            ConnectionEstablishedResult::Accept
2707        ));
2708
2709        // verify that the peer state is `Connected` with no seconary connection
2710        {
2711            let peers = manager.peers.read();
2712            let peer = peers.get(&peer).unwrap();
2713
2714            match &peer.state {
2715                PeerState::Connected {
2716                    dial_record: None, ..
2717                } => {
2718                    assert!(peer.secondary_connection.is_none());
2719                }
2720                state => panic!("invalid state: {state:?}"),
2721            }
2722        }
2723
2724        // second connection is established, verify that the seconary connection is tracked
2725        let emit_event = manager
2726            .on_connection_established(
2727                peer,
2728                &Endpoint::dialer(address2.clone(), ConnectionId::from(1usize)),
2729            )
2730            .unwrap();
2731        assert!(std::matches!(
2732            emit_event,
2733            ConnectionEstablishedResult::Accept
2734        ));
2735
2736        let peers = manager.peers.read();
2737        let context = peers.get(&peer).unwrap();
2738
2739        match &context.state {
2740            PeerState::Connected {
2741                dial_record: None, ..
2742            } => {
2743                let seconary_connection = context.secondary_connection.as_ref().unwrap();
2744                assert_eq!(seconary_connection.address(), &address2);
2745                assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2746            }
2747            state => panic!("invalid state: {state:?}"),
2748        }
2749        drop(peers);
2750
2751        // close the secondary connection and verify that the peer remains connected
2752        let emit_event = manager.on_connection_closed(peer, ConnectionId::from(1usize)).unwrap();
2753        assert!(emit_event.is_none());
2754
2755        let peers = manager.peers.read();
2756        let context = peers.get(&peer).unwrap();
2757
2758        match &context.state {
2759            PeerState::Connected {
2760                dial_record: None,
2761                record,
2762            } => {
2763                assert!(context.secondary_connection.is_none());
2764                assert!(context.addresses.contains(&address2));
2765                assert_eq!(record.connection_id(), &Some(ConnectionId::from(0usize)));
2766            }
2767            state => panic!("invalid state: {state:?}"),
2768        }
2769    }
2770
2771    #[tokio::test]
2772    async fn switch_to_secondary_connection() {
2773        let _ = tracing_subscriber::fmt()
2774            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2775            .try_init();
2776
2777        let (mut manager, _handle) = TransportManager::new(
2778            Keypair::generate(),
2779            HashSet::new(),
2780            BandwidthSink::new(),
2781            8usize,
2782            ConnectionLimitsConfig::default(),
2783        );
2784        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2785
2786        let peer = PeerId::random();
2787        let address1 = Multiaddr::empty()
2788            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2789            .with(Protocol::Tcp(8888))
2790            .with(Protocol::P2p(
2791                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2792            ));
2793        let address2 = Multiaddr::empty()
2794            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2795            .with(Protocol::Tcp(8888))
2796            .with(Protocol::P2p(
2797                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2798            ));
2799
2800        // remote peer connected to local node
2801        let emit_event = manager
2802            .on_connection_established(
2803                peer,
2804                &Endpoint::listener(address1.clone(), ConnectionId::from(0usize)),
2805            )
2806            .unwrap();
2807        assert!(std::matches!(
2808            emit_event,
2809            ConnectionEstablishedResult::Accept
2810        ));
2811
2812        // verify that the peer state is `Connected` with no seconary connection
2813        {
2814            let peers = manager.peers.read();
2815            let peer = peers.get(&peer).unwrap();
2816
2817            match &peer.state {
2818                PeerState::Connected {
2819                    dial_record: None, ..
2820                } => {
2821                    assert!(peer.secondary_connection.is_none());
2822                }
2823                state => panic!("invalid state: {state:?}"),
2824            }
2825        }
2826
2827        // second connection is established, verify that the seconary connection is tracked
2828        let emit_event = manager
2829            .on_connection_established(
2830                peer,
2831                &Endpoint::dialer(address2.clone(), ConnectionId::from(1usize)),
2832            )
2833            .unwrap();
2834        assert!(std::matches!(
2835            emit_event,
2836            ConnectionEstablishedResult::Accept
2837        ));
2838
2839        let peers = manager.peers.read();
2840        let context = peers.get(&peer).unwrap();
2841
2842        match &context.state {
2843            PeerState::Connected {
2844                dial_record: None, ..
2845            } => {
2846                let seconary_connection = context.secondary_connection.as_ref().unwrap();
2847                assert_eq!(seconary_connection.address(), &address2);
2848                assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2849            }
2850            state => panic!("invalid state: {state:?}"),
2851        }
2852        drop(peers);
2853
2854        // close the primary connection and verify that the peer remains connected
2855        // while the primary connection address is stored in peer addresses
2856        let emit_event = manager.on_connection_closed(peer, ConnectionId::from(0usize)).unwrap();
2857        assert!(emit_event.is_none());
2858
2859        let peers = manager.peers.read();
2860        let context = peers.get(&peer).unwrap();
2861
2862        match &context.state {
2863            PeerState::Connected {
2864                dial_record: None,
2865                record,
2866            } => {
2867                assert!(context.secondary_connection.is_none());
2868                assert!(context.addresses.contains(&address1));
2869                assert_eq!(record.connection_id(), &Some(ConnectionId::from(1usize)));
2870            }
2871            state => panic!("invalid state: {state:?}"),
2872        }
2873    }
2874
2875    // two connections already exist and a third was opened which is ignored by
2876    // `on_connection_established()`, when that connection is closed, verify that
2877    // it's handled gracefully
2878    #[tokio::test]
2879    async fn tertiary_connection_closed() {
2880        let _ = tracing_subscriber::fmt()
2881            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2882            .try_init();
2883
2884        let (mut manager, _handle) = TransportManager::new(
2885            Keypair::generate(),
2886            HashSet::new(),
2887            BandwidthSink::new(),
2888            8usize,
2889            ConnectionLimitsConfig::default(),
2890        );
2891        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2892
2893        let peer = PeerId::random();
2894        let address1 = Multiaddr::empty()
2895            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2896            .with(Protocol::Tcp(8888))
2897            .with(Protocol::P2p(
2898                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2899            ));
2900        let address2 = Multiaddr::empty()
2901            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2902            .with(Protocol::Tcp(8888))
2903            .with(Protocol::P2p(
2904                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2905            ));
2906        let address3 = Multiaddr::empty()
2907            .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2908            .with(Protocol::Tcp(9999))
2909            .with(Protocol::P2p(
2910                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2911            ));
2912
2913        // remote peer connected to local node
2914        let emit_event = manager
2915            .on_connection_established(
2916                peer,
2917                &Endpoint::listener(address1, ConnectionId::from(0usize)),
2918            )
2919            .unwrap();
2920        assert!(std::matches!(
2921            emit_event,
2922            ConnectionEstablishedResult::Accept
2923        ));
2924
2925        // verify that the peer state is `Connected` with no seconary connection
2926        {
2927            let peers = manager.peers.read();
2928            let peer = peers.get(&peer).unwrap();
2929
2930            match &peer.state {
2931                PeerState::Connected {
2932                    dial_record: None, ..
2933                } => {
2934                    assert!(peer.secondary_connection.is_none());
2935                }
2936                state => panic!("invalid state: {state:?}"),
2937            }
2938        }
2939
2940        // second connection is established, verify that the seconary connection is tracked
2941        let emit_event = manager
2942            .on_connection_established(
2943                peer,
2944                &Endpoint::dialer(address2.clone(), ConnectionId::from(1usize)),
2945            )
2946            .unwrap();
2947        assert!(std::matches!(
2948            emit_event,
2949            ConnectionEstablishedResult::Accept
2950        ));
2951
2952        let peers = manager.peers.read();
2953        let context = peers.get(&peer).unwrap();
2954
2955        match &context.state {
2956            PeerState::Connected {
2957                dial_record: None, ..
2958            } => {
2959                let seconary_connection = context.secondary_connection.as_ref().unwrap();
2960                assert_eq!(seconary_connection.address(), &address2);
2961                assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2962            }
2963            state => panic!("invalid state: {state:?}"),
2964        }
2965        drop(peers);
2966
2967        // third connection is established, verify that it's discarded
2968        let emit_event = manager
2969            .on_connection_established(
2970                peer,
2971                &Endpoint::listener(address3.clone(), ConnectionId::from(2usize)),
2972            )
2973            .unwrap();
2974        assert!(std::matches!(
2975            emit_event,
2976            ConnectionEstablishedResult::Reject
2977        ));
2978
2979        let peers = manager.peers.read();
2980        let context = peers.get(&peer).unwrap();
2981        assert!(context.addresses.contains(&address3));
2982        drop(peers);
2983
2984        // close the tertiary connection that was ignored
2985        let emit_event = manager.on_connection_closed(peer, ConnectionId::from(2usize)).unwrap();
2986        assert!(emit_event.is_none());
2987
2988        // verify that the state remains unchanged
2989        let peers = manager.peers.read();
2990        let context = peers.get(&peer).unwrap();
2991
2992        match &context.state {
2993            PeerState::Connected {
2994                dial_record: None, ..
2995            } => {
2996                let seconary_connection = context.secondary_connection.as_ref().unwrap();
2997                assert_eq!(seconary_connection.address(), &address2);
2998                assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2999            }
3000            state => panic!("invalid state: {state:?}"),
3001        }
3002        drop(peers);
3003    }
3004
3005    #[tokio::test]
3006    #[cfg(debug_assertions)]
3007    #[should_panic]
3008    async fn dial_failure_for_unknow_connection() {
3009        let _ = tracing_subscriber::fmt()
3010            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3011            .try_init();
3012
3013        let (mut manager, _handle) = TransportManager::new(
3014            Keypair::generate(),
3015            HashSet::new(),
3016            BandwidthSink::new(),
3017            8usize,
3018            ConnectionLimitsConfig::default(),
3019        );
3020
3021        manager.on_dial_failure(ConnectionId::random()).unwrap();
3022    }
3023
3024    #[tokio::test]
3025    #[cfg(debug_assertions)]
3026    #[should_panic]
3027    async fn dial_failure_for_unknow_peer() {
3028        let _ = tracing_subscriber::fmt()
3029            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3030            .try_init();
3031
3032        let (mut manager, _handle) = TransportManager::new(
3033            Keypair::generate(),
3034            HashSet::new(),
3035            BandwidthSink::new(),
3036            8usize,
3037            ConnectionLimitsConfig::default(),
3038        );
3039        let connection_id = ConnectionId::random();
3040        let peer = PeerId::random();
3041        manager.pending_connections.insert(connection_id, peer);
3042        manager.on_dial_failure(connection_id).unwrap();
3043    }
3044
3045    #[tokio::test]
3046    #[cfg(debug_assertions)]
3047    #[should_panic]
3048    async fn connection_closed_for_unknown_peer() {
3049        let _ = tracing_subscriber::fmt()
3050            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3051            .try_init();
3052
3053        let (mut manager, _handle) = TransportManager::new(
3054            Keypair::generate(),
3055            HashSet::new(),
3056            BandwidthSink::new(),
3057            8usize,
3058            ConnectionLimitsConfig::default(),
3059        );
3060        manager.on_connection_closed(PeerId::random(), ConnectionId::random()).unwrap();
3061    }
3062
3063    #[tokio::test]
3064    #[cfg(debug_assertions)]
3065    #[should_panic]
3066    async fn unknown_connection_opened() {
3067        let _ = tracing_subscriber::fmt()
3068            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3069            .try_init();
3070
3071        let (mut manager, _handle) = TransportManager::new(
3072            Keypair::generate(),
3073            HashSet::new(),
3074            BandwidthSink::new(),
3075            8usize,
3076            ConnectionLimitsConfig::default(),
3077        );
3078        manager
3079            .on_connection_opened(
3080                SupportedTransport::Tcp,
3081                ConnectionId::random(),
3082                Multiaddr::empty(),
3083            )
3084            .unwrap();
3085    }
3086
3087    #[tokio::test]
3088    #[cfg(debug_assertions)]
3089    #[should_panic]
3090    async fn connection_opened_for_unknown_peer() {
3091        let _ = tracing_subscriber::fmt()
3092            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3093            .try_init();
3094
3095        let (mut manager, _handle) = TransportManager::new(
3096            Keypair::generate(),
3097            HashSet::new(),
3098            BandwidthSink::new(),
3099            8usize,
3100            ConnectionLimitsConfig::default(),
3101        );
3102        let connection_id = ConnectionId::random();
3103        let peer = PeerId::random();
3104
3105        manager.pending_connections.insert(connection_id, peer);
3106        manager
3107            .on_connection_opened(SupportedTransport::Tcp, connection_id, Multiaddr::empty())
3108            .unwrap();
3109    }
3110
3111    #[tokio::test]
3112    #[cfg(debug_assertions)]
3113    #[should_panic]
3114    async fn connection_established_for_wrong_peer() {
3115        let _ = tracing_subscriber::fmt()
3116            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3117            .try_init();
3118
3119        let (mut manager, _handle) = TransportManager::new(
3120            Keypair::generate(),
3121            HashSet::new(),
3122            BandwidthSink::new(),
3123            8usize,
3124            ConnectionLimitsConfig::default(),
3125        );
3126        let connection_id = ConnectionId::random();
3127        let peer = PeerId::random();
3128
3129        manager.pending_connections.insert(connection_id, peer);
3130        manager
3131            .on_connection_established(
3132                PeerId::random(),
3133                &Endpoint::dialer(Multiaddr::empty(), connection_id),
3134            )
3135            .unwrap();
3136    }
3137
3138    #[tokio::test]
3139    #[cfg(debug_assertions)]
3140    #[should_panic]
3141    async fn open_failure_unknown_connection() {
3142        let _ = tracing_subscriber::fmt()
3143            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3144            .try_init();
3145
3146        let (mut manager, _handle) = TransportManager::new(
3147            Keypair::generate(),
3148            HashSet::new(),
3149            BandwidthSink::new(),
3150            8usize,
3151            ConnectionLimitsConfig::default(),
3152        );
3153
3154        manager
3155            .on_open_failure(SupportedTransport::Tcp, ConnectionId::random())
3156            .unwrap();
3157    }
3158
3159    #[tokio::test]
3160    #[cfg(debug_assertions)]
3161    #[should_panic]
3162    async fn open_failure_unknown_peer() {
3163        let _ = tracing_subscriber::fmt()
3164            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3165            .try_init();
3166
3167        let (mut manager, _handle) = TransportManager::new(
3168            Keypair::generate(),
3169            HashSet::new(),
3170            BandwidthSink::new(),
3171            8usize,
3172            ConnectionLimitsConfig::default(),
3173        );
3174        let connection_id = ConnectionId::random();
3175        let peer = PeerId::random();
3176
3177        manager.pending_connections.insert(connection_id, peer);
3178        manager.on_open_failure(SupportedTransport::Tcp, connection_id).unwrap();
3179    }
3180
3181    #[tokio::test]
3182    async fn no_transports() {
3183        let _ = tracing_subscriber::fmt()
3184            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3185            .try_init();
3186
3187        let (mut manager, _handle) = TransportManager::new(
3188            Keypair::generate(),
3189            HashSet::new(),
3190            BandwidthSink::new(),
3191            8usize,
3192            ConnectionLimitsConfig::default(),
3193        );
3194
3195        assert!(manager.next().await.is_none());
3196    }
3197
3198    #[tokio::test]
3199    async fn dial_already_connected_peer() {
3200        let (mut manager, _handle) = TransportManager::new(
3201            Keypair::generate(),
3202            HashSet::new(),
3203            BandwidthSink::new(),
3204            8usize,
3205            ConnectionLimitsConfig::default(),
3206        );
3207
3208        let peer = {
3209            let peer = PeerId::random();
3210            let mut peers = manager.peers.write();
3211
3212            peers.insert(
3213                peer,
3214                PeerContext {
3215                    state: PeerState::Connected {
3216                        record: AddressRecord::from_multiaddr(
3217                            Multiaddr::empty()
3218                                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3219                                .with(Protocol::Tcp(8888))
3220                                .with(Protocol::P2p(Multihash::from(peer))),
3221                        )
3222                        .unwrap(),
3223                        dial_record: None,
3224                    },
3225                    secondary_connection: None,
3226                    addresses: AddressStore::from_iter(
3227                        vec![Multiaddr::empty()
3228                            .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3229                            .with(Protocol::Tcp(8888))
3230                            .with(Protocol::P2p(Multihash::from(peer)))]
3231                        .into_iter(),
3232                    ),
3233                },
3234            );
3235            drop(peers);
3236
3237            peer
3238        };
3239
3240        match manager.dial(peer).await {
3241            Err(Error::AlreadyConnected) => {}
3242            _ => panic!("invalid return value"),
3243        }
3244    }
3245
3246    #[tokio::test]
3247    async fn peer_already_being_dialed() {
3248        let (mut manager, _handle) = TransportManager::new(
3249            Keypair::generate(),
3250            HashSet::new(),
3251            BandwidthSink::new(),
3252            8usize,
3253            ConnectionLimitsConfig::default(),
3254        );
3255
3256        let peer = {
3257            let peer = PeerId::random();
3258            let mut peers = manager.peers.write();
3259
3260            peers.insert(
3261                peer,
3262                PeerContext {
3263                    state: PeerState::Dialing {
3264                        record: AddressRecord::from_multiaddr(
3265                            Multiaddr::empty()
3266                                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3267                                .with(Protocol::Tcp(8888))
3268                                .with(Protocol::P2p(Multihash::from(peer))),
3269                        )
3270                        .unwrap(),
3271                    },
3272                    secondary_connection: None,
3273                    addresses: AddressStore::from_iter(
3274                        vec![Multiaddr::empty()
3275                            .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3276                            .with(Protocol::Tcp(8888))
3277                            .with(Protocol::P2p(Multihash::from(peer)))]
3278                        .into_iter(),
3279                    ),
3280                },
3281            );
3282            drop(peers);
3283
3284            peer
3285        };
3286
3287        manager.dial(peer).await.unwrap();
3288
3289        // Check state is unaltered.
3290        {
3291            let peers = manager.peers.read();
3292            let peer_context = peers.get(&peer).unwrap();
3293
3294            match &peer_context.state {
3295                PeerState::Dialing { record } => {
3296                    assert_eq!(
3297                        record.address(),
3298                        &Multiaddr::empty()
3299                            .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3300                            .with(Protocol::Tcp(8888))
3301                            .with(Protocol::P2p(Multihash::from(peer)))
3302                    );
3303                }
3304                state => panic!("invalid state: {state:?}"),
3305            }
3306        }
3307    }
3308
3309    #[tokio::test]
3310    async fn pending_connection_for_disconnected_peer() {
3311        let (mut manager, _handle) = TransportManager::new(
3312            Keypair::generate(),
3313            HashSet::new(),
3314            BandwidthSink::new(),
3315            8usize,
3316            ConnectionLimitsConfig::default(),
3317        );
3318
3319        let peer = {
3320            let peer = PeerId::random();
3321            let mut peers = manager.peers.write();
3322
3323            peers.insert(
3324                peer,
3325                PeerContext {
3326                    state: PeerState::Disconnected {
3327                        dial_record: Some(
3328                            AddressRecord::from_multiaddr(
3329                                Multiaddr::empty()
3330                                    .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3331                                    .with(Protocol::Tcp(8888))
3332                                    .with(Protocol::P2p(Multihash::from(peer))),
3333                            )
3334                            .unwrap(),
3335                        ),
3336                    },
3337                    secondary_connection: None,
3338                    addresses: AddressStore::new(),
3339                },
3340            );
3341            drop(peers);
3342
3343            peer
3344        };
3345
3346        manager.dial(peer).await.unwrap();
3347    }
3348
3349    #[tokio::test]
3350    async fn dial_address_invalid_transport() {
3351        let _ = tracing_subscriber::fmt()
3352            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3353            .try_init();
3354
3355        let (mut manager, _handle) = TransportManager::new(
3356            Keypair::generate(),
3357            HashSet::new(),
3358            BandwidthSink::new(),
3359            8usize,
3360            ConnectionLimitsConfig::default(),
3361        );
3362
3363        // transport doesn't start with ip/dns
3364        {
3365            let address = Multiaddr::empty().with(Protocol::P2p(Multihash::from(PeerId::random())));
3366            match manager.dial_address(address.clone()).await {
3367                Err(Error::TransportNotSupported(dial_address)) => {
3368                    assert_eq!(dial_address, address);
3369                }
3370                _ => panic!("invalid return value"),
3371            }
3372        }
3373
3374        {
3375            // upd-based protocol but not quic
3376            let address = Multiaddr::empty()
3377                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3378                .with(Protocol::Udp(8888))
3379                .with(Protocol::Utp)
3380                .with(Protocol::P2p(Multihash::from(PeerId::random())));
3381            match manager.dial_address(address.clone()).await {
3382                Err(Error::TransportNotSupported(dial_address)) => {
3383                    assert_eq!(dial_address, address);
3384                }
3385                res => panic!("invalid return value: {res:?}"),
3386            }
3387        }
3388
3389        // not tcp nor udp
3390        {
3391            let address = Multiaddr::empty()
3392                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3393                .with(Protocol::Sctp(8888))
3394                .with(Protocol::P2p(Multihash::from(PeerId::random())));
3395            match manager.dial_address(address.clone()).await {
3396                Err(Error::TransportNotSupported(dial_address)) => {
3397                    assert_eq!(dial_address, address);
3398                }
3399                _ => panic!("invalid return value"),
3400            }
3401        }
3402
3403        // random protocol after tcp
3404        {
3405            let address = Multiaddr::empty()
3406                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3407                .with(Protocol::Tcp(8888))
3408                .with(Protocol::Utp)
3409                .with(Protocol::P2p(Multihash::from(PeerId::random())));
3410            match manager.dial_address(address.clone()).await {
3411                Err(Error::TransportNotSupported(dial_address)) => {
3412                    assert_eq!(dial_address, address);
3413                }
3414                _ => panic!("invalid return value"),
3415            }
3416        }
3417    }
3418
3419    #[tokio::test]
3420    async fn dial_address_peer_id_missing() {
3421        let (mut manager, _handle) = TransportManager::new(
3422            Keypair::generate(),
3423            HashSet::new(),
3424            BandwidthSink::new(),
3425            8usize,
3426            ConnectionLimitsConfig::default(),
3427        );
3428
3429        async fn call_manager(manager: &mut TransportManager, address: Multiaddr) {
3430            match manager.dial_address(address).await {
3431                Err(Error::AddressError(AddressError::PeerIdMissing)) => {}
3432                _ => panic!("invalid return value"),
3433            }
3434        }
3435
3436        {
3437            call_manager(
3438                &mut manager,
3439                Multiaddr::empty()
3440                    .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3441                    .with(Protocol::Tcp(8888)),
3442            )
3443            .await;
3444        }
3445
3446        {
3447            call_manager(
3448                &mut manager,
3449                Multiaddr::empty()
3450                    .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3451                    .with(Protocol::Tcp(8888))
3452                    .with(Protocol::Wss(std::borrow::Cow::Owned("".to_string()))),
3453            )
3454            .await;
3455        }
3456
3457        {
3458            call_manager(
3459                &mut manager,
3460                Multiaddr::empty()
3461                    .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3462                    .with(Protocol::Udp(8888))
3463                    .with(Protocol::QuicV1),
3464            )
3465            .await;
3466        }
3467    }
3468
3469    #[tokio::test]
3470    async fn inbound_connection_while_dialing() {
3471        let _ = tracing_subscriber::fmt()
3472            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3473            .try_init();
3474
3475        let (mut manager, _handle) = TransportManager::new(
3476            Keypair::generate(),
3477            HashSet::new(),
3478            BandwidthSink::new(),
3479            8usize,
3480            ConnectionLimitsConfig::default(),
3481        );
3482        let peer = PeerId::random();
3483        let dial_address = Multiaddr::empty()
3484            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
3485            .with(Protocol::Tcp(8888))
3486            .with(Protocol::P2p(
3487                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
3488            ));
3489
3490        let connection_id = ConnectionId::random();
3491        let transport = Box::new({
3492            let mut transport = DummyTransport::new();
3493            transport.inject_event(TransportEvent::ConnectionEstablished {
3494                peer,
3495                endpoint: Endpoint::listener(dial_address.clone(), connection_id),
3496            });
3497            transport
3498        });
3499        manager.register_transport(SupportedTransport::Tcp, transport);
3500        manager.add_known_address(
3501            peer,
3502            vec![Multiaddr::empty()
3503                .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5)))
3504                .with(Protocol::Tcp(8888))
3505                .with(Protocol::P2p(Multihash::from(peer)))]
3506            .into_iter(),
3507        );
3508
3509        assert!(manager.dial(peer).await.is_ok());
3510        assert!(!manager.pending_connections.is_empty());
3511
3512        {
3513            let peers = manager.peers.read();
3514
3515            match peers.get(&peer) {
3516                Some(PeerContext {
3517                    state: PeerState::Opening { .. },
3518                    ..
3519                }) => {}
3520                state => panic!("invalid state for peer: {state:?}"),
3521            }
3522        }
3523
3524        match manager.next().await.unwrap() {
3525            TransportEvent::ConnectionEstablished {
3526                peer: event_peer,
3527                endpoint: event_endpoint,
3528                ..
3529            } => {
3530                assert_eq!(peer, event_peer);
3531                assert_eq!(
3532                    event_endpoint,
3533                    Endpoint::listener(dial_address.clone(), connection_id),
3534                );
3535            }
3536            event => panic!("invalid event: {event:?}"),
3537        }
3538        assert!(manager.pending_connections.is_empty());
3539
3540        let peers = manager.peers.read();
3541        match peers.get(&peer).unwrap() {
3542            PeerContext {
3543                state:
3544                    PeerState::Connected {
3545                        record,
3546                        dial_record,
3547                    },
3548                secondary_connection,
3549                addresses,
3550            } => {
3551                assert!(!addresses.contains(record.address()));
3552                assert!(dial_record.is_none());
3553                assert!(secondary_connection.is_none());
3554                assert_eq!(record.address(), &dial_address);
3555                assert_eq!(record.connection_id(), &Some(connection_id));
3556            }
3557            state => panic!("invalid peer state: {state:?}"),
3558        }
3559    }
3560
3561    #[tokio::test]
3562    async fn inbound_connection_for_same_address_while_dialing() {
3563        let _ = tracing_subscriber::fmt()
3564            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3565            .try_init();
3566
3567        let (mut manager, _handle) = TransportManager::new(
3568            Keypair::generate(),
3569            HashSet::new(),
3570            BandwidthSink::new(),
3571            8usize,
3572            ConnectionLimitsConfig::default(),
3573        );
3574        let peer = PeerId::random();
3575        let dial_address = Multiaddr::empty()
3576            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
3577            .with(Protocol::Tcp(8888))
3578            .with(Protocol::P2p(
3579                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
3580            ));
3581
3582        let connection_id = ConnectionId::random();
3583        let transport = Box::new({
3584            let mut transport = DummyTransport::new();
3585            transport.inject_event(TransportEvent::ConnectionEstablished {
3586                peer,
3587                endpoint: Endpoint::listener(dial_address.clone(), connection_id),
3588            });
3589            transport
3590        });
3591        manager.register_transport(SupportedTransport::Tcp, transport);
3592        manager.add_known_address(
3593            peer,
3594            vec![Multiaddr::empty()
3595                .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
3596                .with(Protocol::Tcp(8888))
3597                .with(Protocol::P2p(Multihash::from(peer)))]
3598            .into_iter(),
3599        );
3600
3601        assert!(manager.dial(peer).await.is_ok());
3602        assert!(!manager.pending_connections.is_empty());
3603
3604        {
3605            let peers = manager.peers.read();
3606
3607            match peers.get(&peer) {
3608                Some(PeerContext {
3609                    state: PeerState::Opening { .. },
3610                    ..
3611                }) => {}
3612                state => panic!("invalid state for peer: {state:?}"),
3613            }
3614        }
3615
3616        match manager.next().await.unwrap() {
3617            TransportEvent::ConnectionEstablished {
3618                peer: event_peer,
3619                endpoint: event_endpoint,
3620                ..
3621            } => {
3622                assert_eq!(peer, event_peer);
3623                assert_eq!(
3624                    event_endpoint,
3625                    Endpoint::listener(dial_address.clone(), connection_id),
3626                );
3627            }
3628            event => panic!("invalid event: {event:?}"),
3629        }
3630        assert!(manager.pending_connections.is_empty());
3631
3632        let peers = manager.peers.read();
3633        match peers.get(&peer).unwrap() {
3634            PeerContext {
3635                state:
3636                    PeerState::Connected {
3637                        record,
3638                        dial_record,
3639                    },
3640                secondary_connection,
3641                addresses,
3642            } => {
3643                assert!(addresses.is_empty());
3644                assert!(dial_record.is_none());
3645                assert!(secondary_connection.is_none());
3646                assert_eq!(record.address(), &dial_address);
3647                assert_eq!(record.connection_id(), &Some(connection_id));
3648            }
3649            state => panic!("invalid peer state: {state:?}"),
3650        }
3651    }
3652
3653    #[tokio::test]
3654    async fn manager_limits_incoming_connections() {
3655        let _ = tracing_subscriber::fmt()
3656            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3657            .try_init();
3658
3659        let (mut manager, _handle) = TransportManager::new(
3660            Keypair::generate(),
3661            HashSet::new(),
3662            BandwidthSink::new(),
3663            8usize,
3664            ConnectionLimitsConfig::default()
3665                .max_incoming_connections(Some(3))
3666                .max_outgoing_connections(Some(2)),
3667        );
3668        // The connection limit is agnostic of the underlying transports.
3669        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
3670
3671        let peer = PeerId::random();
3672        let second_peer = PeerId::random();
3673
3674        // Setup addresses.
3675        let (first_addr, first_connection_id) = setup_dial_addr(peer, 0);
3676        let (second_addr, second_connection_id) = setup_dial_addr(second_peer, 1);
3677        let (_, third_connection_id) = setup_dial_addr(peer, 2);
3678        let (_, remote_connection_id) = setup_dial_addr(peer, 3);
3679
3680        // Peer established the first inbound connection.
3681        let result = manager
3682            .on_connection_established(
3683                peer,
3684                &Endpoint::listener(first_addr.clone(), first_connection_id),
3685            )
3686            .unwrap();
3687        assert_eq!(result, ConnectionEstablishedResult::Accept);
3688
3689        // The peer is allowed to dial us a second time.
3690        let result = manager
3691            .on_connection_established(
3692                peer,
3693                &Endpoint::listener(first_addr.clone(), second_connection_id),
3694            )
3695            .unwrap();
3696        assert_eq!(result, ConnectionEstablishedResult::Accept);
3697
3698        // Second peer calls us.
3699        let result = manager
3700            .on_connection_established(
3701                second_peer,
3702                &Endpoint::listener(second_addr.clone(), third_connection_id),
3703            )
3704            .unwrap();
3705        assert_eq!(result, ConnectionEstablishedResult::Accept);
3706
3707        // Limits of inbound connections are reached.
3708        let result = manager
3709            .on_connection_established(
3710                second_peer,
3711                &Endpoint::listener(second_addr.clone(), remote_connection_id),
3712            )
3713            .unwrap();
3714        assert_eq!(result, ConnectionEstablishedResult::Reject);
3715
3716        // Close one connection.
3717        let _ = manager.on_connection_closed(peer, first_connection_id).unwrap();
3718
3719        // The second peer can establish 2 inbounds now.
3720        let result = manager
3721            .on_connection_established(
3722                second_peer,
3723                &Endpoint::listener(second_addr.clone(), remote_connection_id),
3724            )
3725            .unwrap();
3726        assert_eq!(result, ConnectionEstablishedResult::Accept);
3727    }
3728
3729    #[tokio::test]
3730    async fn manager_limits_outbound_connections() {
3731        let _ = tracing_subscriber::fmt()
3732            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3733            .try_init();
3734
3735        let (mut manager, _handle) = TransportManager::new(
3736            Keypair::generate(),
3737            HashSet::new(),
3738            BandwidthSink::new(),
3739            8usize,
3740            ConnectionLimitsConfig::default()
3741                .max_incoming_connections(Some(3))
3742                .max_outgoing_connections(Some(2)),
3743        );
3744        // The connection limit is agnostic of the underlying transports.
3745        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
3746
3747        let peer = PeerId::random();
3748        let second_peer = PeerId::random();
3749        let third_peer = PeerId::random();
3750
3751        // Setup addresses.
3752        let (first_addr, first_connection_id) = setup_dial_addr(peer, 0);
3753        let (second_addr, second_connection_id) = setup_dial_addr(second_peer, 1);
3754        let (third_addr, third_connection_id) = setup_dial_addr(third_peer, 2);
3755
3756        // First dial.
3757        manager.dial_address(first_addr.clone()).await.unwrap();
3758
3759        // Second dial.
3760        manager.dial_address(second_addr.clone()).await.unwrap();
3761
3762        // Third dial, we have a limit on 2 outbound connections.
3763        manager.dial_address(third_addr.clone()).await.unwrap();
3764
3765        let result = manager
3766            .on_connection_established(
3767                peer,
3768                &Endpoint::dialer(first_addr.clone(), first_connection_id),
3769            )
3770            .unwrap();
3771
3772        assert_eq!(result, ConnectionEstablishedResult::Accept);
3773
3774        let result = manager
3775            .on_connection_established(
3776                second_peer,
3777                &Endpoint::dialer(second_addr.clone(), second_connection_id),
3778            )
3779            .unwrap();
3780        assert_eq!(result, ConnectionEstablishedResult::Accept);
3781
3782        // We have reached the limit now.
3783        let result = manager
3784            .on_connection_established(
3785                third_peer,
3786                &Endpoint::dialer(third_addr.clone(), third_connection_id),
3787            )
3788            .unwrap();
3789        assert_eq!(result, ConnectionEstablishedResult::Reject);
3790
3791        // While we have 2 outbound connections active, any dials will fail immediately.
3792        // We cannot perform this check for the non negotiated inbound connections yet,
3793        // since the transport will eagerly accept and negotiate them. This requires
3794        // a refactor into the transport manager, to not waste resources on
3795        // negotiating connections that will be rejected.
3796        let result = manager.dial(peer).await.unwrap_err();
3797        assert!(std::matches!(
3798            result,
3799            Error::ConnectionLimit(limits::ConnectionLimitsError::MaxOutgoingConnectionsExceeded)
3800        ));
3801        let result = manager.dial_address(first_addr.clone()).await.unwrap_err();
3802        assert!(std::matches!(
3803            result,
3804            Error::ConnectionLimit(limits::ConnectionLimitsError::MaxOutgoingConnectionsExceeded)
3805        ));
3806
3807        // Close one connection.
3808        let _ = manager.on_connection_closed(peer, first_connection_id).unwrap();
3809        // We can now dial again.
3810        manager.dial_address(first_addr.clone()).await.unwrap();
3811
3812        let result = manager
3813            .on_connection_established(peer, &Endpoint::dialer(first_addr, first_connection_id))
3814            .unwrap();
3815        assert_eq!(result, ConnectionEstablishedResult::Accept);
3816    }
3817
3818    #[tokio::test]
3819    async fn reject_unknown_secondary_connections_with_different_connection_ids() {
3820        let _ = tracing_subscriber::fmt()
3821            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3822            .try_init();
3823
3824        let (mut manager, _handle) = TransportManager::new(
3825            Keypair::generate(),
3826            HashSet::new(),
3827            BandwidthSink::new(),
3828            8usize,
3829            ConnectionLimitsConfig::default(),
3830        );
3831        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
3832
3833        // Random peer ID.
3834        let peer = PeerId::random();
3835        let (first_addr, first_connection_id) = setup_dial_addr(peer, 0);
3836        let second_connection_id = ConnectionId::from(1);
3837        let different_connection_id = ConnectionId::from(2);
3838
3839        // Setup a connected peer with a dial record active.
3840        {
3841            let mut peers = manager.peers.write();
3842
3843            let state = PeerState::Connected {
3844                record: AddressRecord::new(&peer, first_addr.clone(), 0, Some(first_connection_id)),
3845                dial_record: Some(AddressRecord::new(
3846                    &peer,
3847                    first_addr.clone(),
3848                    0,
3849                    Some(second_connection_id),
3850                )),
3851            };
3852
3853            let peer_context = PeerContext {
3854                state,
3855                secondary_connection: None,
3856                addresses: AddressStore::from_iter(vec![first_addr.clone()].into_iter()),
3857            };
3858
3859            peers.insert(peer, peer_context);
3860        }
3861
3862        // Establish a connection, however the connection ID is different.
3863        let result = manager
3864            .on_connection_established(
3865                peer,
3866                &Endpoint::dialer(first_addr.clone(), different_connection_id),
3867            )
3868            .unwrap();
3869        assert_eq!(result, ConnectionEstablishedResult::Reject);
3870    }
3871
3872    #[tokio::test]
3873    async fn guard_against_secondary_connections_with_different_connection_ids() {
3874        // This is the repro case for https://github.com/paritytech/litep2p/issues/172.
3875        let _ = tracing_subscriber::fmt()
3876            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3877            .try_init();
3878
3879        let (mut manager, _handle) = TransportManager::new(
3880            Keypair::generate(),
3881            HashSet::new(),
3882            BandwidthSink::new(),
3883            8usize,
3884            ConnectionLimitsConfig::default(),
3885        );
3886        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
3887
3888        // Random peer ID.
3889        let peer = PeerId::random();
3890
3891        let setup_dial_addr = |connection_id: u16| {
3892            let dial_address = Multiaddr::empty()
3893                .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
3894                .with(Protocol::Tcp(8888 + connection_id))
3895                .with(Protocol::P2p(
3896                    Multihash::from_bytes(&peer.to_bytes()).unwrap(),
3897                ));
3898            let connection_id = ConnectionId::from(connection_id as usize);
3899
3900            (dial_address, connection_id)
3901        };
3902
3903        // Setup addresses.
3904        let (first_addr, first_connection_id) = setup_dial_addr(0);
3905        let (second_addr, _second_connection_id) = setup_dial_addr(1);
3906        let (remote_addr, remote_connection_id) = setup_dial_addr(2);
3907
3908        // Step 1. Dialing state to peer.
3909        manager.dial_address(first_addr.clone()).await.unwrap();
3910        {
3911            let peers = manager.peers.read();
3912            let peer_context = peers.get(&peer).unwrap();
3913            match &peer_context.state {
3914                PeerState::Dialing { record } => {
3915                    assert_eq!(record.address(), &first_addr);
3916                }
3917                state => panic!("invalid state: {state:?}"),
3918            }
3919        }
3920
3921        // Step 2. Connection established by the remote peer.
3922        let result = manager
3923            .on_connection_established(
3924                peer,
3925                &Endpoint::listener(remote_addr.clone(), remote_connection_id),
3926            )
3927            .unwrap();
3928        assert_eq!(result, ConnectionEstablishedResult::Accept);
3929        {
3930            let peers = manager.peers.read();
3931            let peer_context = peers.get(&peer).unwrap();
3932            match &peer_context.state {
3933                PeerState::Connected {
3934                    record,
3935                    dial_record,
3936                } => {
3937                    assert_eq!(record.address(), &remote_addr);
3938                    assert_eq!(record.connection_id(), &Some(remote_connection_id));
3939
3940                    let dial_record = dial_record.as_ref().unwrap();
3941                    assert_eq!(dial_record.address(), &first_addr);
3942                    assert_eq!(dial_record.connection_id(), &Some(first_connection_id))
3943                }
3944                state => panic!("invalid state: {state:?}"),
3945            }
3946        }
3947
3948        // Step 3. The peer disconnects while we have a dialing in flight.
3949        let event = manager.on_connection_closed(peer, remote_connection_id).unwrap().unwrap();
3950        match event {
3951            TransportEvent::ConnectionClosed {
3952                peer: event_peer,
3953                connection_id: event_connection_id,
3954            } => {
3955                assert_eq!(peer, event_peer);
3956                assert_eq!(event_connection_id, remote_connection_id);
3957            }
3958            event => panic!("invalid event: {event:?}"),
3959        }
3960        {
3961            let peers = manager.peers.read();
3962            let peer_context = peers.get(&peer).unwrap();
3963            match &peer_context.state {
3964                PeerState::Disconnected { dial_record } => {
3965                    let dial_record = dial_record.as_ref().unwrap();
3966                    assert_eq!(dial_record.address(), &first_addr);
3967                    assert_eq!(dial_record.connection_id(), &Some(first_connection_id));
3968                }
3969                state => panic!("invalid state: {state:?}"),
3970            }
3971        }
3972
3973        // Step 4. Dial by the second address and expect to not overwrite the state.
3974        manager.dial_address(second_addr.clone()).await.unwrap();
3975        // The state remains unchanged since we already have a dialing in flight.
3976        {
3977            let peers = manager.peers.read();
3978            let peer_context = peers.get(&peer).unwrap();
3979            match &peer_context.state {
3980                PeerState::Disconnected { dial_record } => {
3981                    let dial_record = dial_record.as_ref().unwrap();
3982                    assert_eq!(dial_record.address(), &first_addr);
3983                    assert_eq!(dial_record.connection_id(), &Some(first_connection_id));
3984                }
3985                state => panic!("invalid state: {state:?}"),
3986            }
3987        }
3988
3989        // Step 5. Remote peer reconnects again.
3990        let result = manager
3991            .on_connection_established(
3992                peer,
3993                &Endpoint::listener(remote_addr.clone(), remote_connection_id),
3994            )
3995            .unwrap();
3996        assert_eq!(result, ConnectionEstablishedResult::Accept);
3997        {
3998            let peers = manager.peers.read();
3999            let peer_context = peers.get(&peer).unwrap();
4000            match &peer_context.state {
4001                PeerState::Connected {
4002                    record,
4003                    dial_record,
4004                } => {
4005                    assert_eq!(record.address(), &remote_addr);
4006                    assert_eq!(record.connection_id(), &Some(remote_connection_id));
4007
4008                    // We have not overwritten the first dial record in step 4.
4009                    let dial_record = dial_record.as_ref().unwrap();
4010                    assert_eq!(dial_record.address(), &first_addr);
4011                    assert_eq!(dial_record.connection_id(), &Some(first_connection_id));
4012                }
4013                state => panic!("invalid state: {state:?}"),
4014            }
4015        }
4016
4017        // Step 6. First dial responds.
4018        let result = manager
4019            .on_connection_established(
4020                peer,
4021                &Endpoint::dialer(first_addr.clone(), first_connection_id),
4022            )
4023            .unwrap();
4024        assert_eq!(result, ConnectionEstablishedResult::Accept);
4025    }
4026
4027    #[tokio::test]
4028    async fn do_not_overwrite_dial_addresses() {
4029        let _ = tracing_subscriber::fmt()
4030            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
4031            .try_init();
4032
4033        let (mut manager, _handle) = TransportManager::new(
4034            Keypair::generate(),
4035            HashSet::new(),
4036            BandwidthSink::new(),
4037            8usize,
4038            ConnectionLimitsConfig::default(),
4039        );
4040        let peer = PeerId::random();
4041        let dial_address = Multiaddr::empty()
4042            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
4043            .with(Protocol::Tcp(8888))
4044            .with(Protocol::P2p(
4045                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4046            ));
4047
4048        let connection_id = ConnectionId::from(0);
4049        let transport = Box::new({
4050            let mut transport = DummyTransport::new();
4051            transport.inject_event(TransportEvent::ConnectionEstablished {
4052                peer,
4053                endpoint: Endpoint::listener(dial_address.clone(), connection_id),
4054            });
4055            transport
4056        });
4057        manager.register_transport(SupportedTransport::Tcp, transport);
4058
4059        // First dial attempt.
4060        manager.dial_address(dial_address.clone()).await.unwrap();
4061        // check the state of the peer.
4062        {
4063            let peers = manager.peers.read();
4064            let peer_context = peers.get(&peer).unwrap();
4065            match &peer_context.state {
4066                PeerState::Dialing { record } => {
4067                    assert_eq!(record.address(), &dial_address);
4068                }
4069                state => panic!("invalid state: {state:?}"),
4070            }
4071
4072            // The address is not saved yet.
4073            assert!(!peer_context.addresses.contains(&dial_address));
4074        }
4075
4076        let second_address = Multiaddr::empty()
4077            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
4078            .with(Protocol::Tcp(8889))
4079            .with(Protocol::P2p(
4080                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4081            ));
4082
4083        // Second dial attempt with different address.
4084        manager.dial_address(second_address.clone()).await.unwrap();
4085        // check the state of the peer.
4086        {
4087            let peers = manager.peers.read();
4088            let peer_context = peers.get(&peer).unwrap();
4089            match &peer_context.state {
4090                // Must still be dialing the first address.
4091                PeerState::Dialing { record } => {
4092                    assert_eq!(record.address(), &dial_address);
4093                }
4094                state => panic!("invalid state: {state:?}"),
4095            }
4096
4097            assert!(!peer_context.addresses.contains(&dial_address));
4098            assert!(!peer_context.addresses.contains(&second_address));
4099        }
4100    }
4101
4102    #[cfg(feature = "websocket")]
4103    #[tokio::test]
4104    async fn opening_errors_are_reported() {
4105        let _ = tracing_subscriber::fmt()
4106            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
4107            .try_init();
4108
4109        let (mut manager, _handle) = TransportManager::new(
4110            Keypair::generate(),
4111            HashSet::new(),
4112            BandwidthSink::new(),
4113            8usize,
4114            ConnectionLimitsConfig::default(),
4115        );
4116        let peer = PeerId::random();
4117        let connection_id = ConnectionId::from(0);
4118
4119        // Setup TCP transport.
4120        let dial_address_tcp = Multiaddr::empty()
4121            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
4122            .with(Protocol::Tcp(8888))
4123            .with(Protocol::P2p(
4124                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4125            ));
4126        let transport = Box::new({
4127            let mut transport = DummyTransport::new();
4128            transport.inject_event(TransportEvent::OpenFailure {
4129                connection_id,
4130                errors: vec![(dial_address_tcp.clone(), DialError::Timeout)],
4131            });
4132            transport
4133        });
4134        manager.register_transport(SupportedTransport::Tcp, transport);
4135        manager.add_known_address(
4136            peer,
4137            vec![Multiaddr::empty()
4138                .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5)))
4139                .with(Protocol::Tcp(8888))
4140                .with(Protocol::P2p(Multihash::from(peer)))]
4141            .into_iter(),
4142        );
4143
4144        // Setup WebSockets transport.
4145        let dial_address_ws = Multiaddr::empty()
4146            .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
4147            .with(Protocol::Tcp(8889))
4148            .with(Protocol::Ws(Cow::Borrowed("/")))
4149            .with(Protocol::P2p(
4150                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4151            ));
4152
4153        let transport = Box::new({
4154            let mut transport = DummyTransport::new();
4155            transport.inject_event(TransportEvent::OpenFailure {
4156                connection_id,
4157                errors: vec![(dial_address_ws.clone(), DialError::Timeout)],
4158            });
4159            transport
4160        });
4161        manager.register_transport(SupportedTransport::WebSocket, transport);
4162        manager.add_known_address(
4163            peer,
4164            vec![Multiaddr::empty()
4165                .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5)))
4166                .with(Protocol::Tcp(8889))
4167                .with(Protocol::Ws(Cow::Borrowed("/")))
4168                .with(Protocol::P2p(
4169                    Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4170                ))]
4171            .into_iter(),
4172        );
4173
4174        // Dial the peer on both transports.
4175        assert!(manager.dial(peer).await.is_ok());
4176        assert!(!manager.pending_connections.is_empty());
4177
4178        {
4179            let peers = manager.peers.read();
4180
4181            match peers.get(&peer) {
4182                Some(PeerContext {
4183                    state: PeerState::Opening { .. },
4184                    ..
4185                }) => {}
4186                state => panic!("invalid state for peer: {state:?}"),
4187            }
4188        }
4189
4190        match manager.next().await.unwrap() {
4191            TransportEvent::OpenFailure {
4192                connection_id,
4193                errors,
4194            } => {
4195                assert_eq!(connection_id, ConnectionId::from(0));
4196                assert_eq!(errors.len(), 2);
4197                let tcp = errors.iter().find(|(addr, _)| addr == &dial_address_tcp).unwrap();
4198                assert!(std::matches!(tcp.1, DialError::Timeout));
4199
4200                let ws = errors.iter().find(|(addr, _)| addr == &dial_address_ws).unwrap();
4201                assert!(std::matches!(ws.1, DialError::Timeout));
4202            }
4203            event => panic!("invalid event: {event:?}"),
4204        }
4205        assert!(manager.pending_connections.is_empty());
4206        assert!(manager.opening_errors.is_empty());
4207    }
4208}