litep2p/
lib.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21#![allow(clippy::single_match)]
22#![allow(clippy::result_large_err)]
23#![allow(clippy::large_enum_variant)]
24#![allow(clippy::redundant_pattern_matching)]
25#![allow(clippy::type_complexity)]
26#![allow(clippy::result_unit_err)]
27#![allow(clippy::should_implement_trait)]
28#![allow(clippy::too_many_arguments)]
29#![allow(clippy::assign_op_pattern)]
30#![allow(clippy::match_like_matches_macro)]
31
32use crate::{
33    addresses::PublicAddresses,
34    config::Litep2pConfig,
35    error::DialError,
36    protocol::{
37        libp2p::{bitswap::Bitswap, identify::Identify, kademlia::Kademlia, ping::Ping},
38        mdns::Mdns,
39        notification::NotificationProtocol,
40        request_response::RequestResponseProtocol,
41    },
42    transport::{
43        manager::{SupportedTransport, TransportManager, TransportManagerBuilder},
44        tcp::TcpTransport,
45        TransportBuilder, TransportEvent,
46    },
47};
48
49#[cfg(feature = "quic")]
50use crate::transport::quic::QuicTransport;
51#[cfg(feature = "webrtc")]
52use crate::transport::webrtc::WebRtcTransport;
53#[cfg(feature = "websocket")]
54use crate::transport::websocket::WebSocketTransport;
55
56use hickory_resolver::{name_server::TokioConnectionProvider, TokioResolver};
57use multiaddr::{Multiaddr, Protocol};
58use transport::Endpoint;
59use types::ConnectionId;
60
61pub use bandwidth::BandwidthSink;
62pub use error::Error;
63pub use peer_id::PeerId;
64use std::{collections::HashSet, sync::Arc};
65pub use types::protocol::ProtocolName;
66
67pub(crate) mod peer_id;
68
69pub mod addresses;
70pub mod codec;
71pub mod config;
72pub mod crypto;
73pub mod error;
74pub mod executor;
75pub mod protocol;
76pub mod substream;
77pub mod transport;
78pub mod types;
79pub mod yamux;
80
81mod bandwidth;
82mod multistream_select;
83pub mod utils;
84
85#[cfg(test)]
86mod mock;
87
88/// Public result type used by the crate: [`std::result::Result`] with the error fixed to [`error::Error`].
89pub type Result<T> = std::result::Result<T, error::Error>;
90
91/// Logging target passed to the `tracing` macros in this file.
92const LOG_TARGET: &str = "litep2p";
93
94/// Default channel size; not referenced in this file, presumably used by other modules (verify).
95const DEFAULT_CHANNEL_SIZE: usize = 4096usize;
96
97/// Events returned by [`Litep2p::next_event()`], each translated from a `TransportEvent`.
98#[derive(Debug)]
99pub enum Litep2pEvent {
100    /// Connection established to peer (from `TransportEvent::ConnectionEstablished`).
101    ConnectionEstablished {
102        /// Remote peer ID.
103        peer: PeerId,
104
105        /// Endpoint reported by the transport event for this connection.
106        endpoint: Endpoint,
107    },
108
109    /// Connection closed to remote peer (from `TransportEvent::ConnectionClosed`).
110    ConnectionClosed {
111        /// Remote peer ID.
112        peer: PeerId,
113
114        /// Connection ID reported by the transport event.
115        connection_id: ConnectionId,
116    },
117
118    /// Failed to dial peer (from `TransportEvent::DialFailure`).
119    ///
120    /// This error can originate from dialing a single peer address.
121    DialFailure {
122        /// Address of the peer.
123        address: Multiaddr,
124
125        /// Dial error.
126        error: DialError,
127    },
128
129    /// A list of multiple dial failures (from `TransportEvent::OpenFailure`).
130    ListDialFailures {
131        /// List of `(address, error)` pairs.
132        ///
133        /// Depending on the transport, the address might be different for each error.
134        errors: Vec<(Multiaddr, DialError)>,
135    },
136}
137
138/// [`Litep2p`] object: owns the transport manager and exposes dialing and event polling.
139pub struct Litep2p {
140    /// Local peer ID, derived in [`Litep2p::new()`] from the configured keypair's public key.
141    local_peer_id: PeerId,
142
143    /// Listen addresses reported by the enabled transports, each suffixed with the local peer ID.
144    listen_addresses: Vec<Multiaddr>,
145
146    /// Transport manager; polled by [`Litep2p::next_event()`] and used for dialing.
147    transport_manager: TransportManager,
148
149    /// Bandwidth sink; a clone of it is handed to the transport manager on construction.
150    bandwidth_sink: BandwidthSink,
151}
152
153impl Litep2p {
154    /// Create new [`Litep2p`] from `litep2p_config`.
    ///
    /// Builds the DNS resolver and the [`TransportManager`], registers every configured protocol
    /// and hands its event loop to the configured executor, then constructs the enabled
    /// transports and records their listen addresses.
    ///
    /// Protocol event loops are handed to the executor before the transports are constructed,
    /// so they have already been submitted if a later step returns an error.
    ///
    /// # Errors
    ///
    /// - [`Error::CannotReadSystemDnsConfig`] if `use_system_dns_config` is set and the system
    ///   DNS configuration cannot be read.
    /// - Any error returned while constructing an enabled transport.
    /// - [`Error::Other`] if no transport ended up installed in the transport manager.
    ///
    /// # Panics
    ///
    /// Panics if a Kademlia configuration has an empty `protocol_names` list.
155    pub fn new(mut litep2p_config: Litep2pConfig) -> crate::Result<Litep2p> {
156        let local_peer_id = PeerId::from_public_key(&litep2p_config.keypair.public().into());
157        let bandwidth_sink = BandwidthSink::new();
158        let mut listen_addresses = vec![];
159
        // use the system DNS configuration if requested, otherwise the resolver defaults
160        let (resolver_config, resolver_opts) = if litep2p_config.use_system_dns_config {
161            hickory_resolver::system_conf::read_system_conf()
162                .map_err(Error::CannotReadSystemDnsConfig)?
163        } else {
164            (Default::default(), Default::default())
165        };
        // the resolver is shared between all transports constructed below
166        let resolver = Arc::new(
167            TokioResolver::builder_with_config(resolver_config, TokioConnectionProvider::default())
168                .with_options(resolver_opts)
169                .build(),
170        );
171
        // the set of supported transports is derived from the config alone because the
        // transports themselves are only constructed after the protocols have been registered
172        let supported_transports = Self::supported_transports(&litep2p_config);
173        let mut transport_manager = TransportManagerBuilder::new()
174            .with_keypair(litep2p_config.keypair.clone())
175            .with_supported_transports(supported_transports)
176            .with_bandwidth_sink(bandwidth_sink.clone())
177            .with_max_parallel_dials(litep2p_config.max_parallel_dials)
178            .with_connection_limits_config(litep2p_config.connection_limits)
179            .build();
180
        // handle is consumed by mDNS further below, if mDNS is enabled
181        let transport_handle = transport_manager.transport_manager_handle();
182        // add known addresses to `TransportManager`, if any exist
183        if !litep2p_config.known_addresses.is_empty() {
184            for (peer, addresses) in litep2p_config.known_addresses {
185                transport_manager.add_known_address(peer, addresses.iter().cloned());
186            }
187        }
188
189        // start notification protocol event loops
190        for (protocol, config) in litep2p_config.notification_protocols.into_iter() {
191            tracing::debug!(
192                target: LOG_TARGET,
193                ?protocol,
194                "enable notification protocol",
195            );
196
197            let service = transport_manager.register_protocol(
198                protocol,
199                config.fallback_names.clone(),
200                config.codec,
201                litep2p_config.keep_alive_timeout,
202            );
            // the notification protocol receives its own reference to the executor
203            let executor = Arc::clone(&litep2p_config.executor);
204            litep2p_config.executor.run(Box::pin(async move {
205                NotificationProtocol::new(service, config, executor).run().await
206            }));
207        }
208
209        // start request-response protocol event loops
210        for (protocol, config) in litep2p_config.request_response_protocols.into_iter() {
211            tracing::debug!(
212                target: LOG_TARGET,
213                ?protocol,
214                "enable request-response protocol",
215            );
216
217            let service = transport_manager.register_protocol(
218                protocol,
219                config.fallback_names.clone(),
220                config.codec,
221                litep2p_config.keep_alive_timeout,
222            );
223            litep2p_config.executor.run(Box::pin(async move {
224                RequestResponseProtocol::new(service, config).run().await
225            }));
226        }
227
228        // start user protocol event loops
229        for (protocol_name, protocol) in litep2p_config.user_protocols.into_iter() {
230            tracing::debug!(target: LOG_TARGET, protocol = ?protocol_name, "enable user protocol");
231
232            let service = transport_manager.register_protocol(
233                protocol_name,
234                Vec::new(),
235                protocol.codec(),
236                litep2p_config.keep_alive_timeout,
237            );
238            litep2p_config.executor.run(Box::pin(async move {
                // the result of the user protocol's event loop is discarded
239                let _ = protocol.run(service).await;
240            }));
241        }
242
243        // start ping protocol event loop if enabled
244        if let Some(ping_config) = litep2p_config.ping.take() {
245            tracing::debug!(
246                target: LOG_TARGET,
247                protocol = ?ping_config.protocol,
248                "enable ipfs ping protocol",
249            );
250
251            let service = transport_manager.register_protocol(
252                ping_config.protocol.clone(),
253                Vec::new(),
254                ping_config.codec,
255                litep2p_config.keep_alive_timeout,
256            );
257            litep2p_config.executor.run(Box::pin(async move {
258                Ping::new(service, ping_config).run().await
259            }));
260        }
261
262        // start kademlia protocol event loops
263        for kademlia_config in litep2p_config.kademlia.into_iter() {
264            tracing::debug!(
265                target: LOG_TARGET,
266                protocol_names = ?kademlia_config.protocol_names,
267                "enable ipfs kademlia protocol",
268            );
269
            // the first protocol name is registered as the main name, the rest as fallbacks
270            let main_protocol =
271                kademlia_config.protocol_names.first().expect("protocol name to exist");
272            let fallback_names = kademlia_config.protocol_names.iter().skip(1).cloned().collect();
273
274            let service = transport_manager.register_protocol(
275                main_protocol.clone(),
276                fallback_names,
277                kademlia_config.codec,
278                litep2p_config.keep_alive_timeout,
279            );
280            litep2p_config.executor.run(Box::pin(async move {
281                let _ = Kademlia::new(service, kademlia_config).run().await;
282            }));
283        }
284
285        // register identify protocol if enabled; its event loop is started further below
286        let mut identify_info = match litep2p_config.identify.take() {
287            None => None,
288            Some(mut identify_config) => {
289                tracing::debug!(
290                    target: LOG_TARGET,
291                    protocol = ?identify_config.protocol,
292                    "enable ipfs identify protocol",
293                );
294
295                let service = transport_manager.register_protocol(
296                    identify_config.protocol.clone(),
297                    Vec::new(),
298                    identify_config.codec,
299                    litep2p_config.keep_alive_timeout,
300                );
301                identify_config.public = Some(litep2p_config.keypair.public().into());
302
303                Some((service, identify_config))
304            }
305        };
306
307        // start bitswap protocol event loop if enabled
308        if let Some(bitswap_config) = litep2p_config.bitswap.take() {
309            tracing::debug!(
310                target: LOG_TARGET,
311                protocol = ?bitswap_config.protocol,
312                "enable ipfs bitswap protocol",
313            );
314
315            let service = transport_manager.register_protocol(
316                bitswap_config.protocol.clone(),
317                Vec::new(),
318                bitswap_config.codec,
319                litep2p_config.keep_alive_timeout,
320            );
321            litep2p_config.executor.run(Box::pin(async move {
322                Bitswap::new(service, bitswap_config).run().await
323            }));
324        }
325
326        // enable tcp transport if the config exists
327        if let Some(config) = litep2p_config.tcp.take() {
328            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
329            let (transport, transport_listen_addresses) =
330                <TcpTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
331
            // the manager gets the bare address, the public list gets it with the local peer ID
332            for address in transport_listen_addresses {
333                transport_manager.register_listen_address(address.clone());
334                listen_addresses.push(address.with(Protocol::P2p(*local_peer_id.as_ref())));
335            }
336
337            transport_manager.register_transport(SupportedTransport::Tcp, Box::new(transport));
338        }
339
340        // enable quic transport if the config exists
341        #[cfg(feature = "quic")]
342        if let Some(config) = litep2p_config.quic.take() {
343            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
344            let (transport, transport_listen_addresses) =
345                <QuicTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
346
347            for address in transport_listen_addresses {
348                transport_manager.register_listen_address(address.clone());
349                listen_addresses.push(address.with(Protocol::P2p(*local_peer_id.as_ref())));
350            }
351
352            transport_manager.register_transport(SupportedTransport::Quic, Box::new(transport));
353        }
354
355        // enable webrtc transport if the config exists
356        #[cfg(feature = "webrtc")]
357        if let Some(config) = litep2p_config.webrtc.take() {
358            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
359            let (transport, transport_listen_addresses) =
360                <WebRtcTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
361
362            for address in transport_listen_addresses {
363                transport_manager.register_listen_address(address.clone());
364                listen_addresses.push(address.with(Protocol::P2p(*local_peer_id.as_ref())));
365            }
366
367            transport_manager.register_transport(SupportedTransport::WebRtc, Box::new(transport));
368        }
369
370        // enable websocket transport if the config exists
371        #[cfg(feature = "websocket")]
372        if let Some(config) = litep2p_config.websocket.take() {
373            let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
            // last transport to be constructed, so the resolver is moved instead of cloned
374            let (transport, transport_listen_addresses) =
375                <WebSocketTransport as TransportBuilder>::new(handle, config, resolver)?;
376
377            for address in transport_listen_addresses {
378                transport_manager.register_listen_address(address.clone());
379                listen_addresses.push(address.with(Protocol::P2p(*local_peer_id.as_ref())));
380            }
381
382            transport_manager
383                .register_transport(SupportedTransport::WebSocket, Box::new(transport));
384        }
385
386        // enable mdns if the config exists; it receives a copy of the listen addresses collected above
387        if let Some(config) = litep2p_config.mdns.take() {
388            let mdns = Mdns::new(transport_handle, config, listen_addresses.clone());
389
390            litep2p_config.executor.run(Box::pin(async move {
391                let _ = mdns.start().await;
392            }));
393        }
394
395        // if identify was enabled, give it the protocols known to the transport manager and start it
396        if let Some((service, mut identify_config)) = identify_info.take() {
397            identify_config.protocols = transport_manager.protocols().cloned().collect();
398            let identify = Identify::new(service, identify_config);
399
400            litep2p_config.executor.run(Box::pin(async move {
401                let _ = identify.run().await;
402            }));
403        }
404
        // verify that at least one transport was installed
405        if transport_manager.installed_transports().count() == 0 {
406            return Err(Error::Other("No transport specified".to_string()));
407        }
408
409        // having no listen addresses is not an error but it is worth a warning
410        if listen_addresses.is_empty() {
411            tracing::warn!(
412                target: LOG_TARGET,
413                "litep2p started with no listen addresses, cannot accept inbound connections",
414            );
415        }
416
417        Ok(Self {
418            local_peer_id,
419            bandwidth_sink,
420            listen_addresses,
421            transport_manager,
422        })
423    }
424
425    /// Collect supported transports before initializing the transports themselves.
426    ///
427    /// Information of the supported transports is needed to initialize protocols but
428    /// information about protocols must be known to initialize transports so the initialization
429    /// has to be split.
430    fn supported_transports(config: &Litep2pConfig) -> HashSet<SupportedTransport> {
431        let mut supported_transports = HashSet::new();
432
433        config
434            .tcp
435            .is_some()
436            .then(|| supported_transports.insert(SupportedTransport::Tcp));
437        #[cfg(feature = "quic")]
438        config
439            .quic
440            .is_some()
441            .then(|| supported_transports.insert(SupportedTransport::Quic));
442        #[cfg(feature = "websocket")]
443        config
444            .websocket
445            .is_some()
446            .then(|| supported_transports.insert(SupportedTransport::WebSocket));
447        #[cfg(feature = "webrtc")]
448        config
449            .webrtc
450            .is_some()
451            .then(|| supported_transports.insert(SupportedTransport::WebRtc));
452
453        supported_transports
454    }
455
456    /// Get local peer ID.
457    pub fn local_peer_id(&self) -> &PeerId {
458        &self.local_peer_id
459    }
460
461    /// Get the list of public addresses of the node.
462    pub fn public_addresses(&self) -> PublicAddresses {
463        self.transport_manager.public_addresses()
464    }
465
466    /// Get the list of listen addresses of the node.
467    pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
468        self.listen_addresses.iter()
469    }
470
471    /// Get handle to bandwidth sink.
472    pub fn bandwidth_sink(&self) -> BandwidthSink {
473        self.bandwidth_sink.clone()
474    }
475
476    /// Dial peer.
477    pub async fn dial(&mut self, peer: &PeerId) -> crate::Result<()> {
478        self.transport_manager.dial(*peer).await
479    }
480
481    /// Dial address.
482    pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
483        self.transport_manager.dial_address(address).await
484    }
485
486    /// Add one ore more known addresses for peer.
487    ///
488    /// Return value denotes how many addresses were added for the peer.
489    /// Addresses belonging to disabled/unsupported transports will be ignored.
490    pub fn add_known_address(
491        &mut self,
492        peer: PeerId,
493        address: impl Iterator<Item = Multiaddr>,
494    ) -> usize {
495        self.transport_manager.add_known_address(peer, address)
496    }
497
498    /// Poll next event.
499    ///
500    /// This function must be called in order for litep2p to make progress.
501    pub async fn next_event(&mut self) -> Option<Litep2pEvent> {
502        loop {
503            match self.transport_manager.next().await? {
504                TransportEvent::ConnectionEstablished { peer, endpoint, .. } =>
505                    return Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }),
506                TransportEvent::ConnectionClosed {
507                    peer,
508                    connection_id,
509                } =>
510                    return Some(Litep2pEvent::ConnectionClosed {
511                        peer,
512                        connection_id,
513                    }),
514                TransportEvent::DialFailure { address, error, .. } =>
515                    return Some(Litep2pEvent::DialFailure { address, error }),
516
517                TransportEvent::OpenFailure { errors, .. } => {
518                    return Some(Litep2pEvent::ListDialFailures { errors });
519                }
520                _ => {}
521            }
522        }
523    }
524}
525
#[cfg(test)]
mod tests {
    use crate::{
        config::ConfigBuilder,
        protocol::{libp2p::ping, notification::Config as NotificationConfig},
        types::protocol::ProtocolName,
        Litep2p, Litep2pEvent, PeerId,
    };
    use multiaddr::{Multiaddr, Protocol};
    use multihash::Multihash;
    use std::{net::Ipv4Addr, time::Duration};

    /// Install the `tracing` subscriber with a filter read from the environment.
    ///
    /// The result of `try_init()` is ignored on purpose, exactly as each test did inline.
    fn init_tracing() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();
    }

    /// Create the notification protocol configuration shared by all tests for protocol `name`.
    ///
    /// The second tuple element is whatever `NotificationConfig::new()` returns alongside the
    /// configuration; callers bind it so that it stays alive until the end of the test.
    fn notification_config(name: &'static str) -> (NotificationConfig, impl Sized) {
        NotificationConfig::new(
            ProtocolName::from(name),
            1337usize,
            vec![1, 2, 3, 4],
            Vec::new(),
            false,
            64,
            64,
            true,
        )
    }

    /// `Litep2p` can be constructed with TCP, two notification protocols and ping enabled.
    #[tokio::test]
    async fn initialize_litep2p() {
        init_tracing();

        let (config1, _service1) = notification_config("/notificaton/1");
        let (config2, _service2) = notification_config("/notificaton/2");
        let (ping_config, _ping_event_stream) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_tcp(Default::default())
            .with_notification_protocol(config1)
            .with_notification_protocol(config2)
            .with_libp2p_ping(ping_config)
            .build();

        let _litep2p = Litep2p::new(config).unwrap();
    }

    /// Construction fails when the configuration enables no transport.
    #[tokio::test]
    async fn no_transport_given() {
        init_tracing();

        let (config1, _service1) = notification_config("/notificaton/1");
        let (config2, _service2) = notification_config("/notificaton/2");
        let (ping_config, _ping_event_stream) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_notification_protocol(config1)
            .with_notification_protocol(config2)
            .with_libp2p_ping(ping_config)
            .build();

        assert!(Litep2p::new(config).is_err());
    }

    /// Dialing the same address twice reports a single dial failure.
    #[tokio::test]
    async fn dial_same_address_twice() {
        init_tracing();

        let (config1, _service1) = notification_config("/notificaton/1");
        let (config2, _service2) = notification_config("/notificaton/2");
        let (ping_config, _ping_event_stream) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_tcp(Default::default())
            .with_notification_protocol(config1)
            .with_notification_protocol(config2)
            .with_libp2p_ping(ping_config)
            .build();

        let peer = PeerId::random();
        let peer_multihash = Multihash::from_bytes(&peer.to_bytes()).unwrap();
        let address = Multiaddr::empty()
            .with(Protocol::Ip4(Ipv4Addr::new(255, 254, 253, 252)))
            .with(Protocol::Tcp(8888))
            .with(Protocol::P2p(peer_multihash));

        let mut litep2p = Litep2p::new(config).unwrap();
        for _ in 0..2 {
            litep2p.dial_address(address.clone()).await.unwrap();
        }

        assert!(
            matches!(
                litep2p.next_event().await,
                Some(Litep2pEvent::DialFailure { .. })
            ),
            "invalid event received",
        );

        // verify that the second same dial was ignored and the dial failure is reported only once
        let second_event = tokio::time::timeout(Duration::from_secs(20), litep2p.next_event()).await;
        assert!(second_event.is_err(), "invalid event received");
    }
}