litep2p/
lib.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21#![allow(clippy::single_match)]
22#![allow(clippy::result_large_err)]
23#![allow(clippy::redundant_pattern_matching)]
24#![allow(clippy::type_complexity)]
25#![allow(clippy::result_unit_err)]
26#![allow(clippy::should_implement_trait)]
27#![allow(clippy::too_many_arguments)]
28#![allow(clippy::assign_op_pattern)]
29#![allow(clippy::match_like_matches_macro)]
30
31use crate::{
32    addresses::PublicAddresses,
33    config::Litep2pConfig,
34    error::DialError,
35    protocol::{
36        libp2p::{bitswap::Bitswap, identify::Identify, kademlia::Kademlia, ping::Ping},
37        mdns::Mdns,
38        notification::NotificationProtocol,
39        request_response::RequestResponseProtocol,
40    },
41    transport::{
42        manager::{SupportedTransport, TransportManager},
43        tcp::TcpTransport,
44        TransportBuilder, TransportEvent,
45    },
46};
47
48#[cfg(feature = "quic")]
49use crate::transport::quic::QuicTransport;
50#[cfg(feature = "webrtc")]
51use crate::transport::webrtc::WebRtcTransport;
52#[cfg(feature = "websocket")]
53use crate::transport::websocket::WebSocketTransport;
54
55use multiaddr::{Multiaddr, Protocol};
56use multihash::Multihash;
57use transport::Endpoint;
58use types::ConnectionId;
59
60use std::{collections::HashSet, sync::Arc};
61
62pub use bandwidth::BandwidthSink;
63pub use error::Error;
64pub use peer_id::PeerId;
65pub use types::protocol::ProtocolName;
66
67pub(crate) mod peer_id;
68
69pub mod addresses;
70pub mod codec;
71pub mod config;
72pub mod crypto;
73pub mod error;
74pub mod executor;
75pub mod protocol;
76pub mod substream;
77pub mod transport;
78pub mod types;
79pub mod yamux;
80
81mod bandwidth;
82mod multistream_select;
83
84#[cfg(test)]
85mod mock;
86
/// Public result type used by the crate.
///
/// Shorthand for [`std::result::Result`] with the error type fixed to [`error::Error`].
pub type Result<T> = std::result::Result<T, error::Error>;

/// Logging target for the file.
///
/// Passed as `target:` to the `tracing` macros used in this file.
const LOG_TARGET: &str = "litep2p";

/// Default channel size.
// NOTE(review): not referenced in this file; presumably consumed by other modules of the
// crate when sizing their channels -- confirm before changing the value.
const DEFAULT_CHANNEL_SIZE: usize = 4096usize;
95
/// Litep2p events.
///
/// These are the events returned by [`Litep2p::next_event()`], which translates the
/// transport-level events it receives into the variants below.
#[derive(Debug)]
pub enum Litep2pEvent {
    /// Connection established to peer.
    ConnectionEstablished {
        /// Remote peer ID.
        peer: PeerId,

        /// Endpoint of the established connection.
        endpoint: Endpoint,
    },

    /// Connection closed to remote peer.
    ConnectionClosed {
        /// Peer ID.
        peer: PeerId,

        /// ID of the connection that was closed.
        connection_id: ConnectionId,
    },

    /// Failed to dial peer.
    ///
    /// This error can originate from dialing a single peer address.
    DialFailure {
        /// Address of the peer.
        address: Multiaddr,

        /// Dial error.
        error: DialError,
    },

    /// A list of multiple dial failures.
    ///
    /// Produced from the transport-level `OpenFailure` event.
    ListDialFailures {
        /// List of errors, each paired with the address it was reported for.
        ///
        /// Depending on the transport, the address might be different for each error.
        errors: Vec<(Multiaddr, DialError)>,
    },
}
136
/// [`Litep2p`] object.
///
/// Created with [`Litep2p::new()`]; events are obtained by calling
/// [`Litep2p::next_event()`].
pub struct Litep2p {
    /// Local peer ID, derived from the public key of the configured keypair.
    local_peer_id: PeerId,

    /// Listen addresses reported by the enabled transports, each suffixed with
    /// `/p2p/<local peer ID>`.
    listen_addresses: Vec<Multiaddr>,

    /// Transport manager.
    transport_manager: TransportManager,

    /// Bandwidth sink; a clone of it is also given to the transport manager on creation.
    bandwidth_sink: BandwidthSink,
}
151
152impl Litep2p {
153    /// Create new [`Litep2p`].
154    pub fn new(mut litep2p_config: Litep2pConfig) -> crate::Result<Litep2p> {
155        let local_peer_id = PeerId::from_public_key(&litep2p_config.keypair.public().into());
156        let bandwidth_sink = BandwidthSink::new();
157        let mut listen_addresses = vec![];
158
159        let supported_transports = Self::supported_transports(&litep2p_config);
160        let (mut transport_manager, transport_handle) = TransportManager::new(
161            litep2p_config.keypair.clone(),
162            supported_transports,
163            bandwidth_sink.clone(),
164            litep2p_config.max_parallel_dials,
165            litep2p_config.connection_limits,
166        );
167
168        // add known addresses to `TransportManager`, if any exist
169        if !litep2p_config.known_addresses.is_empty() {
170            for (peer, addresses) in litep2p_config.known_addresses {
171                transport_manager.add_known_address(peer, addresses.iter().cloned());
172            }
173        }
174
175        // start notification protocol event loops
176        for (protocol, config) in litep2p_config.notification_protocols.into_iter() {
177            tracing::debug!(
178                target: LOG_TARGET,
179                ?protocol,
180                "enable notification protocol",
181            );
182
183            let service = transport_manager.register_protocol(
184                protocol,
185                config.fallback_names.clone(),
186                config.codec,
187                litep2p_config.keep_alive_timeout,
188            );
189            let executor = Arc::clone(&litep2p_config.executor);
190            litep2p_config.executor.run(Box::pin(async move {
191                NotificationProtocol::new(service, config, executor).run().await
192            }));
193        }
194
195        // start request-response protocol event loops
196        for (protocol, config) in litep2p_config.request_response_protocols.into_iter() {
197            tracing::debug!(
198                target: LOG_TARGET,
199                ?protocol,
200                "enable request-response protocol",
201            );
202
203            let service = transport_manager.register_protocol(
204                protocol,
205                config.fallback_names.clone(),
206                config.codec,
207                litep2p_config.keep_alive_timeout,
208            );
209            litep2p_config.executor.run(Box::pin(async move {
210                RequestResponseProtocol::new(service, config).run().await
211            }));
212        }
213
214        // start user protocol event loops
215        for (protocol_name, protocol) in litep2p_config.user_protocols.into_iter() {
216            tracing::debug!(target: LOG_TARGET, protocol = ?protocol_name, "enable user protocol");
217
218            let service = transport_manager.register_protocol(
219                protocol_name,
220                Vec::new(),
221                protocol.codec(),
222                litep2p_config.keep_alive_timeout,
223            );
224            litep2p_config.executor.run(Box::pin(async move {
225                let _ = protocol.run(service).await;
226            }));
227        }
228
229        // start ping protocol event loop if enabled
230        if let Some(ping_config) = litep2p_config.ping.take() {
231            tracing::debug!(
232                target: LOG_TARGET,
233                protocol = ?ping_config.protocol,
234                "enable ipfs ping protocol",
235            );
236
237            let service = transport_manager.register_protocol(
238                ping_config.protocol.clone(),
239                Vec::new(),
240                ping_config.codec,
241                litep2p_config.keep_alive_timeout,
242            );
243            litep2p_config.executor.run(Box::pin(async move {
244                Ping::new(service, ping_config).run().await
245            }));
246        }
247
248        // start kademlia protocol event loop if enabled
249        if let Some(kademlia_config) = litep2p_config.kademlia.take() {
250            tracing::debug!(
251                target: LOG_TARGET,
252                protocol_names = ?kademlia_config.protocol_names,
253                "enable ipfs kademlia protocol",
254            );
255
256            let main_protocol =
257                kademlia_config.protocol_names.first().expect("protocol name to exist");
258            let fallback_names = kademlia_config.protocol_names.iter().skip(1).cloned().collect();
259
260            let service = transport_manager.register_protocol(
261                main_protocol.clone(),
262                fallback_names,
263                kademlia_config.codec,
264                litep2p_config.keep_alive_timeout,
265            );
266            litep2p_config.executor.run(Box::pin(async move {
267                let _ = Kademlia::new(service, kademlia_config).run().await;
268            }));
269        }
270
271        // start identify protocol event loop if enabled
272        let mut identify_info = match litep2p_config.identify.take() {
273            None => None,
274            Some(mut identify_config) => {
275                tracing::debug!(
276                    target: LOG_TARGET,
277                    protocol = ?identify_config.protocol,
278                    "enable ipfs identify protocol",
279                );
280
281                let service = transport_manager.register_protocol(
282                    identify_config.protocol.clone(),
283                    Vec::new(),
284                    identify_config.codec,
285                    litep2p_config.keep_alive_timeout,
286                );
287                identify_config.public = Some(litep2p_config.keypair.public().into());
288
289                Some((service, identify_config))
290            }
291        };
292
293        // start bitswap protocol event loop if enabled
294        if let Some(bitswap_config) = litep2p_config.bitswap.take() {
295            tracing::debug!(
296                target: LOG_TARGET,
297                protocol = ?bitswap_config.protocol,
298                "enable ipfs bitswap protocol",
299            );
300
301            let service = transport_manager.register_protocol(
302                bitswap_config.protocol.clone(),
303                Vec::new(),
304                bitswap_config.codec,
305                litep2p_config.keep_alive_timeout,
306            );
307            litep2p_config.executor.run(Box::pin(async move {
308                Bitswap::new(service, bitswap_config).run().await
309            }));
310        }
311
312        // enable tcp transport if the config exists
313        if let Some(config) = litep2p_config.tcp.take() {
314            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
315            let (transport, transport_listen_addresses) =
316                <TcpTransport as TransportBuilder>::new(handle, config)?;
317
318            for address in transport_listen_addresses {
319                transport_manager.register_listen_address(address.clone());
320                listen_addresses.push(address.with(Protocol::P2p(
321                    Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
322                )));
323            }
324
325            transport_manager.register_transport(SupportedTransport::Tcp, Box::new(transport));
326        }
327
328        // enable quic transport if the config exists
329        #[cfg(feature = "quic")]
330        if let Some(config) = litep2p_config.quic.take() {
331            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
332            let (transport, transport_listen_addresses) =
333                <QuicTransport as TransportBuilder>::new(handle, config)?;
334
335            for address in transport_listen_addresses {
336                transport_manager.register_listen_address(address.clone());
337                listen_addresses.push(address.with(Protocol::P2p(
338                    Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
339                )));
340            }
341
342            transport_manager.register_transport(SupportedTransport::Quic, Box::new(transport));
343        }
344
345        // enable webrtc transport if the config exists
346        #[cfg(feature = "webrtc")]
347        if let Some(config) = litep2p_config.webrtc.take() {
348            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
349            let (transport, transport_listen_addresses) =
350                <WebRtcTransport as TransportBuilder>::new(handle, config)?;
351
352            for address in transport_listen_addresses {
353                transport_manager.register_listen_address(address.clone());
354                listen_addresses.push(address.with(Protocol::P2p(
355                    Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
356                )));
357            }
358
359            transport_manager.register_transport(SupportedTransport::WebRtc, Box::new(transport));
360        }
361
362        // enable websocket transport if the config exists
363        #[cfg(feature = "websocket")]
364        if let Some(config) = litep2p_config.websocket.take() {
365            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
366            let (transport, transport_listen_addresses) =
367                <WebSocketTransport as TransportBuilder>::new(handle, config)?;
368
369            for address in transport_listen_addresses {
370                transport_manager.register_listen_address(address.clone());
371                listen_addresses.push(address.with(Protocol::P2p(
372                    Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
373                )));
374            }
375
376            transport_manager
377                .register_transport(SupportedTransport::WebSocket, Box::new(transport));
378        }
379
380        // enable mdns if the config exists
381        if let Some(config) = litep2p_config.mdns.take() {
382            let mdns = Mdns::new(transport_handle, config, listen_addresses.clone())?;
383
384            litep2p_config.executor.run(Box::pin(async move {
385                let _ = mdns.start().await;
386            }));
387        }
388
389        // if identify was enabled, give it the enabled protocols and listen addresses and start it
390        if let Some((service, mut identify_config)) = identify_info.take() {
391            identify_config.protocols = transport_manager.protocols().cloned().collect();
392            let identify = Identify::new(service, identify_config);
393
394            litep2p_config.executor.run(Box::pin(async move {
395                let _ = identify.run().await;
396            }));
397        }
398
399        if transport_manager.installed_transports().count() == 0 {
400            return Err(Error::Other("No transport specified".to_string()));
401        }
402
403        // verify that at least one transport is specified
404        if listen_addresses.is_empty() {
405            tracing::warn!(
406                target: LOG_TARGET,
407                "litep2p started with no listen addresses, cannot accept inbound connections",
408            );
409        }
410
411        Ok(Self {
412            local_peer_id,
413            bandwidth_sink,
414            listen_addresses,
415            transport_manager,
416        })
417    }
418
419    /// Collect supported transports before initializing the transports themselves.
420    ///
421    /// Information of the supported transports is needed to initialize protocols but
422    /// information about protocols must be known to initialize transports so the initialization
423    /// has to be split.
424    fn supported_transports(config: &Litep2pConfig) -> HashSet<SupportedTransport> {
425        let mut supported_transports = HashSet::new();
426
427        config
428            .tcp
429            .is_some()
430            .then(|| supported_transports.insert(SupportedTransport::Tcp));
431        #[cfg(feature = "quic")]
432        config
433            .quic
434            .is_some()
435            .then(|| supported_transports.insert(SupportedTransport::Quic));
436        #[cfg(feature = "websocket")]
437        config
438            .websocket
439            .is_some()
440            .then(|| supported_transports.insert(SupportedTransport::WebSocket));
441        #[cfg(feature = "webrtc")]
442        config
443            .webrtc
444            .is_some()
445            .then(|| supported_transports.insert(SupportedTransport::WebRtc));
446
447        supported_transports
448    }
449
    /// Get local peer ID.
    ///
    /// This is the peer ID derived from the configured keypair when the object was created.
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
454
    /// Get the list of public addresses of the node.
    ///
    /// Returns the [`PublicAddresses`] object provided by the transport manager.
    pub fn public_addresses(&self) -> PublicAddresses {
        self.transport_manager.public_addresses()
    }
459
    /// Get the list of listen addresses of the node.
    ///
    /// The addresses are those collected from the enabled transports when the object was
    /// created, each ending with `/p2p/<local peer ID>`.
    pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
        self.listen_addresses.iter()
    }
464
    /// Get handle to bandwidth sink.
    ///
    /// Returns a clone of the sink that was also handed to the transport manager on creation.
    pub fn bandwidth_sink(&self) -> BandwidthSink {
        self.bandwidth_sink.clone()
    }
469
    /// Dial peer.
    ///
    /// Forwards the request to the transport manager and returns its result unchanged.
    // NOTE(review): presumably `Ok(())` only means the dial was accepted and the outcome is
    // reported later through `next_event()` -- confirm against `TransportManager::dial()`.
    pub async fn dial(&mut self, peer: &PeerId) -> crate::Result<()> {
        self.transport_manager.dial(*peer).await
    }
474
    /// Dial address.
    ///
    /// Forwards the request to the transport manager and returns its result unchanged.
    ///
    /// `Ok(())` does not mean that a connection was established: the test
    /// `dial_same_address_twice` in this file dials an address successfully and receives
    /// the failure afterwards as [`Litep2pEvent::DialFailure`] from
    /// [`Litep2p::next_event()`].
    pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
        self.transport_manager.dial_address(address).await
    }
479
    /// Add one or more known addresses for peer.
    ///
    /// Return value denotes how many addresses were added for the peer.
    /// Addresses belonging to disabled/unsupported transports will be ignored.
    pub fn add_known_address(
        &mut self,
        peer: PeerId,
        address: impl Iterator<Item = Multiaddr>,
    ) -> usize {
        // the transport manager owns the address book, forward the call as-is
        self.transport_manager.add_known_address(peer, address)
    }
491
492    /// Poll next event.
493    ///
494    /// This function must be called in order for litep2p to make progress.
495    pub async fn next_event(&mut self) -> Option<Litep2pEvent> {
496        loop {
497            match self.transport_manager.next().await? {
498                TransportEvent::ConnectionEstablished { peer, endpoint, .. } =>
499                    return Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }),
500                TransportEvent::ConnectionClosed {
501                    peer,
502                    connection_id,
503                } =>
504                    return Some(Litep2pEvent::ConnectionClosed {
505                        peer,
506                        connection_id,
507                    }),
508                TransportEvent::DialFailure { address, error, .. } =>
509                    return Some(Litep2pEvent::DialFailure { address, error }),
510
511                TransportEvent::OpenFailure { errors, .. } => {
512                    return Some(Litep2pEvent::ListDialFailures { errors });
513                }
514                _ => {}
515            }
516        }
517    }
518}
519
520#[cfg(test)]
521mod tests {
522    use crate::{
523        config::ConfigBuilder,
524        protocol::{libp2p::ping, notification::Config as NotificationConfig},
525        types::protocol::ProtocolName,
526        Litep2p, Litep2pEvent, PeerId,
527    };
528    use multiaddr::{Multiaddr, Protocol};
529    use multihash::Multihash;
530    use std::net::Ipv4Addr;
531
532    #[tokio::test]
533    async fn initialize_litep2p() {
534        let _ = tracing_subscriber::fmt()
535            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
536            .try_init();
537
538        let (config1, _service1) = NotificationConfig::new(
539            ProtocolName::from("/notificaton/1"),
540            1337usize,
541            vec![1, 2, 3, 4],
542            Vec::new(),
543            false,
544            64,
545            64,
546            true,
547        );
548        let (config2, _service2) = NotificationConfig::new(
549            ProtocolName::from("/notificaton/2"),
550            1337usize,
551            vec![1, 2, 3, 4],
552            Vec::new(),
553            false,
554            64,
555            64,
556            true,
557        );
558        let (ping_config, _ping_event_stream) = ping::Config::default();
559
560        let config = ConfigBuilder::new()
561            .with_tcp(Default::default())
562            .with_notification_protocol(config1)
563            .with_notification_protocol(config2)
564            .with_libp2p_ping(ping_config)
565            .build();
566
567        let _litep2p = Litep2p::new(config).unwrap();
568    }
569
570    #[tokio::test]
571    async fn no_transport_given() {
572        let _ = tracing_subscriber::fmt()
573            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
574            .try_init();
575
576        let (config1, _service1) = NotificationConfig::new(
577            ProtocolName::from("/notificaton/1"),
578            1337usize,
579            vec![1, 2, 3, 4],
580            Vec::new(),
581            false,
582            64,
583            64,
584            true,
585        );
586        let (config2, _service2) = NotificationConfig::new(
587            ProtocolName::from("/notificaton/2"),
588            1337usize,
589            vec![1, 2, 3, 4],
590            Vec::new(),
591            false,
592            64,
593            64,
594            true,
595        );
596        let (ping_config, _ping_event_stream) = ping::Config::default();
597
598        let config = ConfigBuilder::new()
599            .with_notification_protocol(config1)
600            .with_notification_protocol(config2)
601            .with_libp2p_ping(ping_config)
602            .build();
603
604        assert!(Litep2p::new(config).is_err());
605    }
606
607    #[tokio::test]
608    async fn dial_same_address_twice() {
609        let _ = tracing_subscriber::fmt()
610            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
611            .try_init();
612
613        let (config1, _service1) = NotificationConfig::new(
614            ProtocolName::from("/notificaton/1"),
615            1337usize,
616            vec![1, 2, 3, 4],
617            Vec::new(),
618            false,
619            64,
620            64,
621            true,
622        );
623        let (config2, _service2) = NotificationConfig::new(
624            ProtocolName::from("/notificaton/2"),
625            1337usize,
626            vec![1, 2, 3, 4],
627            Vec::new(),
628            false,
629            64,
630            64,
631            true,
632        );
633        let (ping_config, _ping_event_stream) = ping::Config::default();
634
635        let config = ConfigBuilder::new()
636            .with_tcp(Default::default())
637            .with_notification_protocol(config1)
638            .with_notification_protocol(config2)
639            .with_libp2p_ping(ping_config)
640            .build();
641
642        let peer = PeerId::random();
643        let address = Multiaddr::empty()
644            .with(Protocol::Ip4(Ipv4Addr::new(255, 254, 253, 252)))
645            .with(Protocol::Tcp(8888))
646            .with(Protocol::P2p(
647                Multihash::from_bytes(&peer.to_bytes()).unwrap(),
648            ));
649
650        let mut litep2p = Litep2p::new(config).unwrap();
651        litep2p.dial_address(address.clone()).await.unwrap();
652        litep2p.dial_address(address.clone()).await.unwrap();
653
654        match litep2p.next_event().await {
655            Some(Litep2pEvent::DialFailure { .. }) => {}
656            _ => panic!("invalid event received"),
657        }
658
659        // verify that the second same dial was ignored and the dial failure is reported only once
660        match tokio::time::timeout(std::time::Duration::from_secs(20), litep2p.next_event()).await {
661            Err(_) => {}
662            _ => panic!("invalid event received"),
663        }
664    }
665}