1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
59
/// DNS-resolving transport flavour whose lookups run on `async-std`.
#[cfg(feature = "async-std")]
pub mod async_std {
    use async_std_resolver::AsyncStdResolver;
    use parking_lot::Mutex;
    use std::{io, sync::Arc};
    use trust_dns_resolver::{
        config::{ResolverConfig, ResolverOpts},
        system_conf,
    };

    /// A [`crate::Transport`] that resolves names through an [`AsyncStdResolver`].
    pub type Transport<T> = crate::Transport<T, AsyncStdResolver>;

    impl<T> Transport<T> {
        /// Wraps `inner`, configuring the resolver from
        /// `system_conf::read_system_conf`.
        ///
        /// # Errors
        ///
        /// Fails if the system resolver configuration cannot be read.
        pub async fn system(inner: T) -> io::Result<Self> {
            let (config, options) = system_conf::read_system_conf()?;
            Self::custom(inner, config, options).await
        }

        /// Wraps `inner`, configuring the resolver from the given `cfg` and `opts`.
        pub async fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts) -> io::Result<Self> {
            let inner = Arc::new(Mutex::new(inner));
            let resolver = async_std_resolver::resolver(cfg, opts).await;
            Ok(Self { inner, resolver })
        }
    }
}
94
/// Legacy name for [`async_std::Transport`], kept for backwards compatibility.
#[cfg(feature = "async-std")]
#[deprecated(note = "Use `async_std::Transport` instead.")]
pub type DnsConfig<T> = crate::async_std::Transport<T>;
98
/// DNS-resolving transport flavour whose lookups run on `tokio`.
#[cfg(feature = "tokio")]
pub mod tokio {
    use parking_lot::Mutex;
    use std::{io, sync::Arc};
    use trust_dns_resolver::{
        config::{ResolverConfig, ResolverOpts},
        system_conf, TokioAsyncResolver,
    };

    /// A [`crate::Transport`] that resolves names through a [`TokioAsyncResolver`].
    pub type Transport<T> = crate::Transport<T, TokioAsyncResolver>;

    impl<T> Transport<T> {
        /// Wraps `inner`, configuring the resolver from
        /// `system_conf::read_system_conf`.
        ///
        /// # Errors
        ///
        /// Fails if the system resolver configuration cannot be read.
        pub fn system(inner: T) -> io::Result<Self> {
            let (config, options) = system_conf::read_system_conf()?;
            Self::custom(inner, config, options)
        }

        /// Wraps `inner`, configuring the resolver from the given `cfg` and `opts`.
        pub fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts) -> io::Result<Self> {
            let inner = Arc::new(Mutex::new(inner));
            let resolver = TokioAsyncResolver::tokio(cfg, opts);
            Ok(Self { inner, resolver })
        }
    }
}
131
/// Legacy name for [`tokio::Transport`], kept for backwards compatibility.
#[cfg(feature = "tokio")]
#[deprecated(note = "Use `tokio::Transport` instead.")]
pub type TokioDnsConfig<T> = crate::tokio::Transport<T>;
135
136use async_trait::async_trait;
137use futures::{future::BoxFuture, prelude::*};
138use libp2p_core::{
139 connection::Endpoint,
140 multiaddr::{Multiaddr, Protocol},
141 transport::{ListenerId, TransportError, TransportEvent},
142};
143use parking_lot::Mutex;
144use smallvec::SmallVec;
145use std::io;
146use std::net::{Ipv4Addr, Ipv6Addr};
147use std::{
148 convert::TryFrom,
149 error, fmt, iter,
150 ops::DerefMut,
151 pin::Pin,
152 str,
153 sync::Arc,
154 task::{Context, Poll},
155};
156
157pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
158pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
159use trust_dns_resolver::lookup::{Ipv4Lookup, Ipv6Lookup, TxtLookup};
160use trust_dns_resolver::lookup_ip::LookupIp;
161use trust_dns_resolver::name_server::ConnectionProvider;
162use trust_dns_resolver::AsyncResolver;
163
/// Label prepended to the name of a `/dnsaddr` component to build the domain
/// that is queried for TXT records.
const DNSADDR_PREFIX: &str = "_dnsaddr.";

/// Number of dial attempts accepted by the inner transport after which a
/// failing dial is aborted instead of trying further addresses.
const MAX_DIAL_ATTEMPTS: usize = 16;

/// Number of DNS lookups a single dial may perform; addresses that would
/// need a further lookup are dropped once this is reached.
const MAX_DNS_LOOKUPS: usize = 32;

/// Number of matching addresses taken from the TXT records of one `/dnsaddr`
/// lookup; additional matching records are dropped.
const MAX_TXT_RECORDS: usize = 16;
181
182#[derive(Debug)]
185pub struct Transport<T, R> {
186 inner: Arc<Mutex<T>>,
188 resolver: R,
190}
191
192#[deprecated(note = "Use `async_std::Transport` or `tokio::Transport` instead.")]
193pub type GenDnsConfig<T, R> = Transport<T, R>;
194
195impl<T, R> libp2p_core::Transport for Transport<T, R>
196where
197 T: libp2p_core::Transport + Send + Unpin + 'static,
198 T::Error: Send,
199 T::Dial: Send,
200 R: Clone + Send + Sync + Resolver + 'static,
201{
202 type Output = T::Output;
203 type Error = Error<T::Error>;
204 type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
205 type Dial = future::Either<
206 future::MapErr<T::Dial, fn(T::Error) -> Self::Error>,
207 BoxFuture<'static, Result<Self::Output, Self::Error>>,
208 >;
209
210 fn listen_on(
211 &mut self,
212 id: ListenerId,
213 addr: Multiaddr,
214 ) -> Result<(), TransportError<Self::Error>> {
215 self.inner
216 .lock()
217 .listen_on(id, addr)
218 .map_err(|e| e.map(Error::Transport))
219 }
220
221 fn remove_listener(&mut self, id: ListenerId) -> bool {
222 self.inner.lock().remove_listener(id)
223 }
224
225 fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
226 self.do_dial(addr, Endpoint::Dialer)
227 }
228
229 fn dial_as_listener(
230 &mut self,
231 addr: Multiaddr,
232 ) -> Result<Self::Dial, TransportError<Self::Error>> {
233 self.do_dial(addr, Endpoint::Listener)
234 }
235
236 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
237 self.inner.lock().address_translation(server, observed)
238 }
239
240 fn poll(
241 self: Pin<&mut Self>,
242 cx: &mut Context<'_>,
243 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
244 let mut inner = self.inner.lock();
245 libp2p_core::Transport::poll(Pin::new(inner.deref_mut()), cx).map(|event| {
246 event
247 .map_upgrade(|upgr| upgr.map_err::<_, fn(_) -> _>(Error::Transport))
248 .map_err(Error::Transport)
249 })
250 }
251}
252
253impl<T, R> Transport<T, R>
254where
255 T: libp2p_core::Transport + Send + Unpin + 'static,
256 T::Error: Send,
257 T::Dial: Send,
258 R: Clone + Send + Sync + Resolver + 'static,
259{
260 fn do_dial(
261 &mut self,
262 addr: Multiaddr,
263 role_override: Endpoint,
264 ) -> Result<
265 <Self as libp2p_core::Transport>::Dial,
266 TransportError<<Self as libp2p_core::Transport>::Error>,
267 > {
268 let resolver = self.resolver.clone();
269 let inner = self.inner.clone();
270
271 Ok(async move {
274 let mut last_err = None;
275 let mut dns_lookups = 0;
276 let mut dial_attempts = 0;
277 let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
280 unresolved.push(addr.clone());
281
282 while let Some(addr) = unresolved.pop() {
286 if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| {
287 matches!(
288 p,
289 Protocol::Dns(_)
290 | Protocol::Dns4(_)
291 | Protocol::Dns6(_)
292 | Protocol::Dnsaddr(_)
293 )
294 }) {
295 if dns_lookups == MAX_DNS_LOOKUPS {
296 log::debug!("Too many DNS lookups. Dropping unresolved {}.", addr);
297 last_err = Some(Error::TooManyLookups);
298 continue;
301 }
302 dns_lookups += 1;
303 match resolve(&name, &resolver).await {
304 Err(e) => {
305 if unresolved.is_empty() {
306 return Err(e);
307 }
308 last_err = Some(e);
311 }
312 Ok(Resolved::One(ip)) => {
313 log::trace!("Resolved {} -> {}", name, ip);
314 let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
315 unresolved.push(addr);
316 }
317 Ok(Resolved::Many(ips)) => {
318 for ip in ips {
319 log::trace!("Resolved {} -> {}", name, ip);
320 let addr =
321 addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
322 unresolved.push(addr);
323 }
324 }
325 Ok(Resolved::Addrs(addrs)) => {
326 let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
327 let prefix = addr.iter().take(i).collect::<Multiaddr>();
328 let mut n = 0;
329 for a in addrs {
330 if a.ends_with(&suffix) {
331 if n < MAX_TXT_RECORDS {
332 n += 1;
333 log::trace!("Resolved {} -> {}", name, a);
334 let addr =
335 prefix.iter().chain(a.iter()).collect::<Multiaddr>();
336 unresolved.push(addr);
337 } else {
338 log::debug!(
339 "Too many TXT records. Dropping resolved {}.",
340 a
341 );
342 }
343 }
344 }
345 }
346 }
347 } else {
348 log::debug!("Dialing {}", addr);
350
351 let transport = inner.clone();
352 let dial = match role_override {
353 Endpoint::Dialer => transport.lock().dial(addr),
354 Endpoint::Listener => transport.lock().dial_as_listener(addr),
355 };
356 let result = match dial {
357 Ok(out) => {
358 dial_attempts += 1;
362 out.await.map_err(Error::Transport)
363 }
364 Err(TransportError::MultiaddrNotSupported(a)) => {
365 Err(Error::MultiaddrNotSupported(a))
366 }
367 Err(TransportError::Other(err)) => Err(Error::Transport(err)),
368 };
369
370 match result {
371 Ok(out) => return Ok(out),
372 Err(err) => {
373 log::debug!("Dial error: {:?}.", err);
374 if unresolved.is_empty() {
375 return Err(err);
376 }
377 if dial_attempts == MAX_DIAL_ATTEMPTS {
378 log::debug!(
379 "Aborting dialing after {} attempts.",
380 MAX_DIAL_ATTEMPTS
381 );
382 return Err(err);
383 }
384 last_err = Some(err);
385 }
386 }
387 }
388 }
389
390 Err(last_err.unwrap_or_else(|| {
395 Error::ResolveError(ResolveErrorKind::Message("No matching records found.").into())
396 }))
397 }
398 .boxed()
399 .right_future())
400 }
401}
402
403#[derive(Debug)]
405#[allow(clippy::large_enum_variant)]
406pub enum Error<TErr> {
407 Transport(TErr),
409 ResolveError(ResolveError),
411 MultiaddrNotSupported(Multiaddr),
413 TooManyLookups,
420}
421
422#[deprecated(note = "Use `Error` instead.")]
423pub type DnsErr<TErr> = Error<TErr>;
424
425impl<TErr> fmt::Display for Error<TErr>
426where
427 TErr: fmt::Display,
428{
429 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430 match self {
431 Error::Transport(err) => write!(f, "{err}"),
432 Error::ResolveError(err) => write!(f, "{err}"),
433 Error::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {a}"),
434 Error::TooManyLookups => write!(f, "Too many DNS lookups"),
435 }
436 }
437}
438
439impl<TErr> error::Error for Error<TErr>
440where
441 TErr: error::Error + 'static,
442{
443 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
444 match self {
445 Error::Transport(err) => Some(err),
446 Error::ResolveError(err) => Some(err),
447 Error::MultiaddrNotSupported(_) => None,
448 Error::TooManyLookups => None,
449 }
450 }
451}
452
453enum Resolved<'a> {
455 One(Protocol<'a>),
459 Many(Vec<Protocol<'a>>),
462 Addrs(Vec<Multiaddr>),
466}
467
468fn resolve<'a, E: 'a + Send, R: Resolver>(
472 proto: &Protocol<'a>,
473 resolver: &'a R,
474) -> BoxFuture<'a, Result<Resolved<'a>, Error<E>>> {
475 match proto {
476 Protocol::Dns(ref name) => resolver
477 .lookup_ip(name.clone().into_owned())
478 .map(move |res| match res {
479 Ok(ips) => {
480 let mut ips = ips.into_iter();
481 let one = ips
482 .next()
483 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
484 if let Some(two) = ips.next() {
485 Ok(Resolved::Many(
486 iter::once(one)
487 .chain(iter::once(two))
488 .chain(ips)
489 .map(Protocol::from)
490 .collect(),
491 ))
492 } else {
493 Ok(Resolved::One(Protocol::from(one)))
494 }
495 }
496 Err(e) => Err(Error::ResolveError(e)),
497 })
498 .boxed(),
499 Protocol::Dns4(ref name) => resolver
500 .ipv4_lookup(name.clone().into_owned())
501 .map(move |res| match res {
502 Ok(ips) => {
503 let mut ips = ips.into_iter();
504 let one = ips
505 .next()
506 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
507 if let Some(two) = ips.next() {
508 Ok(Resolved::Many(
509 iter::once(one)
510 .chain(iter::once(two))
511 .chain(ips)
512 .map(Ipv4Addr::from)
513 .map(Protocol::from)
514 .collect(),
515 ))
516 } else {
517 Ok(Resolved::One(Protocol::from(Ipv4Addr::from(one))))
518 }
519 }
520 Err(e) => Err(Error::ResolveError(e)),
521 })
522 .boxed(),
523 Protocol::Dns6(ref name) => resolver
524 .ipv6_lookup(name.clone().into_owned())
525 .map(move |res| match res {
526 Ok(ips) => {
527 let mut ips = ips.into_iter();
528 let one = ips
529 .next()
530 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
531 if let Some(two) = ips.next() {
532 Ok(Resolved::Many(
533 iter::once(one)
534 .chain(iter::once(two))
535 .chain(ips)
536 .map(Ipv6Addr::from)
537 .map(Protocol::from)
538 .collect(),
539 ))
540 } else {
541 Ok(Resolved::One(Protocol::from(Ipv6Addr::from(one))))
542 }
543 }
544 Err(e) => Err(Error::ResolveError(e)),
545 })
546 .boxed(),
547 Protocol::Dnsaddr(ref name) => {
548 let name = [DNSADDR_PREFIX, name].concat();
549 resolver
550 .txt_lookup(name)
551 .map(move |res| match res {
552 Ok(txts) => {
553 let mut addrs = Vec::new();
554 for txt in txts {
555 if let Some(chars) = txt.txt_data().first() {
556 match parse_dnsaddr_txt(chars) {
557 Err(e) => {
558 log::debug!("Invalid TXT record: {:?}", e);
560 }
561 Ok(a) => {
562 addrs.push(a);
563 }
564 }
565 }
566 }
567 Ok(Resolved::Addrs(addrs))
568 }
569 Err(e) => Err(Error::ResolveError(e)),
570 })
571 .boxed()
572 }
573 proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed(),
574 }
575}
576
577fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result<Multiaddr> {
579 let s = str::from_utf8(txt).map_err(invalid_data)?;
580 match s.strip_prefix("dnsaddr=") {
581 None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
582 Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?),
583 }
584}
585
/// Wraps `e` in an [`io::Error`] of kind [`io::ErrorKind::InvalidData`].
fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, e)
}
589
590#[async_trait::async_trait]
591#[doc(hidden)]
592pub trait Resolver {
593 async fn lookup_ip(&self, name: String) -> Result<LookupIp, ResolveError>;
594 async fn ipv4_lookup(&self, name: String) -> Result<Ipv4Lookup, ResolveError>;
595 async fn ipv6_lookup(&self, name: String) -> Result<Ipv6Lookup, ResolveError>;
596 async fn txt_lookup(&self, name: String) -> Result<TxtLookup, ResolveError>;
597}
598
599#[async_trait]
600impl<C> Resolver for AsyncResolver<C>
601where
602 C: ConnectionProvider,
603{
604 async fn lookup_ip(&self, name: String) -> Result<LookupIp, ResolveError> {
605 self.lookup_ip(name).await
606 }
607
608 async fn ipv4_lookup(&self, name: String) -> Result<Ipv4Lookup, ResolveError> {
609 self.ipv4_lookup(name).await
610 }
611
612 async fn ipv6_lookup(&self, name: String) -> Result<Ipv6Lookup, ResolveError> {
613 self.ipv6_lookup(name).await
614 }
615
616 async fn txt_lookup(&self, name: String) -> Result<TxtLookup, ResolveError> {
617 self.txt_lookup(name).await
618 }
619}
620
#[cfg(all(test, any(feature = "tokio", feature = "async-std")))]
mod tests {
    use super::*;
    use futures::future::BoxFuture;
    use libp2p_core::{
        multiaddr::{Multiaddr, Protocol},
        transport::{TransportError, TransportEvent},
        Transport,
    };
    use libp2p_identity::PeerId;

    /// Dials a set of addresses through the DNS transport, backed by a dummy
    /// inner transport that accepts every fully resolved address.
    ///
    /// NOTE: the lookups go to a real resolver (quad9), so this test needs
    /// network access.
    #[test]
    fn basic_resolve() {
        let _ = env_logger::try_init();

        /// Inner transport whose dials always succeed, provided the address
        /// no longer contains any DNS component.
        #[derive(Clone)]
        struct CustomTransport;

        impl Transport for CustomTransport {
            type Output = ();
            type Error = std::io::Error;
            type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
            type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;

            fn listen_on(
                &mut self,
                _: ListenerId,
                _: Multiaddr,
            ) -> Result<(), TransportError<Self::Error>> {
                unreachable!()
            }

            fn remove_listener(&mut self, _: ListenerId) -> bool {
                false
            }

            fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
                // The DNS transport must have resolved every DNS component.
                assert!(addr.iter().all(|p| !matches!(
                    p,
                    Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
                )));
                Ok(Box::pin(future::ready(Ok(()))))
            }

            fn dial_as_listener(
                &mut self,
                addr: Multiaddr,
            ) -> Result<Self::Dial, TransportError<Self::Error>> {
                self.dial(addr)
            }

            fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
                None
            }

            fn poll(
                self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
                unreachable!()
            }
        }

        async fn run<T, R>(mut transport: super::Transport<T, R>)
        where
            T: Transport + Clone + Send + Unpin + 'static,
            T::Error: Send,
            T::Dial: Send,
            R: Clone + Send + Sync + Resolver + 'static,
        {
            // Expected to succeed: IPv4 lookup.
            let dns4: Multiaddr = "/dns4/example.com/tcp/20000".parse().unwrap();
            let _ = transport.dial(dns4).unwrap().await.unwrap();

            // Expected to succeed: IPv6 lookup.
            let dns6: Multiaddr = "/dns6/example.com/tcp/20000".parse().unwrap();
            let _ = transport.dial(dns6).unwrap().await.unwrap();

            // Expected to succeed: nothing to resolve.
            let ip4: Multiaddr = "/ip4/1.2.3.4/tcp/20000".parse().unwrap();
            let _ = transport.dial(ip4).unwrap().await.unwrap();

            // Expected to succeed: TXT lookup without a suffix to match.
            let dnsaddr: Multiaddr = "/dnsaddr/bootstrap.libp2p.io".parse().unwrap();
            let _ = transport.dial(dnsaddr).unwrap().await.unwrap();

            // Expected to succeed: TXT lookup with a `/p2p` suffix to match.
            let known_peer: Multiaddr =
                "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN"
                    .parse()
                    .unwrap();
            let _ = transport.dial(known_peer).unwrap().await.unwrap();

            // Expected to fail: a random peer ID should match no TXT record.
            let unknown_peer: Multiaddr =
                format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random())
                    .parse()
                    .unwrap();
            match transport.dial(unknown_peer).unwrap().await {
                Err(Error::ResolveError(_)) => {}
                Err(e) => panic!("Unexpected error: {e:?}"),
                Ok(_) => panic!("Unexpected success."),
            }

            // Expected to fail with `NoRecordsFound`.
            let invalid: Multiaddr = "/dns4/example.invalid/tcp/20000".parse().unwrap();
            match transport.dial(invalid).unwrap().await {
                Err(Error::ResolveError(e))
                    if matches!(e.kind(), ResolveErrorKind::NoRecordsFound { .. }) => {}
                Err(Error::ResolveError(e)) => panic!("Unexpected DNS error: {e:?}"),
                Err(e) => panic!("Unexpected error: {e:?}"),
                Ok(_) => panic!("Unexpected success."),
            }
        }

        #[cfg(feature = "async-std")]
        {
            let config = ResolverConfig::quad9();
            let opts = ResolverOpts::default();
            async_std_crate::task::block_on(async move {
                let dns = async_std::Transport::custom(CustomTransport, config, opts).await;
                run(dns.unwrap()).await
            });
        }

        #[cfg(feature = "tokio")]
        {
            let config = ResolverConfig::quad9();
            let opts = ResolverOpts::default();
            let rt = tokio_crate::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let dns = tokio::Transport::custom(CustomTransport, config, opts).unwrap();
            rt.block_on(run(dns));
        }
    }
}
789}