1#![allow(clippy::single_match)]
22#![allow(clippy::result_large_err)]
23#![allow(clippy::redundant_pattern_matching)]
24#![allow(clippy::type_complexity)]
25#![allow(clippy::result_unit_err)]
26#![allow(clippy::should_implement_trait)]
27#![allow(clippy::too_many_arguments)]
28#![allow(clippy::assign_op_pattern)]
29#![allow(clippy::match_like_matches_macro)]
30
31use crate::{
32 addresses::PublicAddresses,
33 config::Litep2pConfig,
34 error::DialError,
35 protocol::{
36 libp2p::{bitswap::Bitswap, identify::Identify, kademlia::Kademlia, ping::Ping},
37 mdns::Mdns,
38 notification::NotificationProtocol,
39 request_response::RequestResponseProtocol,
40 },
41 transport::{
42 manager::{SupportedTransport, TransportManager},
43 tcp::TcpTransport,
44 TransportBuilder, TransportEvent,
45 },
46};
47
48#[cfg(feature = "quic")]
49use crate::transport::quic::QuicTransport;
50#[cfg(feature = "webrtc")]
51use crate::transport::webrtc::WebRtcTransport;
52#[cfg(feature = "websocket")]
53use crate::transport::websocket::WebSocketTransport;
54
55use multiaddr::{Multiaddr, Protocol};
56use multihash::Multihash;
57use transport::Endpoint;
58use types::ConnectionId;
59
60use std::{collections::HashSet, sync::Arc};
61
62pub use bandwidth::BandwidthSink;
63pub use error::Error;
64pub use peer_id::PeerId;
65pub use types::protocol::ProtocolName;
66
67pub(crate) mod peer_id;
68
69pub mod addresses;
70pub mod codec;
71pub mod config;
72pub mod crypto;
73pub mod error;
74pub mod executor;
75pub mod protocol;
76pub mod substream;
77pub mod transport;
78pub mod types;
79pub mod yamux;
80
81mod bandwidth;
82mod multistream_select;
83
84#[cfg(test)]
85mod mock;
86
87pub type Result<T> = std::result::Result<T, error::Error>;
89
90const LOG_TARGET: &str = "litep2p";
92
93const DEFAULT_CHANNEL_SIZE: usize = 4096usize;
95
96#[derive(Debug)]
98pub enum Litep2pEvent {
99 ConnectionEstablished {
101 peer: PeerId,
103
104 endpoint: Endpoint,
106 },
107
108 ConnectionClosed {
110 peer: PeerId,
112
113 connection_id: ConnectionId,
115 },
116
117 DialFailure {
121 address: Multiaddr,
123
124 error: DialError,
126 },
127
128 ListDialFailures {
130 errors: Vec<(Multiaddr, DialError)>,
134 },
135}
136
137pub struct Litep2p {
139 local_peer_id: PeerId,
141
142 listen_addresses: Vec<Multiaddr>,
144
145 transport_manager: TransportManager,
147
148 bandwidth_sink: BandwidthSink,
150}
151
152impl Litep2p {
153 pub fn new(mut litep2p_config: Litep2pConfig) -> crate::Result<Litep2p> {
155 let local_peer_id = PeerId::from_public_key(&litep2p_config.keypair.public().into());
156 let bandwidth_sink = BandwidthSink::new();
157 let mut listen_addresses = vec![];
158
159 let supported_transports = Self::supported_transports(&litep2p_config);
160 let (mut transport_manager, transport_handle) = TransportManager::new(
161 litep2p_config.keypair.clone(),
162 supported_transports,
163 bandwidth_sink.clone(),
164 litep2p_config.max_parallel_dials,
165 litep2p_config.connection_limits,
166 );
167
168 if !litep2p_config.known_addresses.is_empty() {
170 for (peer, addresses) in litep2p_config.known_addresses {
171 transport_manager.add_known_address(peer, addresses.iter().cloned());
172 }
173 }
174
175 for (protocol, config) in litep2p_config.notification_protocols.into_iter() {
177 tracing::debug!(
178 target: LOG_TARGET,
179 ?protocol,
180 "enable notification protocol",
181 );
182
183 let service = transport_manager.register_protocol(
184 protocol,
185 config.fallback_names.clone(),
186 config.codec,
187 litep2p_config.keep_alive_timeout,
188 );
189 let executor = Arc::clone(&litep2p_config.executor);
190 litep2p_config.executor.run(Box::pin(async move {
191 NotificationProtocol::new(service, config, executor).run().await
192 }));
193 }
194
195 for (protocol, config) in litep2p_config.request_response_protocols.into_iter() {
197 tracing::debug!(
198 target: LOG_TARGET,
199 ?protocol,
200 "enable request-response protocol",
201 );
202
203 let service = transport_manager.register_protocol(
204 protocol,
205 config.fallback_names.clone(),
206 config.codec,
207 litep2p_config.keep_alive_timeout,
208 );
209 litep2p_config.executor.run(Box::pin(async move {
210 RequestResponseProtocol::new(service, config).run().await
211 }));
212 }
213
214 for (protocol_name, protocol) in litep2p_config.user_protocols.into_iter() {
216 tracing::debug!(target: LOG_TARGET, protocol = ?protocol_name, "enable user protocol");
217
218 let service = transport_manager.register_protocol(
219 protocol_name,
220 Vec::new(),
221 protocol.codec(),
222 litep2p_config.keep_alive_timeout,
223 );
224 litep2p_config.executor.run(Box::pin(async move {
225 let _ = protocol.run(service).await;
226 }));
227 }
228
229 if let Some(ping_config) = litep2p_config.ping.take() {
231 tracing::debug!(
232 target: LOG_TARGET,
233 protocol = ?ping_config.protocol,
234 "enable ipfs ping protocol",
235 );
236
237 let service = transport_manager.register_protocol(
238 ping_config.protocol.clone(),
239 Vec::new(),
240 ping_config.codec,
241 litep2p_config.keep_alive_timeout,
242 );
243 litep2p_config.executor.run(Box::pin(async move {
244 Ping::new(service, ping_config).run().await
245 }));
246 }
247
248 if let Some(kademlia_config) = litep2p_config.kademlia.take() {
250 tracing::debug!(
251 target: LOG_TARGET,
252 protocol_names = ?kademlia_config.protocol_names,
253 "enable ipfs kademlia protocol",
254 );
255
256 let main_protocol =
257 kademlia_config.protocol_names.first().expect("protocol name to exist");
258 let fallback_names = kademlia_config.protocol_names.iter().skip(1).cloned().collect();
259
260 let service = transport_manager.register_protocol(
261 main_protocol.clone(),
262 fallback_names,
263 kademlia_config.codec,
264 litep2p_config.keep_alive_timeout,
265 );
266 litep2p_config.executor.run(Box::pin(async move {
267 let _ = Kademlia::new(service, kademlia_config).run().await;
268 }));
269 }
270
271 let mut identify_info = match litep2p_config.identify.take() {
273 None => None,
274 Some(mut identify_config) => {
275 tracing::debug!(
276 target: LOG_TARGET,
277 protocol = ?identify_config.protocol,
278 "enable ipfs identify protocol",
279 );
280
281 let service = transport_manager.register_protocol(
282 identify_config.protocol.clone(),
283 Vec::new(),
284 identify_config.codec,
285 litep2p_config.keep_alive_timeout,
286 );
287 identify_config.public = Some(litep2p_config.keypair.public().into());
288
289 Some((service, identify_config))
290 }
291 };
292
293 if let Some(bitswap_config) = litep2p_config.bitswap.take() {
295 tracing::debug!(
296 target: LOG_TARGET,
297 protocol = ?bitswap_config.protocol,
298 "enable ipfs bitswap protocol",
299 );
300
301 let service = transport_manager.register_protocol(
302 bitswap_config.protocol.clone(),
303 Vec::new(),
304 bitswap_config.codec,
305 litep2p_config.keep_alive_timeout,
306 );
307 litep2p_config.executor.run(Box::pin(async move {
308 Bitswap::new(service, bitswap_config).run().await
309 }));
310 }
311
312 if let Some(config) = litep2p_config.tcp.take() {
314 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
315 let (transport, transport_listen_addresses) =
316 <TcpTransport as TransportBuilder>::new(handle, config)?;
317
318 for address in transport_listen_addresses {
319 transport_manager.register_listen_address(address.clone());
320 listen_addresses.push(address.with(Protocol::P2p(
321 Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
322 )));
323 }
324
325 transport_manager.register_transport(SupportedTransport::Tcp, Box::new(transport));
326 }
327
328 #[cfg(feature = "quic")]
330 if let Some(config) = litep2p_config.quic.take() {
331 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
332 let (transport, transport_listen_addresses) =
333 <QuicTransport as TransportBuilder>::new(handle, config)?;
334
335 for address in transport_listen_addresses {
336 transport_manager.register_listen_address(address.clone());
337 listen_addresses.push(address.with(Protocol::P2p(
338 Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
339 )));
340 }
341
342 transport_manager.register_transport(SupportedTransport::Quic, Box::new(transport));
343 }
344
345 #[cfg(feature = "webrtc")]
347 if let Some(config) = litep2p_config.webrtc.take() {
348 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
349 let (transport, transport_listen_addresses) =
350 <WebRtcTransport as TransportBuilder>::new(handle, config)?;
351
352 for address in transport_listen_addresses {
353 transport_manager.register_listen_address(address.clone());
354 listen_addresses.push(address.with(Protocol::P2p(
355 Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
356 )));
357 }
358
359 transport_manager.register_transport(SupportedTransport::WebRtc, Box::new(transport));
360 }
361
362 #[cfg(feature = "websocket")]
364 if let Some(config) = litep2p_config.websocket.take() {
365 let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
366 let (transport, transport_listen_addresses) =
367 <WebSocketTransport as TransportBuilder>::new(handle, config)?;
368
369 for address in transport_listen_addresses {
370 transport_manager.register_listen_address(address.clone());
371 listen_addresses.push(address.with(Protocol::P2p(
372 Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
373 )));
374 }
375
376 transport_manager
377 .register_transport(SupportedTransport::WebSocket, Box::new(transport));
378 }
379
380 if let Some(config) = litep2p_config.mdns.take() {
382 let mdns = Mdns::new(transport_handle, config, listen_addresses.clone())?;
383
384 litep2p_config.executor.run(Box::pin(async move {
385 let _ = mdns.start().await;
386 }));
387 }
388
389 if let Some((service, mut identify_config)) = identify_info.take() {
391 identify_config.protocols = transport_manager.protocols().cloned().collect();
392 let identify = Identify::new(service, identify_config);
393
394 litep2p_config.executor.run(Box::pin(async move {
395 let _ = identify.run().await;
396 }));
397 }
398
399 if transport_manager.installed_transports().count() == 0 {
400 return Err(Error::Other("No transport specified".to_string()));
401 }
402
403 if listen_addresses.is_empty() {
405 tracing::warn!(
406 target: LOG_TARGET,
407 "litep2p started with no listen addresses, cannot accept inbound connections",
408 );
409 }
410
411 Ok(Self {
412 local_peer_id,
413 bandwidth_sink,
414 listen_addresses,
415 transport_manager,
416 })
417 }
418
419 fn supported_transports(config: &Litep2pConfig) -> HashSet<SupportedTransport> {
425 let mut supported_transports = HashSet::new();
426
427 config
428 .tcp
429 .is_some()
430 .then(|| supported_transports.insert(SupportedTransport::Tcp));
431 #[cfg(feature = "quic")]
432 config
433 .quic
434 .is_some()
435 .then(|| supported_transports.insert(SupportedTransport::Quic));
436 #[cfg(feature = "websocket")]
437 config
438 .websocket
439 .is_some()
440 .then(|| supported_transports.insert(SupportedTransport::WebSocket));
441 #[cfg(feature = "webrtc")]
442 config
443 .webrtc
444 .is_some()
445 .then(|| supported_transports.insert(SupportedTransport::WebRtc));
446
447 supported_transports
448 }
449
450 pub fn local_peer_id(&self) -> &PeerId {
452 &self.local_peer_id
453 }
454
455 pub fn public_addresses(&self) -> PublicAddresses {
457 self.transport_manager.public_addresses()
458 }
459
460 pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
462 self.listen_addresses.iter()
463 }
464
465 pub fn bandwidth_sink(&self) -> BandwidthSink {
467 self.bandwidth_sink.clone()
468 }
469
470 pub async fn dial(&mut self, peer: &PeerId) -> crate::Result<()> {
472 self.transport_manager.dial(*peer).await
473 }
474
475 pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
477 self.transport_manager.dial_address(address).await
478 }
479
480 pub fn add_known_address(
485 &mut self,
486 peer: PeerId,
487 address: impl Iterator<Item = Multiaddr>,
488 ) -> usize {
489 self.transport_manager.add_known_address(peer, address)
490 }
491
492 pub async fn next_event(&mut self) -> Option<Litep2pEvent> {
496 loop {
497 match self.transport_manager.next().await? {
498 TransportEvent::ConnectionEstablished { peer, endpoint, .. } =>
499 return Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }),
500 TransportEvent::ConnectionClosed {
501 peer,
502 connection_id,
503 } =>
504 return Some(Litep2pEvent::ConnectionClosed {
505 peer,
506 connection_id,
507 }),
508 TransportEvent::DialFailure { address, error, .. } =>
509 return Some(Litep2pEvent::DialFailure { address, error }),
510
511 TransportEvent::OpenFailure { errors, .. } => {
512 return Some(Litep2pEvent::ListDialFailures { errors });
513 }
514 _ => {}
515 }
516 }
517 }
518}
519
520#[cfg(test)]
521mod tests {
522 use crate::{
523 config::ConfigBuilder,
524 protocol::{libp2p::ping, notification::Config as NotificationConfig},
525 types::protocol::ProtocolName,
526 Litep2p, Litep2pEvent, PeerId,
527 };
528 use multiaddr::{Multiaddr, Protocol};
529 use multihash::Multihash;
530 use std::net::Ipv4Addr;
531
532 #[tokio::test]
533 async fn initialize_litep2p() {
534 let _ = tracing_subscriber::fmt()
535 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
536 .try_init();
537
538 let (config1, _service1) = NotificationConfig::new(
539 ProtocolName::from("/notificaton/1"),
540 1337usize,
541 vec![1, 2, 3, 4],
542 Vec::new(),
543 false,
544 64,
545 64,
546 true,
547 );
548 let (config2, _service2) = NotificationConfig::new(
549 ProtocolName::from("/notificaton/2"),
550 1337usize,
551 vec![1, 2, 3, 4],
552 Vec::new(),
553 false,
554 64,
555 64,
556 true,
557 );
558 let (ping_config, _ping_event_stream) = ping::Config::default();
559
560 let config = ConfigBuilder::new()
561 .with_tcp(Default::default())
562 .with_notification_protocol(config1)
563 .with_notification_protocol(config2)
564 .with_libp2p_ping(ping_config)
565 .build();
566
567 let _litep2p = Litep2p::new(config).unwrap();
568 }
569
570 #[tokio::test]
571 async fn no_transport_given() {
572 let _ = tracing_subscriber::fmt()
573 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
574 .try_init();
575
576 let (config1, _service1) = NotificationConfig::new(
577 ProtocolName::from("/notificaton/1"),
578 1337usize,
579 vec![1, 2, 3, 4],
580 Vec::new(),
581 false,
582 64,
583 64,
584 true,
585 );
586 let (config2, _service2) = NotificationConfig::new(
587 ProtocolName::from("/notificaton/2"),
588 1337usize,
589 vec![1, 2, 3, 4],
590 Vec::new(),
591 false,
592 64,
593 64,
594 true,
595 );
596 let (ping_config, _ping_event_stream) = ping::Config::default();
597
598 let config = ConfigBuilder::new()
599 .with_notification_protocol(config1)
600 .with_notification_protocol(config2)
601 .with_libp2p_ping(ping_config)
602 .build();
603
604 assert!(Litep2p::new(config).is_err());
605 }
606
607 #[tokio::test]
608 async fn dial_same_address_twice() {
609 let _ = tracing_subscriber::fmt()
610 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
611 .try_init();
612
613 let (config1, _service1) = NotificationConfig::new(
614 ProtocolName::from("/notificaton/1"),
615 1337usize,
616 vec![1, 2, 3, 4],
617 Vec::new(),
618 false,
619 64,
620 64,
621 true,
622 );
623 let (config2, _service2) = NotificationConfig::new(
624 ProtocolName::from("/notificaton/2"),
625 1337usize,
626 vec![1, 2, 3, 4],
627 Vec::new(),
628 false,
629 64,
630 64,
631 true,
632 );
633 let (ping_config, _ping_event_stream) = ping::Config::default();
634
635 let config = ConfigBuilder::new()
636 .with_tcp(Default::default())
637 .with_notification_protocol(config1)
638 .with_notification_protocol(config2)
639 .with_libp2p_ping(ping_config)
640 .build();
641
642 let peer = PeerId::random();
643 let address = Multiaddr::empty()
644 .with(Protocol::Ip4(Ipv4Addr::new(255, 254, 253, 252)))
645 .with(Protocol::Tcp(8888))
646 .with(Protocol::P2p(
647 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
648 ));
649
650 let mut litep2p = Litep2p::new(config).unwrap();
651 litep2p.dial_address(address.clone()).await.unwrap();
652 litep2p.dial_address(address.clone()).await.unwrap();
653
654 match litep2p.next_event().await {
655 Some(Litep2pEvent::DialFailure { .. }) => {}
656 _ => panic!("invalid event received"),
657 }
658
659 match tokio::time::timeout(std::time::Duration::from_secs(20), litep2p.next_event()).await {
661 Err(_) => {}
662 _ => panic!("invalid event received"),
663 }
664 }
665}