litep2p/
lib.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21#![allow(clippy::single_match)]
22#![allow(clippy::result_large_err)]
23#![allow(clippy::large_enum_variant)]
24#![allow(clippy::redundant_pattern_matching)]
25#![allow(clippy::type_complexity)]
26#![allow(clippy::result_unit_err)]
27#![allow(clippy::should_implement_trait)]
28#![allow(clippy::too_many_arguments)]
29#![allow(clippy::assign_op_pattern)]
30#![allow(clippy::match_like_matches_macro)]
31
32use crate::{
33    addresses::PublicAddresses,
34    config::Litep2pConfig,
35    error::DialError,
36    protocol::{
37        libp2p::{bitswap::Bitswap, identify::Identify, kademlia::Kademlia, ping::Ping},
38        mdns::Mdns,
39        notification::NotificationProtocol,
40        request_response::RequestResponseProtocol,
41        SubstreamKeepAlive,
42    },
43    transport::{
44        manager::{SupportedTransport, TransportManager, TransportManagerBuilder},
45        tcp::TcpTransport,
46        TransportBuilder, TransportEvent,
47    },
48};
49
50#[cfg(feature = "quic")]
51use crate::transport::quic::QuicTransport;
52#[cfg(feature = "webrtc")]
53use crate::transport::webrtc::WebRtcTransport;
54#[cfg(feature = "websocket")]
55use crate::transport::websocket::WebSocketTransport;
56
57use hickory_resolver::{name_server::TokioConnectionProvider, TokioResolver};
58use multiaddr::{Multiaddr, Protocol};
59use transport::Endpoint;
60use types::ConnectionId;
61
62pub use bandwidth::BandwidthSink;
63pub use error::Error;
64pub use peer_id::{ParseError, PeerId};
65use std::{collections::HashSet, sync::Arc};
66pub use types::protocol::ProtocolName;
67
68pub(crate) mod peer_id;
69
70pub mod addresses;
71pub mod codec;
72pub mod config;
73pub mod crypto;
74pub mod error;
75pub mod executor;
76pub mod protocol;
77pub mod substream;
78pub mod transport;
79pub mod types;
80pub mod yamux;
81
82mod bandwidth;
83mod multistream_select;
84pub mod utils;
85
86#[cfg(test)]
87mod mock;
88
/// Public result type used by the crate.
///
/// Alias of [`std::result::Result`] with the error type fixed to [`error::Error`].
pub type Result<T> = std::result::Result<T, error::Error>;

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p";

/// Default channel size.
// NOTE(review): not referenced in the part of the file visible here; presumably used by
// other modules of the crate via `crate::DEFAULT_CHANNEL_SIZE` — confirm before removing.
const DEFAULT_CHANNEL_SIZE: usize = 4096usize;
97
/// Litep2p events.
///
/// Returned by [`Litep2p::next_event()`], which translates the events emitted by the
/// transport manager into this public type.
#[derive(Debug)]
pub enum Litep2pEvent {
    /// Connection established to peer.
    ///
    /// Forwarded from `TransportEvent::ConnectionEstablished`.
    ConnectionEstablished {
        /// Remote peer ID.
        peer: PeerId,

        /// Endpoint.
        endpoint: Endpoint,
    },

    /// Connection closed to remote peer.
    ///
    /// Forwarded from `TransportEvent::ConnectionClosed`.
    ConnectionClosed {
        /// Peer ID.
        peer: PeerId,

        /// Connection ID.
        connection_id: ConnectionId,
    },

    /// Failed to dial peer.
    ///
    /// This error can originate from dialing a single peer address.
    /// Forwarded from `TransportEvent::DialFailure`.
    DialFailure {
        /// Address of the peer.
        address: Multiaddr,

        /// Dial error.
        error: DialError,
    },

    /// A list of multiple dial failures.
    ///
    /// Emitted when the transport manager reports `TransportEvent::OpenFailure`.
    ListDialFailures {
        /// List of errors.
        ///
        /// Depending on the transport, the address might be different for each error.
        errors: Vec<(Multiaddr, DialError)>,
    },
}
138
/// [`Litep2p`] object.
///
/// Created with [`Litep2p::new()`] and driven by repeatedly calling
/// [`Litep2p::next_event()`].
pub struct Litep2p {
    /// Local peer ID.
    ///
    /// Derived from the public key of the keypair given in the configuration.
    local_peer_id: PeerId,

    /// Listen addresses.
    ///
    /// Collected from the enabled transports during construction; every address has the
    /// `/p2p/<local peer ID>` component appended.
    listen_addresses: Vec<Multiaddr>,

    /// Transport manager.
    transport_manager: TransportManager,

    /// Bandwidth sink.
    ///
    /// A clone of this sink is also handed to the transport manager.
    bandwidth_sink: BandwidthSink,
}
153
154impl Litep2p {
155    /// Create new [`Litep2p`].
156    pub fn new(mut litep2p_config: Litep2pConfig) -> crate::Result<Litep2p> {
157        let local_peer_id = PeerId::from_public_key(&litep2p_config.keypair.public().into());
158        let bandwidth_sink = BandwidthSink::new();
159        let mut listen_addresses = vec![];
160
161        let (resolver_config, resolver_opts) = if litep2p_config.use_system_dns_config {
162            hickory_resolver::system_conf::read_system_conf()
163                .map_err(Error::CannotReadSystemDnsConfig)?
164        } else {
165            (Default::default(), Default::default())
166        };
167        let resolver = Arc::new(
168            TokioResolver::builder_with_config(resolver_config, TokioConnectionProvider::default())
169                .with_options(resolver_opts)
170                .build(),
171        );
172
173        let supported_transports = Self::supported_transports(&litep2p_config);
174        let mut transport_manager = TransportManagerBuilder::new()
175            .with_keypair(litep2p_config.keypair.clone())
176            .with_supported_transports(supported_transports)
177            .with_bandwidth_sink(bandwidth_sink.clone())
178            .with_connection_limits_config(litep2p_config.connection_limits)
179            .build();
180
181        let transport_handle = transport_manager.transport_manager_handle();
182        // add known addresses to `TransportManager`, if any exist
183        if !litep2p_config.known_addresses.is_empty() {
184            for (peer, addresses) in litep2p_config.known_addresses {
185                transport_manager.add_known_address(peer, addresses.iter().cloned());
186            }
187        }
188
189        // start notification protocol event loops
190        for (protocol, config) in litep2p_config.notification_protocols.into_iter() {
191            tracing::debug!(
192                target: LOG_TARGET,
193                ?protocol,
194                "enable notification protocol",
195            );
196
197            let service = transport_manager.register_protocol(
198                protocol,
199                config.fallback_names.clone(),
200                config.codec,
201                litep2p_config.keep_alive_timeout,
202                SubstreamKeepAlive::Yes,
203            );
204            let executor = Arc::clone(&litep2p_config.executor);
205            litep2p_config.executor.run(Box::pin(async move {
206                NotificationProtocol::new(service, config, executor).run().await
207            }));
208        }
209
210        // start request-response protocol event loops
211        for (protocol, config) in litep2p_config.request_response_protocols.into_iter() {
212            tracing::debug!(
213                target: LOG_TARGET,
214                ?protocol,
215                "enable request-response protocol",
216            );
217
218            let service = transport_manager.register_protocol(
219                protocol,
220                config.fallback_names.clone(),
221                config.codec,
222                litep2p_config.keep_alive_timeout,
223                SubstreamKeepAlive::Yes,
224            );
225            litep2p_config.executor.run(Box::pin(async move {
226                RequestResponseProtocol::new(service, config).run().await
227            }));
228        }
229
230        // start user protocol event loops
231        for (protocol_name, protocol) in litep2p_config.user_protocols.into_iter() {
232            tracing::debug!(target: LOG_TARGET, protocol = ?protocol_name, "enable user protocol");
233
234            let service = transport_manager.register_protocol(
235                protocol_name,
236                Vec::new(),
237                protocol.codec(),
238                litep2p_config.keep_alive_timeout,
239                // TODO: make configurable by user.
240                SubstreamKeepAlive::Yes,
241            );
242            litep2p_config.executor.run(Box::pin(async move {
243                let _ = protocol.run(service).await;
244            }));
245        }
246
247        // start ping protocol event loop if enabled
248        if let Some(ping_config) = litep2p_config.ping.take() {
249            tracing::debug!(
250                target: LOG_TARGET,
251                protocol = ?ping_config.protocol,
252                "enable ipfs ping protocol",
253            );
254
255            let service = transport_manager.register_protocol(
256                ping_config.protocol.clone(),
257                Vec::new(),
258                ping_config.codec,
259                litep2p_config.keep_alive_timeout,
260                SubstreamKeepAlive::No,
261            );
262            litep2p_config.executor.run(Box::pin(async move {
263                Ping::new(service, ping_config).run().await
264            }));
265        }
266
267        // start kademlia protocol event loops
268        for kademlia_config in litep2p_config.kademlia.into_iter() {
269            tracing::debug!(
270                target: LOG_TARGET,
271                protocol_names = ?kademlia_config.protocol_names,
272                "enable ipfs kademlia protocol",
273            );
274
275            let main_protocol =
276                kademlia_config.protocol_names.first().expect("protocol name to exist");
277            let fallback_names = kademlia_config.protocol_names.iter().skip(1).cloned().collect();
278
279            let service = transport_manager.register_protocol(
280                main_protocol.clone(),
281                fallback_names,
282                kademlia_config.codec,
283                litep2p_config.keep_alive_timeout,
284                SubstreamKeepAlive::Yes,
285            );
286            litep2p_config.executor.run(Box::pin(async move {
287                let _ = Kademlia::new(service, kademlia_config).run().await;
288            }));
289        }
290
291        // start identify protocol event loop if enabled
292        let mut identify_info = match litep2p_config.identify.take() {
293            None => None,
294            Some(mut identify_config) => {
295                tracing::debug!(
296                    target: LOG_TARGET,
297                    protocol = ?identify_config.protocol,
298                    "enable ipfs identify protocol",
299                );
300
301                let service = transport_manager.register_protocol(
302                    identify_config.protocol.clone(),
303                    Vec::new(),
304                    identify_config.codec,
305                    litep2p_config.keep_alive_timeout,
306                    SubstreamKeepAlive::No,
307                );
308                identify_config.public = Some(litep2p_config.keypair.public().into());
309
310                Some((service, identify_config))
311            }
312        };
313
314        // start bitswap protocol event loop if enabled
315        if let Some(bitswap_config) = litep2p_config.bitswap.take() {
316            tracing::debug!(
317                target: LOG_TARGET,
318                protocol = ?bitswap_config.protocol,
319                "enable ipfs bitswap protocol",
320            );
321
322            let service = transport_manager.register_protocol(
323                bitswap_config.protocol.clone(),
324                Vec::new(),
325                bitswap_config.codec,
326                litep2p_config.keep_alive_timeout,
327                SubstreamKeepAlive::Yes,
328            );
329            litep2p_config.executor.run(Box::pin(async move {
330                Bitswap::new(service, bitswap_config).run().await
331            }));
332        }
333
334        // enable tcp transport if the config exists
335        if let Some(mut config) = litep2p_config.tcp.take() {
336            config.max_parallel_dials = litep2p_config.max_parallel_dials;
337            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
338            let (transport, transport_listen_addresses) =
339                <TcpTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
340
341            for address in transport_listen_addresses {
342                transport_manager.register_listen_address(address.clone());
343                listen_addresses.push(address.with(Protocol::P2p(local_peer_id.into())));
344            }
345
346            transport_manager.register_transport(SupportedTransport::Tcp, Box::new(transport));
347        }
348
349        // enable quic transport if the config exists
350        #[cfg(feature = "quic")]
351        if let Some(config) = litep2p_config.quic.take() {
352            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
353            let (transport, transport_listen_addresses) =
354                <QuicTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
355
356            for address in transport_listen_addresses {
357                transport_manager.register_listen_address(address.clone());
358                listen_addresses.push(address.with(Protocol::P2p(local_peer_id.into())));
359            }
360
361            transport_manager.register_transport(SupportedTransport::Quic, Box::new(transport));
362        }
363
364        // enable webrtc transport if the config exists
365        #[cfg(feature = "webrtc")]
366        if let Some(config) = litep2p_config.webrtc.take() {
367            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
368            let (transport, transport_listen_addresses) =
369                <WebRtcTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
370
371            for address in transport_listen_addresses {
372                transport_manager.register_listen_address(address.clone());
373                listen_addresses.push(address.with(Protocol::P2p(local_peer_id.into())));
374            }
375
376            transport_manager.register_transport(SupportedTransport::WebRtc, Box::new(transport));
377        }
378
379        // enable websocket transport if the config exists
380        #[cfg(feature = "websocket")]
381        if let Some(mut config) = litep2p_config.websocket.take() {
382            config.max_parallel_dials = litep2p_config.max_parallel_dials;
383            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
384            let (transport, transport_listen_addresses) =
385                <WebSocketTransport as TransportBuilder>::new(handle, config, resolver)?;
386
387            for address in transport_listen_addresses {
388                transport_manager.register_listen_address(address.clone());
389                listen_addresses.push(address.with(Protocol::P2p(local_peer_id.into())));
390            }
391
392            transport_manager
393                .register_transport(SupportedTransport::WebSocket, Box::new(transport));
394        }
395
396        // enable mdns if the config exists
397        if let Some(config) = litep2p_config.mdns.take() {
398            let mdns = Mdns::new(transport_handle, config, listen_addresses.clone());
399
400            litep2p_config.executor.run(Box::pin(async move {
401                let _ = mdns.start().await;
402            }));
403        }
404
405        // if identify was enabled, give it the enabled protocols and listen addresses and start it
406        if let Some((service, mut identify_config)) = identify_info.take() {
407            identify_config.protocols = transport_manager.protocols().cloned().collect();
408            let identify = Identify::new(service, identify_config);
409
410            litep2p_config.executor.run(Box::pin(async move {
411                let _ = identify.run().await;
412            }));
413        }
414
415        if transport_manager.installed_transports().count() == 0 {
416            return Err(Error::Other("No transport specified".to_string()));
417        }
418
419        // verify that at least one transport is specified
420        if listen_addresses.is_empty() {
421            tracing::warn!(
422                target: LOG_TARGET,
423                "litep2p started with no listen addresses, cannot accept inbound connections",
424            );
425        }
426
427        Ok(Self {
428            local_peer_id,
429            bandwidth_sink,
430            listen_addresses,
431            transport_manager,
432        })
433    }
434
435    /// Collect supported transports before initializing the transports themselves.
436    ///
437    /// Information of the supported transports is needed to initialize protocols but
438    /// information about protocols must be known to initialize transports so the initialization
439    /// has to be split.
440    fn supported_transports(config: &Litep2pConfig) -> HashSet<SupportedTransport> {
441        let mut supported_transports = HashSet::new();
442
443        config
444            .tcp
445            .is_some()
446            .then(|| supported_transports.insert(SupportedTransport::Tcp));
447        #[cfg(feature = "quic")]
448        config
449            .quic
450            .is_some()
451            .then(|| supported_transports.insert(SupportedTransport::Quic));
452        #[cfg(feature = "websocket")]
453        config
454            .websocket
455            .is_some()
456            .then(|| supported_transports.insert(SupportedTransport::WebSocket));
457        #[cfg(feature = "webrtc")]
458        config
459            .webrtc
460            .is_some()
461            .then(|| supported_transports.insert(SupportedTransport::WebRtc));
462
463        supported_transports
464    }
465
466    /// Get local peer ID.
467    pub fn local_peer_id(&self) -> &PeerId {
468        &self.local_peer_id
469    }
470
471    /// Get the list of public addresses of the node.
472    pub fn public_addresses(&self) -> PublicAddresses {
473        self.transport_manager.public_addresses()
474    }
475
476    /// Get the list of listen addresses of the node.
477    pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
478        self.listen_addresses.iter()
479    }
480
481    /// Get handle to bandwidth sink.
482    pub fn bandwidth_sink(&self) -> BandwidthSink {
483        self.bandwidth_sink.clone()
484    }
485
486    /// Dial peer.
487    pub async fn dial(&mut self, peer: &PeerId) -> crate::Result<()> {
488        self.transport_manager.dial(*peer).await
489    }
490
491    /// Dial address.
492    pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
493        self.transport_manager.dial_address(address).await
494    }
495
496    /// Add one ore more known addresses for peer.
497    ///
498    /// Return value denotes how many addresses were added for the peer.
499    /// Addresses belonging to disabled/unsupported transports will be ignored.
500    pub fn add_known_address(
501        &mut self,
502        peer: PeerId,
503        address: impl Iterator<Item = Multiaddr>,
504    ) -> usize {
505        self.transport_manager.add_known_address(peer, address)
506    }
507
508    /// Poll next event.
509    ///
510    /// This function must be called in order for litep2p to make progress.
511    pub async fn next_event(&mut self) -> Option<Litep2pEvent> {
512        loop {
513            match self.transport_manager.next().await? {
514                TransportEvent::ConnectionEstablished { peer, endpoint, .. } =>
515                    return Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }),
516                TransportEvent::ConnectionClosed {
517                    peer,
518                    connection_id,
519                } =>
520                    return Some(Litep2pEvent::ConnectionClosed {
521                        peer,
522                        connection_id,
523                    }),
524                TransportEvent::DialFailure { address, error, .. } =>
525                    return Some(Litep2pEvent::DialFailure { address, error }),
526
527                TransportEvent::OpenFailure { errors, .. } => {
528                    return Some(Litep2pEvent::ListDialFailures { errors });
529                }
530                _ => {}
531            }
532        }
533    }
534}
535
#[cfg(test)]
mod tests {
    use crate::{
        config::ConfigBuilder,
        protocol::{libp2p::ping, notification::Config as NotificationConfig},
        types::protocol::ProtocolName,
        Litep2p, Litep2pEvent, PeerId,
    };
    use multiaddr::{Multiaddr, Protocol};
    use std::net::Ipv4Addr;

    /// Install the tracing subscriber; failure (e.g. already installed) is ignored.
    fn init_tracing() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();
    }

    /// Create the notification protocol config shared by all tests for the given name.
    ///
    /// Expands to the `(config, handle)` tuple returned by `NotificationConfig::new()`.
    macro_rules! notification_config {
        ($name:expr) => {
            NotificationConfig::new(
                ProtocolName::from($name),
                1337usize,
                vec![1, 2, 3, 4],
                Vec::new(),
                false,
                64,
                64,
                true,
            )
        };
    }

    /// A node with TCP, two notification protocols and ping can be created.
    #[tokio::test]
    async fn initialize_litep2p() {
        init_tracing();

        let (config1, _service1) = notification_config!("/notificaton/1");
        let (config2, _service2) = notification_config!("/notificaton/2");
        let (ping_config, _ping_event_stream) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_tcp(Default::default())
            .with_notification_protocol(config1)
            .with_notification_protocol(config2)
            .with_libp2p_ping(ping_config)
            .build();

        let _litep2p = Litep2p::new(config).unwrap();
    }

    /// Creating a node without any transport must fail.
    #[tokio::test]
    async fn no_transport_given() {
        init_tracing();

        let (config1, _service1) = notification_config!("/notificaton/1");
        let (config2, _service2) = notification_config!("/notificaton/2");
        let (ping_config, _ping_event_stream) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_notification_protocol(config1)
            .with_notification_protocol(config2)
            .with_libp2p_ping(ping_config)
            .build();

        assert!(Litep2p::new(config).is_err());
    }

    /// Dialing the same address twice must report the dial failure only once.
    #[tokio::test]
    async fn dial_same_address_twice() {
        init_tracing();

        let (config1, _service1) = notification_config!("/notificaton/1");
        let (config2, _service2) = notification_config!("/notificaton/2");
        let (ping_config, _ping_event_stream) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_tcp(Default::default())
            .with_notification_protocol(config1)
            .with_notification_protocol(config2)
            .with_libp2p_ping(ping_config)
            .build();

        let peer = PeerId::random();
        let address = Multiaddr::empty()
            .with(Protocol::Ip4(Ipv4Addr::new(255, 254, 253, 252)))
            .with(Protocol::Tcp(8888))
            .with(Protocol::P2p(peer.into()));

        let mut litep2p = Litep2p::new(config).unwrap();
        for _ in 0..2 {
            litep2p.dial_address(address.clone()).await.unwrap();
        }

        let first = litep2p.next_event().await;
        assert!(
            matches!(first, Some(Litep2pEvent::DialFailure { .. })),
            "invalid event received"
        );

        // verify that the second same dial was ignored and the dial failure is reported only once
        let second =
            tokio::time::timeout(std::time::Duration::from_secs(20), litep2p.next_event()).await;
        assert!(second.is_err(), "invalid event received");
    }
}
678}