1#![allow(clippy::single_match)]
22#![allow(clippy::result_large_err)]
23#![allow(clippy::large_enum_variant)]
24#![allow(clippy::redundant_pattern_matching)]
25#![allow(clippy::type_complexity)]
26#![allow(clippy::result_unit_err)]
27#![allow(clippy::should_implement_trait)]
28#![allow(clippy::too_many_arguments)]
29#![allow(clippy::assign_op_pattern)]
30#![allow(clippy::match_like_matches_macro)]
31
32use crate::{
33 addresses::PublicAddresses,
34 config::Litep2pConfig,
35 error::DialError,
36 protocol::{
37 libp2p::{bitswap::Bitswap, identify::Identify, kademlia::Kademlia, ping::Ping},
38 mdns::Mdns,
39 notification::NotificationProtocol,
40 request_response::RequestResponseProtocol,
41 SubstreamKeepAlive,
42 },
43 transport::{
44 manager::{SupportedTransport, TransportManager, TransportManagerBuilder},
45 tcp::TcpTransport,
46 TransportBuilder, TransportEvent,
47 },
48};
49
50#[cfg(feature = "quic")]
51use crate::transport::quic::QuicTransport;
52#[cfg(feature = "webrtc")]
53use crate::transport::webrtc::WebRtcTransport;
54#[cfg(feature = "websocket")]
55use crate::transport::websocket::WebSocketTransport;
56
57use hickory_resolver::{name_server::TokioConnectionProvider, TokioResolver};
58use multiaddr::{Multiaddr, Protocol};
59use transport::Endpoint;
60use types::ConnectionId;
61
62pub use bandwidth::BandwidthSink;
63pub use error::Error;
64pub use peer_id::{ParseError, PeerId};
65use std::{collections::HashSet, sync::Arc};
66pub use types::protocol::ProtocolName;
67
68pub(crate) mod peer_id;
69
70pub mod addresses;
71pub mod codec;
72pub mod config;
73pub mod crypto;
74pub mod error;
75pub mod executor;
76pub mod protocol;
77pub mod substream;
78pub mod transport;
79pub mod types;
80pub mod yamux;
81
82mod bandwidth;
83mod multistream_select;
84pub mod utils;
85
86#[cfg(test)]
87mod mock;
88
/// Crate-wide result alias: a [`std::result::Result`] whose error type is fixed to
/// [`error::Error`].
pub type Result<T> = std::result::Result<T, error::Error>;
91
/// Logging target passed to the `tracing` macros in this file.
const LOG_TARGET: &str = "litep2p";

/// Default channel size.
///
/// Not referenced in this part of the file; presumably consumed by other modules of the crate.
const DEFAULT_CHANNEL_SIZE: usize = 4096;
97
/// Events returned to the user by [`Litep2p::next_event()`].
#[derive(Debug)]
pub enum Litep2pEvent {
    /// A connection to a remote peer was established.
    ConnectionEstablished {
        /// ID of the remote peer.
        peer: PeerId,

        /// Endpoint of the established connection.
        endpoint: Endpoint,
    },

    /// A connection to a remote peer was closed.
    ConnectionClosed {
        /// ID of the remote peer.
        peer: PeerId,

        /// ID of the connection that was closed.
        connection_id: ConnectionId,
    },

    /// Dialing a single address failed.
    DialFailure {
        /// Address that was dialed.
        address: Multiaddr,

        /// Reason the dial failed.
        error: DialError,
    },

    /// Several dial attempts failed.
    ///
    /// Produced by [`Litep2p::next_event()`] from `TransportEvent::OpenFailure`.
    ListDialFailures {
        /// The failures, each paired with the address it was reported for.
        errors: Vec<(Multiaddr, DialError)>,
    },
}
138
/// Top-level [`Litep2p`] object; created with [`Litep2p::new()`] and driven by polling
/// [`Litep2p::next_event()`].
pub struct Litep2p {
    /// Local peer ID, derived from the public key of the configured keypair.
    local_peer_id: PeerId,

    /// Listen addresses reported by the enabled transports, each suffixed with the local
    /// `/p2p/<peer id>` component.
    listen_addresses: Vec<Multiaddr>,

    /// Transport manager that owns the transports and produces transport events.
    transport_manager: TransportManager,

    /// Bandwidth sink shared with the transport manager.
    bandwidth_sink: BandwidthSink,
}
153
154impl Litep2p {
155 pub fn new(mut litep2p_config: Litep2pConfig) -> crate::Result<Litep2p> {
157 let local_peer_id = PeerId::from_public_key(&litep2p_config.keypair.public().into());
158 let bandwidth_sink = BandwidthSink::new();
159 let mut listen_addresses = vec![];
160
161 let (resolver_config, resolver_opts) = if litep2p_config.use_system_dns_config {
162 hickory_resolver::system_conf::read_system_conf()
163 .map_err(Error::CannotReadSystemDnsConfig)?
164 } else {
165 (Default::default(), Default::default())
166 };
167 let resolver = Arc::new(
168 TokioResolver::builder_with_config(resolver_config, TokioConnectionProvider::default())
169 .with_options(resolver_opts)
170 .build(),
171 );
172
173 let supported_transports = Self::supported_transports(&litep2p_config);
174 let mut transport_manager = TransportManagerBuilder::new()
175 .with_keypair(litep2p_config.keypair.clone())
176 .with_supported_transports(supported_transports)
177 .with_bandwidth_sink(bandwidth_sink.clone())
178 .with_connection_limits_config(litep2p_config.connection_limits)
179 .build();
180
181 let transport_handle = transport_manager.transport_manager_handle();
182 if !litep2p_config.known_addresses.is_empty() {
184 for (peer, addresses) in litep2p_config.known_addresses {
185 transport_manager.add_known_address(peer, addresses.iter().cloned());
186 }
187 }
188
189 for (protocol, config) in litep2p_config.notification_protocols.into_iter() {
191 tracing::debug!(
192 target: LOG_TARGET,
193 ?protocol,
194 "enable notification protocol",
195 );
196
197 let service = transport_manager.register_protocol(
198 protocol,
199 config.fallback_names.clone(),
200 config.codec,
201 litep2p_config.keep_alive_timeout,
202 SubstreamKeepAlive::Yes,
203 );
204 let executor = Arc::clone(&litep2p_config.executor);
205 litep2p_config.executor.run(Box::pin(async move {
206 NotificationProtocol::new(service, config, executor).run().await
207 }));
208 }
209
210 for (protocol, config) in litep2p_config.request_response_protocols.into_iter() {
212 tracing::debug!(
213 target: LOG_TARGET,
214 ?protocol,
215 "enable request-response protocol",
216 );
217
218 let service = transport_manager.register_protocol(
219 protocol,
220 config.fallback_names.clone(),
221 config.codec,
222 litep2p_config.keep_alive_timeout,
223 SubstreamKeepAlive::Yes,
224 );
225 litep2p_config.executor.run(Box::pin(async move {
226 RequestResponseProtocol::new(service, config).run().await
227 }));
228 }
229
230 for (protocol_name, protocol) in litep2p_config.user_protocols.into_iter() {
232 tracing::debug!(target: LOG_TARGET, protocol = ?protocol_name, "enable user protocol");
233
234 let service = transport_manager.register_protocol(
235 protocol_name,
236 Vec::new(),
237 protocol.codec(),
238 litep2p_config.keep_alive_timeout,
239 SubstreamKeepAlive::Yes,
241 );
242 litep2p_config.executor.run(Box::pin(async move {
243 let _ = protocol.run(service).await;
244 }));
245 }
246
247 if let Some(ping_config) = litep2p_config.ping.take() {
249 tracing::debug!(
250 target: LOG_TARGET,
251 protocol = ?ping_config.protocol,
252 "enable ipfs ping protocol",
253 );
254
255 let service = transport_manager.register_protocol(
256 ping_config.protocol.clone(),
257 Vec::new(),
258 ping_config.codec,
259 litep2p_config.keep_alive_timeout,
260 SubstreamKeepAlive::No,
261 );
262 litep2p_config.executor.run(Box::pin(async move {
263 Ping::new(service, ping_config).run().await
264 }));
265 }
266
267 for kademlia_config in litep2p_config.kademlia.into_iter() {
269 tracing::debug!(
270 target: LOG_TARGET,
271 protocol_names = ?kademlia_config.protocol_names,
272 "enable ipfs kademlia protocol",
273 );
274
275 let main_protocol =
276 kademlia_config.protocol_names.first().expect("protocol name to exist");
277 let fallback_names = kademlia_config.protocol_names.iter().skip(1).cloned().collect();
278
279 let service = transport_manager.register_protocol(
280 main_protocol.clone(),
281 fallback_names,
282 kademlia_config.codec,
283 litep2p_config.keep_alive_timeout,
284 SubstreamKeepAlive::Yes,
285 );
286 litep2p_config.executor.run(Box::pin(async move {
287 let _ = Kademlia::new(service, kademlia_config).run().await;
288 }));
289 }
290
291 let mut identify_info = match litep2p_config.identify.take() {
293 None => None,
294 Some(mut identify_config) => {
295 tracing::debug!(
296 target: LOG_TARGET,
297 protocol = ?identify_config.protocol,
298 "enable ipfs identify protocol",
299 );
300
301 let service = transport_manager.register_protocol(
302 identify_config.protocol.clone(),
303 Vec::new(),
304 identify_config.codec,
305 litep2p_config.keep_alive_timeout,
306 SubstreamKeepAlive::No,
307 );
308 identify_config.public = Some(litep2p_config.keypair.public().into());
309
310 Some((service, identify_config))
311 }
312 };
313
314 if let Some(bitswap_config) = litep2p_config.bitswap.take() {
316 tracing::debug!(
317 target: LOG_TARGET,
318 protocol = ?bitswap_config.protocol,
319 "enable ipfs bitswap protocol",
320 );
321
322 let service = transport_manager.register_protocol(
323 bitswap_config.protocol.clone(),
324 Vec::new(),
325 bitswap_config.codec,
326 litep2p_config.keep_alive_timeout,
327 SubstreamKeepAlive::Yes,
328 );
329 litep2p_config.executor.run(Box::pin(async move {
330 Bitswap::new(service, bitswap_config).run().await
331 }));
332 }
333
334 if let Some(mut config) = litep2p_config.tcp.take() {
336 config.max_parallel_dials = litep2p_config.max_parallel_dials;
337 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
338 let (transport, transport_listen_addresses) =
339 <TcpTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
340
341 for address in transport_listen_addresses {
342 transport_manager.register_listen_address(address.clone());
343 listen_addresses.push(address.with(Protocol::P2p(local_peer_id.into())));
344 }
345
346 transport_manager.register_transport(SupportedTransport::Tcp, Box::new(transport));
347 }
348
349 #[cfg(feature = "quic")]
351 if let Some(config) = litep2p_config.quic.take() {
352 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
353 let (transport, transport_listen_addresses) =
354 <QuicTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
355
356 for address in transport_listen_addresses {
357 transport_manager.register_listen_address(address.clone());
358 listen_addresses.push(address.with(Protocol::P2p(local_peer_id.into())));
359 }
360
361 transport_manager.register_transport(SupportedTransport::Quic, Box::new(transport));
362 }
363
364 #[cfg(feature = "webrtc")]
366 if let Some(config) = litep2p_config.webrtc.take() {
367 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
368 let (transport, transport_listen_addresses) =
369 <WebRtcTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
370
371 for address in transport_listen_addresses {
372 transport_manager.register_listen_address(address.clone());
373 listen_addresses.push(address.with(Protocol::P2p(local_peer_id.into())));
374 }
375
376 transport_manager.register_transport(SupportedTransport::WebRtc, Box::new(transport));
377 }
378
379 #[cfg(feature = "websocket")]
381 if let Some(mut config) = litep2p_config.websocket.take() {
382 config.max_parallel_dials = litep2p_config.max_parallel_dials;
383 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
384 let (transport, transport_listen_addresses) =
385 <WebSocketTransport as TransportBuilder>::new(handle, config, resolver)?;
386
387 for address in transport_listen_addresses {
388 transport_manager.register_listen_address(address.clone());
389 listen_addresses.push(address.with(Protocol::P2p(local_peer_id.into())));
390 }
391
392 transport_manager
393 .register_transport(SupportedTransport::WebSocket, Box::new(transport));
394 }
395
396 if let Some(config) = litep2p_config.mdns.take() {
398 let mdns = Mdns::new(transport_handle, config, listen_addresses.clone());
399
400 litep2p_config.executor.run(Box::pin(async move {
401 let _ = mdns.start().await;
402 }));
403 }
404
405 if let Some((service, mut identify_config)) = identify_info.take() {
407 identify_config.protocols = transport_manager.protocols().cloned().collect();
408 let identify = Identify::new(service, identify_config);
409
410 litep2p_config.executor.run(Box::pin(async move {
411 let _ = identify.run().await;
412 }));
413 }
414
415 if transport_manager.installed_transports().count() == 0 {
416 return Err(Error::Other("No transport specified".to_string()));
417 }
418
419 if listen_addresses.is_empty() {
421 tracing::warn!(
422 target: LOG_TARGET,
423 "litep2p started with no listen addresses, cannot accept inbound connections",
424 );
425 }
426
427 Ok(Self {
428 local_peer_id,
429 bandwidth_sink,
430 listen_addresses,
431 transport_manager,
432 })
433 }
434
435 fn supported_transports(config: &Litep2pConfig) -> HashSet<SupportedTransport> {
441 let mut supported_transports = HashSet::new();
442
443 config
444 .tcp
445 .is_some()
446 .then(|| supported_transports.insert(SupportedTransport::Tcp));
447 #[cfg(feature = "quic")]
448 config
449 .quic
450 .is_some()
451 .then(|| supported_transports.insert(SupportedTransport::Quic));
452 #[cfg(feature = "websocket")]
453 config
454 .websocket
455 .is_some()
456 .then(|| supported_transports.insert(SupportedTransport::WebSocket));
457 #[cfg(feature = "webrtc")]
458 config
459 .webrtc
460 .is_some()
461 .then(|| supported_transports.insert(SupportedTransport::WebRtc));
462
463 supported_transports
464 }
465
466 pub fn local_peer_id(&self) -> &PeerId {
468 &self.local_peer_id
469 }
470
471 pub fn public_addresses(&self) -> PublicAddresses {
473 self.transport_manager.public_addresses()
474 }
475
476 pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
478 self.listen_addresses.iter()
479 }
480
481 pub fn bandwidth_sink(&self) -> BandwidthSink {
483 self.bandwidth_sink.clone()
484 }
485
486 pub async fn dial(&mut self, peer: &PeerId) -> crate::Result<()> {
488 self.transport_manager.dial(*peer).await
489 }
490
491 pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
493 self.transport_manager.dial_address(address).await
494 }
495
496 pub fn add_known_address(
501 &mut self,
502 peer: PeerId,
503 address: impl Iterator<Item = Multiaddr>,
504 ) -> usize {
505 self.transport_manager.add_known_address(peer, address)
506 }
507
508 pub async fn next_event(&mut self) -> Option<Litep2pEvent> {
512 loop {
513 match self.transport_manager.next().await? {
514 TransportEvent::ConnectionEstablished { peer, endpoint, .. } =>
515 return Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }),
516 TransportEvent::ConnectionClosed {
517 peer,
518 connection_id,
519 } =>
520 return Some(Litep2pEvent::ConnectionClosed {
521 peer,
522 connection_id,
523 }),
524 TransportEvent::DialFailure { address, error, .. } =>
525 return Some(Litep2pEvent::DialFailure { address, error }),
526
527 TransportEvent::OpenFailure { errors, .. } => {
528 return Some(Litep2pEvent::ListDialFailures { errors });
529 }
530 _ => {}
531 }
532 }
533 }
534}
535
#[cfg(test)]
mod tests {
    use crate::{
        config::ConfigBuilder,
        protocol::{libp2p::ping, notification::Config as NotificationConfig},
        types::protocol::ProtocolName,
        Litep2p, Litep2pEvent, PeerId,
    };
    use multiaddr::{Multiaddr, Protocol};
    use std::{net::Ipv4Addr, time::Duration};

    /// Install the env-filtered tracing subscriber.
    ///
    /// The result of `try_init()` is discarded, so calling this from every test is harmless.
    fn init_logging() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();
    }

    /// Build the notification protocol configuration shared by all tests for protocol `name`.
    ///
    /// The second tuple element is the handle returned alongside the config; callers keep it
    /// bound for the duration of the test.
    fn notification_config(name: &'static str) -> (NotificationConfig, impl Sized) {
        NotificationConfig::new(
            ProtocolName::from(name),
            1337usize,
            vec![1, 2, 3, 4],
            Vec::new(),
            false,
            64,
            64,
            true,
        )
    }

    /// `Litep2p` can be created with TCP, two notification protocols and ping enabled.
    #[tokio::test]
    async fn initialize_litep2p() {
        init_logging();

        let (first, _first_handle) = notification_config("/notificaton/1");
        let (second, _second_handle) = notification_config("/notificaton/2");
        let (ping_config, _ping_events) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_tcp(Default::default())
            .with_notification_protocol(first)
            .with_notification_protocol(second)
            .with_libp2p_ping(ping_config)
            .build();

        let _litep2p = Litep2p::new(config).unwrap();
    }

    /// Creating `Litep2p` without any transport is an error.
    #[tokio::test]
    async fn no_transport_given() {
        init_logging();

        let (first, _first_handle) = notification_config("/notificaton/1");
        let (second, _second_handle) = notification_config("/notificaton/2");
        let (ping_config, _ping_events) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_notification_protocol(first)
            .with_notification_protocol(second)
            .with_libp2p_ping(ping_config)
            .build();

        assert!(Litep2p::new(config).is_err());
    }

    /// Dialing the same address twice yields exactly one dial failure event.
    #[tokio::test]
    async fn dial_same_address_twice() {
        init_logging();

        let (first, _first_handle) = notification_config("/notificaton/1");
        let (second, _second_handle) = notification_config("/notificaton/2");
        let (ping_config, _ping_events) = ping::Config::default();

        let config = ConfigBuilder::new()
            .with_tcp(Default::default())
            .with_notification_protocol(first)
            .with_notification_protocol(second)
            .with_libp2p_ping(ping_config)
            .build();

        let peer = PeerId::random();
        let address = Multiaddr::empty()
            .with(Protocol::Ip4(Ipv4Addr::new(255, 254, 253, 252)))
            .with(Protocol::Tcp(8888))
            .with(Protocol::P2p(peer.into()));

        let mut litep2p = Litep2p::new(config).unwrap();
        litep2p.dial_address(address.clone()).await.unwrap();
        litep2p.dial_address(address.clone()).await.unwrap();

        // the first event must be the dial failure
        let first_event = litep2p.next_event().await;
        assert!(
            matches!(first_event, Some(Litep2pEvent::DialFailure { .. })),
            "invalid event received",
        );

        // no second event may arrive: waiting for one has to run into the timeout
        let second_event =
            tokio::time::timeout(Duration::from_secs(20), litep2p.next_event()).await;
        assert!(second_event.is_err(), "invalid event received");
    }
}