1#![allow(clippy::single_match)]
22#![allow(clippy::result_large_err)]
23#![allow(clippy::large_enum_variant)]
24#![allow(clippy::redundant_pattern_matching)]
25#![allow(clippy::type_complexity)]
26#![allow(clippy::result_unit_err)]
27#![allow(clippy::should_implement_trait)]
28#![allow(clippy::too_many_arguments)]
29#![allow(clippy::assign_op_pattern)]
30#![allow(clippy::match_like_matches_macro)]
31
32use crate::{
33 addresses::PublicAddresses,
34 config::Litep2pConfig,
35 error::DialError,
36 protocol::{
37 libp2p::{bitswap::Bitswap, identify::Identify, kademlia::Kademlia, ping::Ping},
38 mdns::Mdns,
39 notification::NotificationProtocol,
40 request_response::RequestResponseProtocol,
41 },
42 transport::{
43 manager::{SupportedTransport, TransportManager, TransportManagerBuilder},
44 tcp::TcpTransport,
45 TransportBuilder, TransportEvent,
46 },
47};
48
49#[cfg(feature = "quic")]
50use crate::transport::quic::QuicTransport;
51#[cfg(feature = "webrtc")]
52use crate::transport::webrtc::WebRtcTransport;
53#[cfg(feature = "websocket")]
54use crate::transport::websocket::WebSocketTransport;
55
56use hickory_resolver::{name_server::TokioConnectionProvider, TokioResolver};
57use multiaddr::{Multiaddr, Protocol};
58use transport::Endpoint;
59use types::ConnectionId;
60
61pub use bandwidth::BandwidthSink;
62pub use error::Error;
63pub use peer_id::PeerId;
64use std::{collections::HashSet, sync::Arc};
65pub use types::protocol::ProtocolName;
66
67pub(crate) mod peer_id;
68
69pub mod addresses;
70pub mod codec;
71pub mod config;
72pub mod crypto;
73pub mod error;
74pub mod executor;
75pub mod protocol;
76pub mod substream;
77pub mod transport;
78pub mod types;
79pub mod yamux;
80
81mod bandwidth;
82mod multistream_select;
83pub mod utils;
84
85#[cfg(test)]
86mod mock;
87
88pub type Result<T> = std::result::Result<T, error::Error>;
90
91const LOG_TARGET: &str = "litep2p";
93
94const DEFAULT_CHANNEL_SIZE: usize = 4096usize;
96
97#[derive(Debug)]
99pub enum Litep2pEvent {
100 ConnectionEstablished {
102 peer: PeerId,
104
105 endpoint: Endpoint,
107 },
108
109 ConnectionClosed {
111 peer: PeerId,
113
114 connection_id: ConnectionId,
116 },
117
118 DialFailure {
122 address: Multiaddr,
124
125 error: DialError,
127 },
128
129 ListDialFailures {
131 errors: Vec<(Multiaddr, DialError)>,
135 },
136}
137
138pub struct Litep2p {
140 local_peer_id: PeerId,
142
143 listen_addresses: Vec<Multiaddr>,
145
146 transport_manager: TransportManager,
148
149 bandwidth_sink: BandwidthSink,
151}
152
153impl Litep2p {
154 pub fn new(mut litep2p_config: Litep2pConfig) -> crate::Result<Litep2p> {
156 let local_peer_id = PeerId::from_public_key(&litep2p_config.keypair.public().into());
157 let bandwidth_sink = BandwidthSink::new();
158 let mut listen_addresses = vec![];
159
160 let (resolver_config, resolver_opts) = if litep2p_config.use_system_dns_config {
161 hickory_resolver::system_conf::read_system_conf()
162 .map_err(Error::CannotReadSystemDnsConfig)?
163 } else {
164 (Default::default(), Default::default())
165 };
166 let resolver = Arc::new(
167 TokioResolver::builder_with_config(resolver_config, TokioConnectionProvider::default())
168 .with_options(resolver_opts)
169 .build(),
170 );
171
172 let supported_transports = Self::supported_transports(&litep2p_config);
173 let mut transport_manager = TransportManagerBuilder::new()
174 .with_keypair(litep2p_config.keypair.clone())
175 .with_supported_transports(supported_transports)
176 .with_bandwidth_sink(bandwidth_sink.clone())
177 .with_max_parallel_dials(litep2p_config.max_parallel_dials)
178 .with_connection_limits_config(litep2p_config.connection_limits)
179 .build();
180
181 let transport_handle = transport_manager.transport_manager_handle();
182 if !litep2p_config.known_addresses.is_empty() {
184 for (peer, addresses) in litep2p_config.known_addresses {
185 transport_manager.add_known_address(peer, addresses.iter().cloned());
186 }
187 }
188
189 for (protocol, config) in litep2p_config.notification_protocols.into_iter() {
191 tracing::debug!(
192 target: LOG_TARGET,
193 ?protocol,
194 "enable notification protocol",
195 );
196
197 let service = transport_manager.register_protocol(
198 protocol,
199 config.fallback_names.clone(),
200 config.codec,
201 litep2p_config.keep_alive_timeout,
202 );
203 let executor = Arc::clone(&litep2p_config.executor);
204 litep2p_config.executor.run(Box::pin(async move {
205 NotificationProtocol::new(service, config, executor).run().await
206 }));
207 }
208
209 for (protocol, config) in litep2p_config.request_response_protocols.into_iter() {
211 tracing::debug!(
212 target: LOG_TARGET,
213 ?protocol,
214 "enable request-response protocol",
215 );
216
217 let service = transport_manager.register_protocol(
218 protocol,
219 config.fallback_names.clone(),
220 config.codec,
221 litep2p_config.keep_alive_timeout,
222 );
223 litep2p_config.executor.run(Box::pin(async move {
224 RequestResponseProtocol::new(service, config).run().await
225 }));
226 }
227
228 for (protocol_name, protocol) in litep2p_config.user_protocols.into_iter() {
230 tracing::debug!(target: LOG_TARGET, protocol = ?protocol_name, "enable user protocol");
231
232 let service = transport_manager.register_protocol(
233 protocol_name,
234 Vec::new(),
235 protocol.codec(),
236 litep2p_config.keep_alive_timeout,
237 );
238 litep2p_config.executor.run(Box::pin(async move {
239 let _ = protocol.run(service).await;
240 }));
241 }
242
243 if let Some(ping_config) = litep2p_config.ping.take() {
245 tracing::debug!(
246 target: LOG_TARGET,
247 protocol = ?ping_config.protocol,
248 "enable ipfs ping protocol",
249 );
250
251 let service = transport_manager.register_protocol(
252 ping_config.protocol.clone(),
253 Vec::new(),
254 ping_config.codec,
255 litep2p_config.keep_alive_timeout,
256 );
257 litep2p_config.executor.run(Box::pin(async move {
258 Ping::new(service, ping_config).run().await
259 }));
260 }
261
262 for kademlia_config in litep2p_config.kademlia.into_iter() {
264 tracing::debug!(
265 target: LOG_TARGET,
266 protocol_names = ?kademlia_config.protocol_names,
267 "enable ipfs kademlia protocol",
268 );
269
270 let main_protocol =
271 kademlia_config.protocol_names.first().expect("protocol name to exist");
272 let fallback_names = kademlia_config.protocol_names.iter().skip(1).cloned().collect();
273
274 let service = transport_manager.register_protocol(
275 main_protocol.clone(),
276 fallback_names,
277 kademlia_config.codec,
278 litep2p_config.keep_alive_timeout,
279 );
280 litep2p_config.executor.run(Box::pin(async move {
281 let _ = Kademlia::new(service, kademlia_config).run().await;
282 }));
283 }
284
285 let mut identify_info = match litep2p_config.identify.take() {
287 None => None,
288 Some(mut identify_config) => {
289 tracing::debug!(
290 target: LOG_TARGET,
291 protocol = ?identify_config.protocol,
292 "enable ipfs identify protocol",
293 );
294
295 let service = transport_manager.register_protocol(
296 identify_config.protocol.clone(),
297 Vec::new(),
298 identify_config.codec,
299 litep2p_config.keep_alive_timeout,
300 );
301 identify_config.public = Some(litep2p_config.keypair.public().into());
302
303 Some((service, identify_config))
304 }
305 };
306
307 if let Some(bitswap_config) = litep2p_config.bitswap.take() {
309 tracing::debug!(
310 target: LOG_TARGET,
311 protocol = ?bitswap_config.protocol,
312 "enable ipfs bitswap protocol",
313 );
314
315 let service = transport_manager.register_protocol(
316 bitswap_config.protocol.clone(),
317 Vec::new(),
318 bitswap_config.codec,
319 litep2p_config.keep_alive_timeout,
320 );
321 litep2p_config.executor.run(Box::pin(async move {
322 Bitswap::new(service, bitswap_config).run().await
323 }));
324 }
325
326 if let Some(config) = litep2p_config.tcp.take() {
328 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
329 let (transport, transport_listen_addresses) =
330 <TcpTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
331
332 for address in transport_listen_addresses {
333 transport_manager.register_listen_address(address.clone());
334 listen_addresses.push(address.with(Protocol::P2p(*local_peer_id.as_ref())));
335 }
336
337 transport_manager.register_transport(SupportedTransport::Tcp, Box::new(transport));
338 }
339
340 #[cfg(feature = "quic")]
342 if let Some(config) = litep2p_config.quic.take() {
343 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
344 let (transport, transport_listen_addresses) =
345 <QuicTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
346
347 for address in transport_listen_addresses {
348 transport_manager.register_listen_address(address.clone());
349 listen_addresses.push(address.with(Protocol::P2p(*local_peer_id.as_ref())));
350 }
351
352 transport_manager.register_transport(SupportedTransport::Quic, Box::new(transport));
353 }
354
355 #[cfg(feature = "webrtc")]
357 if let Some(config) = litep2p_config.webrtc.take() {
358 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
359 let (transport, transport_listen_addresses) =
360 <WebRtcTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
361
362 for address in transport_listen_addresses {
363 transport_manager.register_listen_address(address.clone());
364 listen_addresses.push(address.with(Protocol::P2p(*local_peer_id.as_ref())));
365 }
366
367 transport_manager.register_transport(SupportedTransport::WebRtc, Box::new(transport));
368 }
369
370 #[cfg(feature = "websocket")]
372 if let Some(config) = litep2p_config.websocket.take() {
373 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
374 let (transport, transport_listen_addresses) =
375 <WebSocketTransport as TransportBuilder>::new(handle, config, resolver)?;
376
377 for address in transport_listen_addresses {
378 transport_manager.register_listen_address(address.clone());
379 listen_addresses.push(address.with(Protocol::P2p(*local_peer_id.as_ref())));
380 }
381
382 transport_manager
383 .register_transport(SupportedTransport::WebSocket, Box::new(transport));
384 }
385
386 if let Some(config) = litep2p_config.mdns.take() {
388 let mdns = Mdns::new(transport_handle, config, listen_addresses.clone());
389
390 litep2p_config.executor.run(Box::pin(async move {
391 let _ = mdns.start().await;
392 }));
393 }
394
395 if let Some((service, mut identify_config)) = identify_info.take() {
397 identify_config.protocols = transport_manager.protocols().cloned().collect();
398 let identify = Identify::new(service, identify_config);
399
400 litep2p_config.executor.run(Box::pin(async move {
401 let _ = identify.run().await;
402 }));
403 }
404
405 if transport_manager.installed_transports().count() == 0 {
406 return Err(Error::Other("No transport specified".to_string()));
407 }
408
409 if listen_addresses.is_empty() {
411 tracing::warn!(
412 target: LOG_TARGET,
413 "litep2p started with no listen addresses, cannot accept inbound connections",
414 );
415 }
416
417 Ok(Self {
418 local_peer_id,
419 bandwidth_sink,
420 listen_addresses,
421 transport_manager,
422 })
423 }
424
425 fn supported_transports(config: &Litep2pConfig) -> HashSet<SupportedTransport> {
431 let mut supported_transports = HashSet::new();
432
433 config
434 .tcp
435 .is_some()
436 .then(|| supported_transports.insert(SupportedTransport::Tcp));
437 #[cfg(feature = "quic")]
438 config
439 .quic
440 .is_some()
441 .then(|| supported_transports.insert(SupportedTransport::Quic));
442 #[cfg(feature = "websocket")]
443 config
444 .websocket
445 .is_some()
446 .then(|| supported_transports.insert(SupportedTransport::WebSocket));
447 #[cfg(feature = "webrtc")]
448 config
449 .webrtc
450 .is_some()
451 .then(|| supported_transports.insert(SupportedTransport::WebRtc));
452
453 supported_transports
454 }
455
456 pub fn local_peer_id(&self) -> &PeerId {
458 &self.local_peer_id
459 }
460
461 pub fn public_addresses(&self) -> PublicAddresses {
463 self.transport_manager.public_addresses()
464 }
465
466 pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
468 self.listen_addresses.iter()
469 }
470
471 pub fn bandwidth_sink(&self) -> BandwidthSink {
473 self.bandwidth_sink.clone()
474 }
475
476 pub async fn dial(&mut self, peer: &PeerId) -> crate::Result<()> {
478 self.transport_manager.dial(*peer).await
479 }
480
481 pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
483 self.transport_manager.dial_address(address).await
484 }
485
486 pub fn add_known_address(
491 &mut self,
492 peer: PeerId,
493 address: impl Iterator<Item = Multiaddr>,
494 ) -> usize {
495 self.transport_manager.add_known_address(peer, address)
496 }
497
498 pub async fn next_event(&mut self) -> Option<Litep2pEvent> {
502 loop {
503 match self.transport_manager.next().await? {
504 TransportEvent::ConnectionEstablished { peer, endpoint, .. } =>
505 return Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }),
506 TransportEvent::ConnectionClosed {
507 peer,
508 connection_id,
509 } =>
510 return Some(Litep2pEvent::ConnectionClosed {
511 peer,
512 connection_id,
513 }),
514 TransportEvent::DialFailure { address, error, .. } =>
515 return Some(Litep2pEvent::DialFailure { address, error }),
516
517 TransportEvent::OpenFailure { errors, .. } => {
518 return Some(Litep2pEvent::ListDialFailures { errors });
519 }
520 _ => {}
521 }
522 }
523 }
524}
525
526#[cfg(test)]
527mod tests {
528 use crate::{
529 config::ConfigBuilder,
530 protocol::{libp2p::ping, notification::Config as NotificationConfig},
531 types::protocol::ProtocolName,
532 Litep2p, Litep2pEvent, PeerId,
533 };
534 use multiaddr::{Multiaddr, Protocol};
535 use multihash::Multihash;
536 use std::net::Ipv4Addr;
537
538 #[tokio::test]
539 async fn initialize_litep2p() {
540 let _ = tracing_subscriber::fmt()
541 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
542 .try_init();
543
544 let (config1, _service1) = NotificationConfig::new(
545 ProtocolName::from("/notificaton/1"),
546 1337usize,
547 vec![1, 2, 3, 4],
548 Vec::new(),
549 false,
550 64,
551 64,
552 true,
553 );
554 let (config2, _service2) = NotificationConfig::new(
555 ProtocolName::from("/notificaton/2"),
556 1337usize,
557 vec![1, 2, 3, 4],
558 Vec::new(),
559 false,
560 64,
561 64,
562 true,
563 );
564 let (ping_config, _ping_event_stream) = ping::Config::default();
565
566 let config = ConfigBuilder::new()
567 .with_tcp(Default::default())
568 .with_notification_protocol(config1)
569 .with_notification_protocol(config2)
570 .with_libp2p_ping(ping_config)
571 .build();
572
573 let _litep2p = Litep2p::new(config).unwrap();
574 }
575
576 #[tokio::test]
577 async fn no_transport_given() {
578 let _ = tracing_subscriber::fmt()
579 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
580 .try_init();
581
582 let (config1, _service1) = NotificationConfig::new(
583 ProtocolName::from("/notificaton/1"),
584 1337usize,
585 vec![1, 2, 3, 4],
586 Vec::new(),
587 false,
588 64,
589 64,
590 true,
591 );
592 let (config2, _service2) = NotificationConfig::new(
593 ProtocolName::from("/notificaton/2"),
594 1337usize,
595 vec![1, 2, 3, 4],
596 Vec::new(),
597 false,
598 64,
599 64,
600 true,
601 );
602 let (ping_config, _ping_event_stream) = ping::Config::default();
603
604 let config = ConfigBuilder::new()
605 .with_notification_protocol(config1)
606 .with_notification_protocol(config2)
607 .with_libp2p_ping(ping_config)
608 .build();
609
610 assert!(Litep2p::new(config).is_err());
611 }
612
613 #[tokio::test]
614 async fn dial_same_address_twice() {
615 let _ = tracing_subscriber::fmt()
616 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
617 .try_init();
618
619 let (config1, _service1) = NotificationConfig::new(
620 ProtocolName::from("/notificaton/1"),
621 1337usize,
622 vec![1, 2, 3, 4],
623 Vec::new(),
624 false,
625 64,
626 64,
627 true,
628 );
629 let (config2, _service2) = NotificationConfig::new(
630 ProtocolName::from("/notificaton/2"),
631 1337usize,
632 vec![1, 2, 3, 4],
633 Vec::new(),
634 false,
635 64,
636 64,
637 true,
638 );
639 let (ping_config, _ping_event_stream) = ping::Config::default();
640
641 let config = ConfigBuilder::new()
642 .with_tcp(Default::default())
643 .with_notification_protocol(config1)
644 .with_notification_protocol(config2)
645 .with_libp2p_ping(ping_config)
646 .build();
647
648 let peer = PeerId::random();
649 let address = Multiaddr::empty()
650 .with(Protocol::Ip4(Ipv4Addr::new(255, 254, 253, 252)))
651 .with(Protocol::Tcp(8888))
652 .with(Protocol::P2p(
653 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
654 ));
655
656 let mut litep2p = Litep2p::new(config).unwrap();
657 litep2p.dial_address(address.clone()).await.unwrap();
658 litep2p.dial_address(address.clone()).await.unwrap();
659
660 match litep2p.next_event().await {
661 Some(Litep2pEvent::DialFailure { .. }) => {}
662 _ => panic!("invalid event received"),
663 }
664
665 match tokio::time::timeout(std::time::Duration::from_secs(20), litep2p.next_event()).await {
667 Err(_) => {}
668 _ => panic!("invalid event received"),
669 }
670 }
671}