litep2p/transport/manager/
handle.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    addresses::PublicAddresses,
23    crypto::ed25519::Keypair,
24    error::ImmediateDialError,
25    executor::Executor,
26    protocol::ProtocolSet,
27    transport::manager::{
28        address::{AddressRecord, AddressStore},
29        types::{PeerContext, PeerState, SupportedTransport},
30        ProtocolContext, TransportManagerEvent, LOG_TARGET,
31    },
32    types::{protocol::ProtocolName, ConnectionId},
33    BandwidthSink, PeerId,
34};
35
36use multiaddr::{Multiaddr, Protocol};
37use parking_lot::RwLock;
38use tokio::sync::mpsc::{error::TrySendError, Sender};
39
40use std::{
41    collections::{HashMap, HashSet},
42    sync::{
43        atomic::{AtomicUsize, Ordering},
44        Arc,
45    },
46};
47
48/// Inner commands sent from [`TransportManagerHandle`] to
49/// [`crate::transport::manager::TransportManager`].
50pub enum InnerTransportManagerCommand {
51    /// Dial peer.
52    DialPeer {
53        /// Remote peer ID.
54        peer: PeerId,
55    },
56
57    /// Dial address.
58    DialAddress {
59        /// Remote address.
60        address: Multiaddr,
61    },
62}
63
64/// Handle for communicating with [`crate::transport::manager::TransportManager`].
65#[derive(Debug, Clone)]
66pub struct TransportManagerHandle {
67    /// Local peer ID.
68    local_peer_id: PeerId,
69
70    /// Peers.
71    peers: Arc<RwLock<HashMap<PeerId, PeerContext>>>,
72
73    /// TX channel for sending commands to [`crate::transport::manager::TransportManager`].
74    cmd_tx: Sender<InnerTransportManagerCommand>,
75
76    /// Supported transports.
77    supported_transport: HashSet<SupportedTransport>,
78
79    /// Local listen addresess.
80    listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
81
82    /// Public addresses.
83    public_addresses: PublicAddresses,
84}
85
86impl TransportManagerHandle {
87    /// Create new [`TransportManagerHandle`].
88    pub fn new(
89        local_peer_id: PeerId,
90        peers: Arc<RwLock<HashMap<PeerId, PeerContext>>>,
91        cmd_tx: Sender<InnerTransportManagerCommand>,
92        supported_transport: HashSet<SupportedTransport>,
93        listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
94        public_addresses: PublicAddresses,
95    ) -> Self {
96        Self {
97            peers,
98            cmd_tx,
99            local_peer_id,
100            supported_transport,
101            listen_addresses,
102            public_addresses,
103        }
104    }
105
106    /// Register new transport to [`TransportManagerHandle`].
107    pub(crate) fn register_transport(&mut self, transport: SupportedTransport) {
108        self.supported_transport.insert(transport);
109    }
110
111    /// Get the list of public addresses of the node.
112    pub(crate) fn public_addresses(&self) -> PublicAddresses {
113        self.public_addresses.clone()
114    }
115
116    /// Get the list of listen addresses of the node.
117    pub(crate) fn listen_addresses(&self) -> HashSet<Multiaddr> {
118        self.listen_addresses.read().clone()
119    }
120
121    /// Check if `address` is supported by one of the enabled transports.
122    pub fn supported_transport(&self, address: &Multiaddr) -> bool {
123        let mut iter = address.iter();
124
125        match iter.next() {
126            Some(Protocol::Ip4(address)) =>
127                if address.is_unspecified() {
128                    return false;
129                },
130            Some(Protocol::Ip6(address)) =>
131                if address.is_unspecified() {
132                    return false;
133                },
134            Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => {}
135            _ => return false,
136        }
137
138        match iter.next() {
139            None => false,
140            Some(Protocol::Tcp(_)) => match iter.next() {
141                Some(Protocol::P2p(_)) =>
142                    self.supported_transport.contains(&SupportedTransport::Tcp),
143                #[cfg(feature = "websocket")]
144                Some(Protocol::Ws(_)) =>
145                    self.supported_transport.contains(&SupportedTransport::WebSocket),
146                #[cfg(feature = "websocket")]
147                Some(Protocol::Wss(_)) =>
148                    self.supported_transport.contains(&SupportedTransport::WebSocket),
149                _ => false,
150            },
151            #[cfg(feature = "quic")]
152            Some(Protocol::Udp(_)) => match (
153                iter.next(),
154                self.supported_transport.contains(&SupportedTransport::Quic),
155            ) {
156                (Some(Protocol::QuicV1), true) => true,
157                _ => false,
158            },
159            _ => false,
160        }
161    }
162
163    /// Check if the address is a local listen address and if so, discard it.
164    fn is_local_address(&self, address: &Multiaddr) -> bool {
165        let address: Multiaddr = address
166            .iter()
167            .take_while(|protocol| !std::matches!(protocol, Protocol::P2p(_)))
168            .collect();
169
170        self.listen_addresses.read().contains(&address)
171    }
172
173    /// Add one or more known addresses for peer.
174    ///
175    /// If peer doesn't exist, it will be added to known peers.
176    ///
177    /// Returns the number of added addresses after non-supported transports were filtered out.
178    pub fn add_known_address(
179        &mut self,
180        peer: &PeerId,
181        addresses: impl Iterator<Item = Multiaddr>,
182    ) -> usize {
183        let mut peers = self.peers.write();
184        let addresses = addresses
185            .filter_map(|address| {
186                (self.supported_transport(&address) && !self.is_local_address(&address))
187                    .then_some(AddressRecord::from_multiaddr(address)?)
188            })
189            .collect::<HashSet<_>>();
190
191        // if all of the added addresses belonged to unsupported transports, exit early
192        let num_added = addresses.len();
193        if num_added == 0 {
194            tracing::debug!(
195                target: LOG_TARGET,
196                ?peer,
197                "didn't add any addresses for peer because transport is not supported",
198            );
199
200            return 0usize;
201        }
202
203        tracing::trace!(
204            target: LOG_TARGET,
205            ?peer,
206            ?addresses,
207            "add known addresses",
208        );
209
210        match peers.get_mut(peer) {
211            Some(context) =>
212                for record in addresses {
213                    if !context.addresses.contains(record.address()) {
214                        context.addresses.insert(record);
215                    }
216                },
217            None => {
218                peers.insert(
219                    *peer,
220                    PeerContext {
221                        state: PeerState::Disconnected { dial_record: None },
222                        addresses: AddressStore::from_iter(addresses),
223                        secondary_connection: None,
224                    },
225                );
226            }
227        }
228
229        num_added
230    }
231
232    /// Dial peer using `PeerId`.
233    ///
234    /// Returns an error if the peer is unknown or the peer is already connected.
235    pub fn dial(&self, peer: &PeerId) -> Result<(), ImmediateDialError> {
236        if peer == &self.local_peer_id {
237            return Err(ImmediateDialError::TriedToDialSelf);
238        }
239
240        {
241            match self.peers.read().get(peer) {
242                Some(PeerContext {
243                    state: PeerState::Connected { .. },
244                    ..
245                }) => return Err(ImmediateDialError::AlreadyConnected),
246                Some(PeerContext {
247                    state: PeerState::Disconnected { dial_record },
248                    addresses,
249                    ..
250                }) => {
251                    if addresses.is_empty() {
252                        return Err(ImmediateDialError::NoAddressAvailable);
253                    }
254
255                    // peer is already being dialed, don't dial again until the first dial concluded
256                    if dial_record.is_some() {
257                        tracing::debug!(
258                            target: LOG_TARGET,
259                            ?peer,
260                            ?dial_record,
261                            "peer is aready being dialed",
262                        );
263                        return Ok(());
264                    }
265                }
266                Some(PeerContext {
267                    state: PeerState::Dialing { .. } | PeerState::Opening { .. },
268                    ..
269                }) => return Ok(()),
270                None => return Err(ImmediateDialError::NoAddressAvailable),
271            }
272        }
273
274        self.cmd_tx
275            .try_send(InnerTransportManagerCommand::DialPeer { peer: *peer })
276            .map_err(|error| match error {
277                TrySendError::Full(_) => ImmediateDialError::ChannelClogged,
278                TrySendError::Closed(_) => ImmediateDialError::TaskClosed,
279            })
280    }
281
282    /// Dial peer using `Multiaddr`.
283    ///
284    /// Returns an error if address it not valid.
285    pub fn dial_address(&self, address: Multiaddr) -> Result<(), ImmediateDialError> {
286        if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
287            return Err(ImmediateDialError::PeerIdMissing);
288        }
289
290        self.cmd_tx
291            .try_send(InnerTransportManagerCommand::DialAddress { address })
292            .map_err(|error| match error {
293                TrySendError::Full(_) => ImmediateDialError::ChannelClogged,
294                TrySendError::Closed(_) => ImmediateDialError::TaskClosed,
295            })
296    }
297}
298
299// TODO: add getters for these
300pub struct TransportHandle {
301    pub keypair: Keypair,
302    pub tx: Sender<TransportManagerEvent>,
303    pub protocols: HashMap<ProtocolName, ProtocolContext>,
304    pub next_connection_id: Arc<AtomicUsize>,
305    pub next_substream_id: Arc<AtomicUsize>,
306    pub bandwidth_sink: BandwidthSink,
307    pub executor: Arc<dyn Executor>,
308}
309
310impl TransportHandle {
311    pub fn protocol_set(&self, connection_id: ConnectionId) -> ProtocolSet {
312        ProtocolSet::new(
313            connection_id,
314            self.tx.clone(),
315            self.next_substream_id.clone(),
316            self.protocols.clone(),
317        )
318    }
319
320    /// Get next connection ID.
321    pub fn next_connection_id(&mut self) -> ConnectionId {
322        let connection_id = self.next_connection_id.fetch_add(1usize, Ordering::Relaxed);
323
324        ConnectionId::from(connection_id)
325    }
326}
327
328#[cfg(test)]
329mod tests {
330    use super::*;
331    use multihash::Multihash;
332    use parking_lot::lock_api::RwLock;
333    use tokio::sync::mpsc::{channel, Receiver};
334
335    fn make_transport_manager_handle() -> (
336        TransportManagerHandle,
337        Receiver<InnerTransportManagerCommand>,
338    ) {
339        let (cmd_tx, cmd_rx) = channel(64);
340
341        let local_peer_id = PeerId::random();
342        (
343            TransportManagerHandle {
344                local_peer_id,
345                cmd_tx,
346                peers: Default::default(),
347                supported_transport: HashSet::new(),
348                listen_addresses: Default::default(),
349                public_addresses: PublicAddresses::new(local_peer_id),
350            },
351            cmd_rx,
352        )
353    }
354
355    #[tokio::test]
356    async fn tcp_supported() {
357        let (mut handle, _rx) = make_transport_manager_handle();
358        handle.supported_transport.insert(SupportedTransport::Tcp);
359
360        let address =
361            "/dns4/google.com/tcp/24928/p2p/12D3KooWKrUnV42yDR7G6DewmgHtFaVCJWLjQRi2G9t5eJD3BvTy"
362                .parse()
363                .unwrap();
364        assert!(handle.supported_transport(&address));
365    }
366
367    #[cfg(feature = "websocket")]
368    #[tokio::test]
369    async fn websocket_supported() {
370        let (mut handle, _rx) = make_transport_manager_handle();
371        handle.supported_transport.insert(SupportedTransport::WebSocket);
372
373        let address =
374            "/dns4/google.com/tcp/24928/ws/p2p/12D3KooWKrUnV42yDR7G6DewmgHtFaVCJWLjQRi2G9t5eJD3BvTy"
375                .parse()
376                .unwrap();
377        assert!(handle.supported_transport(&address));
378    }
379
380    #[test]
381    fn transport_not_supported() {
382        let (handle, _rx) = make_transport_manager_handle();
383
384        // only peer id (used by Polkadot sometimes)
385        assert!(!handle.supported_transport(
386            &Multiaddr::empty().with(Protocol::P2p(Multihash::from(PeerId::random())))
387        ));
388
389        // only one transport
390        assert!(!handle.supported_transport(
391            &Multiaddr::empty().with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
392        ));
393
394        // any udp-based protocol other than quic
395        assert!(!handle.supported_transport(
396            &Multiaddr::empty()
397                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
398                .with(Protocol::Udp(8888))
399                .with(Protocol::Utp)
400        ));
401
402        // any other protocol other than tcp
403        assert!(!handle.supported_transport(
404            &Multiaddr::empty()
405                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
406                .with(Protocol::Sctp(8888))
407        ));
408    }
409
410    #[test]
411    fn zero_addresses_added() {
412        let (mut handle, _rx) = make_transport_manager_handle();
413        handle.supported_transport.insert(SupportedTransport::Tcp);
414
415        assert!(
416            handle.add_known_address(
417                &PeerId::random(),
418                vec![
419                    Multiaddr::empty()
420                        .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
421                        .with(Protocol::Udp(8888))
422                        .with(Protocol::Utp),
423                    Multiaddr::empty()
424                        .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
425                        .with(Protocol::Tcp(8888))
426                        .with(Protocol::Wss(std::borrow::Cow::Owned("/".to_string()))),
427                ]
428                .into_iter()
429            ) == 0usize
430        );
431    }
432
433    #[tokio::test]
434    async fn dial_already_connected_peer() {
435        let (mut handle, _rx) = make_transport_manager_handle();
436        handle.supported_transport.insert(SupportedTransport::Tcp);
437
438        let peer = {
439            let peer = PeerId::random();
440            let mut peers = handle.peers.write();
441
442            peers.insert(
443                peer,
444                PeerContext {
445                    state: PeerState::Connected {
446                        record: AddressRecord::from_multiaddr(
447                            Multiaddr::empty()
448                                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
449                                .with(Protocol::Tcp(8888))
450                                .with(Protocol::P2p(Multihash::from(peer))),
451                        )
452                        .unwrap(),
453                        dial_record: None,
454                    },
455                    secondary_connection: None,
456                    addresses: AddressStore::from_iter(
457                        vec![Multiaddr::empty()
458                            .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
459                            .with(Protocol::Tcp(8888))
460                            .with(Protocol::P2p(Multihash::from(peer)))]
461                        .into_iter(),
462                    ),
463                },
464            );
465            drop(peers);
466
467            peer
468        };
469
470        match handle.dial(&peer) {
471            Err(ImmediateDialError::AlreadyConnected) => {}
472            _ => panic!("invalid return value"),
473        }
474    }
475
476    #[tokio::test]
477    async fn peer_already_being_dialed() {
478        let (mut handle, _rx) = make_transport_manager_handle();
479        handle.supported_transport.insert(SupportedTransport::Tcp);
480
481        let peer = {
482            let peer = PeerId::random();
483            let mut peers = handle.peers.write();
484
485            peers.insert(
486                peer,
487                PeerContext {
488                    state: PeerState::Dialing {
489                        record: AddressRecord::from_multiaddr(
490                            Multiaddr::empty()
491                                .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
492                                .with(Protocol::Tcp(8888))
493                                .with(Protocol::P2p(Multihash::from(peer))),
494                        )
495                        .unwrap(),
496                    },
497                    secondary_connection: None,
498                    addresses: AddressStore::from_iter(
499                        vec![Multiaddr::empty()
500                            .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
501                            .with(Protocol::Tcp(8888))
502                            .with(Protocol::P2p(Multihash::from(peer)))]
503                        .into_iter(),
504                    ),
505                },
506            );
507            drop(peers);
508
509            peer
510        };
511
512        match handle.dial(&peer) {
513            Ok(()) => {}
514            _ => panic!("invalid return value"),
515        }
516    }
517
518    #[tokio::test]
519    async fn no_address_available_for_peer() {
520        let (mut handle, _rx) = make_transport_manager_handle();
521        handle.supported_transport.insert(SupportedTransport::Tcp);
522
523        let peer = {
524            let peer = PeerId::random();
525            let mut peers = handle.peers.write();
526
527            peers.insert(
528                peer,
529                PeerContext {
530                    state: PeerState::Disconnected { dial_record: None },
531                    secondary_connection: None,
532                    addresses: AddressStore::new(),
533                },
534            );
535            drop(peers);
536
537            peer
538        };
539
540        let err = handle.dial(&peer).unwrap_err();
541        assert!(matches!(err, ImmediateDialError::NoAddressAvailable));
542    }
543
544    #[tokio::test]
545    async fn pending_connection_for_disconnected_peer() {
546        let (mut handle, mut rx) = make_transport_manager_handle();
547        handle.supported_transport.insert(SupportedTransport::Tcp);
548
549        let peer = {
550            let peer = PeerId::random();
551            let mut peers = handle.peers.write();
552
553            peers.insert(
554                peer,
555                PeerContext {
556                    state: PeerState::Disconnected {
557                        dial_record: Some(
558                            AddressRecord::from_multiaddr(
559                                Multiaddr::empty()
560                                    .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
561                                    .with(Protocol::Tcp(8888))
562                                    .with(Protocol::P2p(Multihash::from(peer))),
563                            )
564                            .unwrap(),
565                        ),
566                    },
567                    secondary_connection: None,
568                    addresses: AddressStore::from_iter(
569                        vec![Multiaddr::empty()
570                            .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
571                            .with(Protocol::Tcp(8888))
572                            .with(Protocol::P2p(Multihash::from(peer)))]
573                        .into_iter(),
574                    ),
575                },
576            );
577            drop(peers);
578
579            peer
580        };
581
582        match handle.dial(&peer) {
583            Ok(()) => {}
584            _ => panic!("invalid return value"),
585        }
586        assert!(rx.try_recv().is_err());
587    }
588
589    #[tokio::test]
590    async fn try_to_dial_self() {
591        let (mut handle, mut rx) = make_transport_manager_handle();
592        handle.supported_transport.insert(SupportedTransport::Tcp);
593
594        let err = handle.dial(&handle.local_peer_id).unwrap_err();
595        assert_eq!(err, ImmediateDialError::TriedToDialSelf);
596
597        assert!(rx.try_recv().is_err());
598    }
599
600    #[test]
601    fn is_local_address() {
602        let (cmd_tx, _cmd_rx) = channel(64);
603
604        let local_peer_id = PeerId::random();
605        let first_addr: Multiaddr = "/ip6/::1/tcp/8888".parse().expect("valid multiaddress");
606        let second_addr: Multiaddr = "/ip4/127.0.0.1/tcp/8888".parse().expect("valid multiaddress");
607
608        let listen_addresses = Arc::new(RwLock::new(
609            [first_addr.clone(), second_addr.clone()].iter().cloned().collect(),
610        ));
611        println!("{:?}", listen_addresses);
612
613        let handle = TransportManagerHandle {
614            local_peer_id,
615            cmd_tx,
616            peers: Default::default(),
617            supported_transport: HashSet::new(),
618            listen_addresses,
619            public_addresses: PublicAddresses::new(local_peer_id),
620        };
621
622        // local addresses
623        assert!(handle.is_local_address(
624            &"/ip6/::1/tcp/8888".parse::<Multiaddr>().expect("valid multiaddress")
625        ));
626        assert!(handle
627            .is_local_address(&"/ip4/127.0.0.1/tcp/8888".parse().expect("valid multiaddress")));
628        assert!(handle.is_local_address(
629            &"/ip6/::1/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q"
630                .parse()
631                .expect("valid multiaddress")
632        ));
633        assert!(handle.is_local_address(
634            &"/ip4/127.0.0.1/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q"
635                .parse()
636                .expect("valid multiaddress")
637        ));
638
639        // same address but different peer id
640        assert!(handle.is_local_address(
641            &"/ip6/::1/tcp/8888/p2p/12D3KooWPGxxxQiBEBZ52RY31Z2chn4xsDrGCMouZ88izJrak2T1"
642                .parse::<Multiaddr>()
643                .expect("valid multiaddress")
644        ));
645        assert!(handle.is_local_address(
646            &"/ip4/127.0.0.1/tcp/8888/p2p/12D3KooWPGxxxQiBEBZ52RY31Z2chn4xsDrGCMouZ88izJrak2T1"
647                .parse()
648                .expect("valid multiaddress")
649        ));
650
651        // different address
652        assert!(!handle
653            .is_local_address(&"/ip4/127.0.0.1/tcp/9999".parse().expect("valid multiaddress")));
654        // different address
655        assert!(!handle
656            .is_local_address(&"/ip4/127.0.0.1/tcp/7777".parse().expect("valid multiaddress")));
657    }
658}