1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
30
31mod provider;
32
33#[cfg(feature = "async-io")]
34pub use provider::async_io;
35
36#[cfg(feature = "tokio")]
37pub use provider::tokio;
38
39use futures::{
40 future::{self, Ready},
41 prelude::*,
42 stream::SelectAll,
43};
44use futures_timer::Delay;
45use if_watch::IfEvent;
46use libp2p_core::{
47 address_translation,
48 multiaddr::{Multiaddr, Protocol},
49 transport::{ListenerId, TransportError, TransportEvent},
50};
51use provider::{Incoming, Provider};
52use socket2::{Domain, Socket, Type};
53use std::{
54 collections::{HashSet, VecDeque},
55 io,
56 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
57 pin::Pin,
58 sync::{Arc, RwLock},
59 task::{Context, Poll, Waker},
60 time::Duration,
61};
62
63#[derive(Clone, Debug)]
65pub struct Config {
66 ttl: Option<u32>,
68 nodelay: Option<bool>,
70 backlog: u32,
72 enable_port_reuse: bool,
74}
75
76type Port = u16;
77
78#[derive(Debug, Clone)]
80enum PortReuse {
81 Disabled,
84 Enabled {
89 listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
92 },
93}
94
95impl PortReuse {
96 fn register(&mut self, ip: IpAddr, port: Port) {
100 if let PortReuse::Enabled { listen_addrs } = self {
101 log::trace!("Registering for port reuse: {}:{}", ip, port);
102 listen_addrs
103 .write()
104 .expect("`register()` and `unregister()` never panic while holding the lock")
105 .insert((ip, port));
106 }
107 }
108
109 fn unregister(&mut self, ip: IpAddr, port: Port) {
113 if let PortReuse::Enabled { listen_addrs } = self {
114 log::trace!("Unregistering for port reuse: {}:{}", ip, port);
115 listen_addrs
116 .write()
117 .expect("`register()` and `unregister()` never panic while holding the lock")
118 .remove(&(ip, port));
119 }
120 }
121
122 fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
132 if let PortReuse::Enabled { listen_addrs } = self {
133 for (ip, port) in listen_addrs
134 .read()
135 .expect("`local_dial_addr` never panic while holding the lock")
136 .iter()
137 {
138 if ip.is_ipv4() == remote_ip.is_ipv4()
139 && ip.is_loopback() == remote_ip.is_loopback()
140 {
141 if remote_ip.is_ipv4() {
142 return Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port));
143 } else {
144 return Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port));
145 }
146 }
147 }
148 }
149
150 None
151 }
152}
153
154impl Config {
155 pub fn new() -> Self {
166 Self {
167 ttl: None,
168 nodelay: None,
169 backlog: 1024,
170 enable_port_reuse: false,
171 }
172 }
173
174 pub fn ttl(mut self, value: u32) -> Self {
176 self.ttl = Some(value);
177 self
178 }
179
180 pub fn nodelay(mut self, value: bool) -> Self {
182 self.nodelay = Some(value);
183 self
184 }
185
186 pub fn listen_backlog(mut self, backlog: u32) -> Self {
188 self.backlog = backlog;
189 self
190 }
191
192 pub fn port_reuse(mut self, port_reuse: bool) -> Self {
288 self.enable_port_reuse = port_reuse;
289 self
290 }
291}
292
293impl Default for Config {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
299pub struct Transport<T>
306where
307 T: Provider + Send,
308{
309 config: Config,
310
311 port_reuse: PortReuse,
313 listeners: SelectAll<ListenStream<T>>,
317 pending_events:
319 VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
320}
321
322impl<T> Transport<T>
323where
324 T: Provider + Send,
325{
326 pub fn new(config: Config) -> Self {
335 let port_reuse = if config.enable_port_reuse {
336 PortReuse::Enabled {
337 listen_addrs: Arc::new(RwLock::new(HashSet::new())),
338 }
339 } else {
340 PortReuse::Disabled
341 };
342 Transport {
343 config,
344 port_reuse,
345 ..Default::default()
346 }
347 }
348
349 fn create_socket(&self, socket_addr: SocketAddr) -> io::Result<Socket> {
350 let socket = Socket::new(
351 Domain::for_address(socket_addr),
352 Type::STREAM,
353 Some(socket2::Protocol::TCP),
354 )?;
355 if socket_addr.is_ipv6() {
356 socket.set_only_v6(true)?;
357 }
358 if let Some(ttl) = self.config.ttl {
359 socket.set_ttl(ttl)?;
360 }
361 if let Some(nodelay) = self.config.nodelay {
362 socket.set_nodelay(nodelay)?;
363 }
364 socket.set_reuse_address(true)?;
365 #[cfg(unix)]
366 if let PortReuse::Enabled { .. } = &self.port_reuse {
367 socket.set_reuse_port(true)?;
368 }
369 Ok(socket)
370 }
371
372 fn do_listen(
373 &mut self,
374 id: ListenerId,
375 socket_addr: SocketAddr,
376 ) -> io::Result<ListenStream<T>> {
377 let socket = self.create_socket(socket_addr)?;
378 socket.bind(&socket_addr.into())?;
379 socket.listen(self.config.backlog as _)?;
380 socket.set_nonblocking(true)?;
381 let listener: TcpListener = socket.into();
382 let local_addr = listener.local_addr()?;
383
384 if local_addr.ip().is_unspecified() {
385 return ListenStream::<T>::new(
386 id,
387 listener,
388 Some(T::new_if_watcher()?),
389 self.port_reuse.clone(),
390 );
391 }
392
393 self.port_reuse.register(local_addr.ip(), local_addr.port());
394 let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
395 self.pending_events.push_back(TransportEvent::NewAddress {
396 listener_id: id,
397 listen_addr,
398 });
399 ListenStream::<T>::new(id, listener, None, self.port_reuse.clone())
400 }
401}
402
403impl<T> Default for Transport<T>
404where
405 T: Provider + Send,
406{
407 fn default() -> Self {
411 let config = Config::default();
412 let port_reuse = if config.enable_port_reuse {
413 PortReuse::Enabled {
414 listen_addrs: Arc::new(RwLock::new(HashSet::new())),
415 }
416 } else {
417 PortReuse::Disabled
418 };
419 Transport {
420 port_reuse,
421 config,
422 listeners: SelectAll::new(),
423 pending_events: VecDeque::new(),
424 }
425 }
426}
427
428impl<T> libp2p_core::Transport for Transport<T>
429where
430 T: Provider + Send + 'static,
431 T::Listener: Unpin,
432 T::Stream: Unpin,
433{
434 type Output = T::Stream;
435 type Error = io::Error;
436 type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
437 type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
438
439 fn listen_on(
440 &mut self,
441 id: ListenerId,
442 addr: Multiaddr,
443 ) -> Result<(), TransportError<Self::Error>> {
444 let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(addr.clone()) {
445 sa
446 } else {
447 return Err(TransportError::MultiaddrNotSupported(addr));
448 };
449 log::debug!("listening on {}", socket_addr);
450 let listener = self
451 .do_listen(id, socket_addr)
452 .map_err(TransportError::Other)?;
453 self.listeners.push(listener);
454 Ok(())
455 }
456
457 fn remove_listener(&mut self, id: ListenerId) -> bool {
458 if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
459 listener.close(Ok(()));
460 true
461 } else {
462 false
463 }
464 }
465
466 fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
467 let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
468 if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
469 return Err(TransportError::MultiaddrNotSupported(addr));
470 }
471 socket_addr
472 } else {
473 return Err(TransportError::MultiaddrNotSupported(addr));
474 };
475 log::debug!("dialing {}", socket_addr);
476
477 let socket = self
478 .create_socket(socket_addr)
479 .map_err(TransportError::Other)?;
480
481 if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) {
482 log::trace!("Binding dial socket to listen socket {}", addr);
483 socket.bind(&addr.into()).map_err(TransportError::Other)?;
484 }
485
486 socket
487 .set_nonblocking(true)
488 .map_err(TransportError::Other)?;
489
490 Ok(async move {
491 match socket.connect(&socket_addr.into()) {
494 Ok(()) => {}
495 Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
496 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
497 Err(err) => return Err(err),
498 };
499
500 let stream = T::new_stream(socket.into()).await?;
501 Ok(stream)
502 }
503 .boxed())
504 }
505
506 fn dial_as_listener(
507 &mut self,
508 addr: Multiaddr,
509 ) -> Result<Self::Dial, TransportError<Self::Error>> {
510 self.dial(addr)
511 }
512
513 fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
531 if !is_tcp_addr(listen) || !is_tcp_addr(observed) {
532 return None;
533 }
534 match &self.port_reuse {
535 PortReuse::Disabled => address_translation(listen, observed),
536 PortReuse::Enabled { .. } => Some(observed.clone()),
537 }
538 }
539
540 fn poll(
542 mut self: Pin<&mut Self>,
543 cx: &mut Context<'_>,
544 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
545 if let Some(event) = self.pending_events.pop_front() {
547 return Poll::Ready(event);
548 }
549
550 match self.listeners.poll_next_unpin(cx) {
551 Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event),
552 _ => Poll::Pending,
553 }
554 }
555}
556
557struct ListenStream<T>
559where
560 T: Provider,
561{
562 listener_id: ListenerId,
564 listen_addr: SocketAddr,
568 listener: T::Listener,
570 if_watcher: Option<T::IfWatcher>,
576 port_reuse: PortReuse,
583 sleep_on_error: Duration,
586 pause: Option<Delay>,
588 pending_event: Option<<Self as Stream>::Item>,
590 is_closed: bool,
592 close_listener_waker: Option<Waker>,
594}
595
596impl<T> ListenStream<T>
597where
598 T: Provider,
599{
600 fn new(
603 listener_id: ListenerId,
604 listener: TcpListener,
605 if_watcher: Option<T::IfWatcher>,
606 port_reuse: PortReuse,
607 ) -> io::Result<Self> {
608 let listen_addr = listener.local_addr()?;
609 let listener = T::new_listener(listener)?;
610
611 Ok(ListenStream {
612 port_reuse,
613 listener,
614 listener_id,
615 listen_addr,
616 if_watcher,
617 pause: None,
618 sleep_on_error: Duration::from_millis(100),
619 pending_event: None,
620 is_closed: false,
621 close_listener_waker: None,
622 })
623 }
624
625 fn disable_port_reuse(&mut self) {
632 match &self.if_watcher {
633 Some(if_watcher) => {
634 for ip_net in T::addrs(if_watcher) {
635 self.port_reuse
636 .unregister(ip_net.addr(), self.listen_addr.port());
637 }
638 }
639 None => self
640 .port_reuse
641 .unregister(self.listen_addr.ip(), self.listen_addr.port()),
642 }
643 }
644
645 fn close(&mut self, reason: Result<(), io::Error>) {
650 if self.is_closed {
651 return;
652 }
653 self.pending_event = Some(TransportEvent::ListenerClosed {
654 listener_id: self.listener_id,
655 reason,
656 });
657 self.is_closed = true;
658
659 if let Some(waker) = self.close_listener_waker.take() {
661 waker.wake();
662 }
663 }
664
665 fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
667 let if_watcher = match self.if_watcher.as_mut() {
668 Some(if_watcher) => if_watcher,
669 None => return Poll::Pending,
670 };
671
672 let my_listen_addr_port = self.listen_addr.port();
673
674 while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
675 match event {
676 Ok(IfEvent::Up(inet)) => {
677 let ip = inet.addr();
678 if self.listen_addr.is_ipv4() == ip.is_ipv4() {
679 let ma = ip_to_multiaddr(ip, my_listen_addr_port);
680 log::debug!("New listen address: {}", ma);
681 self.port_reuse.register(ip, my_listen_addr_port);
682 return Poll::Ready(TransportEvent::NewAddress {
683 listener_id: self.listener_id,
684 listen_addr: ma,
685 });
686 }
687 }
688 Ok(IfEvent::Down(inet)) => {
689 let ip = inet.addr();
690 if self.listen_addr.is_ipv4() == ip.is_ipv4() {
691 let ma = ip_to_multiaddr(ip, my_listen_addr_port);
692 log::debug!("Expired listen address: {}", ma);
693 self.port_reuse.unregister(ip, my_listen_addr_port);
694 return Poll::Ready(TransportEvent::AddressExpired {
695 listener_id: self.listener_id,
696 listen_addr: ma,
697 });
698 }
699 }
700 Err(error) => {
701 self.pause = Some(Delay::new(self.sleep_on_error));
702 return Poll::Ready(TransportEvent::ListenerError {
703 listener_id: self.listener_id,
704 error,
705 });
706 }
707 }
708 }
709
710 Poll::Pending
711 }
712}
713
714impl<T> Drop for ListenStream<T>
715where
716 T: Provider,
717{
718 fn drop(&mut self) {
719 self.disable_port_reuse();
720 }
721}
722
723impl<T> Stream for ListenStream<T>
724where
725 T: Provider,
726 T::Listener: Unpin,
727 T::Stream: Unpin,
728{
729 type Item = TransportEvent<Ready<Result<T::Stream, io::Error>>, io::Error>;
730
731 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
732 if let Some(mut pause) = self.pause.take() {
733 match pause.poll_unpin(cx) {
734 Poll::Ready(_) => {}
735 Poll::Pending => {
736 self.pause = Some(pause);
737 return Poll::Pending;
738 }
739 }
740 }
741
742 if let Some(event) = self.pending_event.take() {
743 return Poll::Ready(Some(event));
744 }
745
746 if self.is_closed {
747 return Poll::Ready(None);
749 }
750
751 if let Poll::Ready(event) = self.poll_if_addr(cx) {
752 return Poll::Ready(Some(event));
753 }
754
755 match T::poll_accept(&mut self.listener, cx) {
757 Poll::Ready(Ok(Incoming {
758 local_addr,
759 remote_addr,
760 stream,
761 })) => {
762 let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
763 let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port());
764
765 log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);
766
767 return Poll::Ready(Some(TransportEvent::Incoming {
768 listener_id: self.listener_id,
769 upgrade: future::ok(stream),
770 local_addr,
771 send_back_addr: remote_addr,
772 }));
773 }
774 Poll::Ready(Err(error)) => {
775 self.pause = Some(Delay::new(self.sleep_on_error));
777 return Poll::Ready(Some(TransportEvent::ListenerError {
778 listener_id: self.listener_id,
779 error,
780 }));
781 }
782 Poll::Pending => {}
783 }
784
785 self.close_listener_waker = Some(cx.waker().clone());
786 Poll::Pending
787 }
788}
789
790fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
795 let mut port = None;
799 while let Some(proto) = addr.pop() {
800 match proto {
801 Protocol::Ip4(ipv4) => match port {
802 Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
803 None => return Err(()),
804 },
805 Protocol::Ip6(ipv6) => match port {
806 Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
807 None => return Err(()),
808 },
809 Protocol::Tcp(portnum) => match port {
810 Some(_) => return Err(()),
811 None => port = Some(portnum),
812 },
813 Protocol::P2p(_) => {}
814 _ => return Err(()),
815 }
816 }
817 Err(())
818}
819
820fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
822 Multiaddr::empty().with(ip.into()).with(Protocol::Tcp(port))
823}
824
825fn is_tcp_addr(addr: &Multiaddr) -> bool {
826 use Protocol::*;
827
828 let mut iter = addr.iter();
829
830 let first = match iter.next() {
831 None => return false,
832 Some(p) => p,
833 };
834 let second = match iter.next() {
835 None => return false,
836 Some(p) => p,
837 };
838
839 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
840}
841
842#[cfg(test)]
843mod tests {
844 use super::*;
845 use futures::{
846 channel::{mpsc, oneshot},
847 future::poll_fn,
848 };
849 use libp2p_core::Transport as _;
850 use libp2p_identity::PeerId;
851
852 #[test]
853 fn multiaddr_to_tcp_conversion() {
854 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
855
856 assert!(
857 multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
858 .is_err()
859 );
860
861 assert_eq!(
862 multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
863 Ok(SocketAddr::new(
864 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
865 12345,
866 ))
867 );
868 assert_eq!(
869 multiaddr_to_socketaddr(
870 "/ip4/255.255.255.255/tcp/8080"
871 .parse::<Multiaddr>()
872 .unwrap()
873 ),
874 Ok(SocketAddr::new(
875 IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
876 8080,
877 ))
878 );
879 assert_eq!(
880 multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
881 Ok(SocketAddr::new(
882 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
883 12345,
884 ))
885 );
886 assert_eq!(
887 multiaddr_to_socketaddr(
888 "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
889 .parse::<Multiaddr>()
890 .unwrap()
891 ),
892 Ok(SocketAddr::new(
893 IpAddr::V6(Ipv6Addr::new(
894 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
895 )),
896 8080,
897 ))
898 );
899 }
900
901 #[test]
902 fn communicating_between_dialer_and_listener() {
903 env_logger::try_init().ok();
904
905 async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
906 let mut tcp = Transport::<T>::default().boxed();
907 tcp.listen_on(ListenerId::next(), addr).unwrap();
908 loop {
909 match tcp.select_next_some().await {
910 TransportEvent::NewAddress { listen_addr, .. } => {
911 ready_tx.send(listen_addr).await.unwrap();
912 }
913 TransportEvent::Incoming { upgrade, .. } => {
914 let mut upgrade = upgrade.await.unwrap();
915 let mut buf = [0u8; 3];
916 upgrade.read_exact(&mut buf).await.unwrap();
917 assert_eq!(buf, [1, 2, 3]);
918 upgrade.write_all(&[4, 5, 6]).await.unwrap();
919 return;
920 }
921 e => panic!("Unexpected transport event: {e:?}"),
922 }
923 }
924 }
925
926 async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
927 let addr = ready_rx.next().await.unwrap();
928 let mut tcp = Transport::<T>::default();
929
930 let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
932 socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
933
934 let mut buf = [0u8; 3];
935 socket.read_exact(&mut buf).await.unwrap();
936 assert_eq!(buf, [4, 5, 6]);
937 }
938
939 fn test(addr: Multiaddr) {
940 #[cfg(feature = "async-io")]
941 {
942 let (ready_tx, ready_rx) = mpsc::channel(1);
943 let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
944 let dialer = dialer::<async_io::Tcp>(ready_rx);
945 let listener = async_std::task::spawn(listener);
946 async_std::task::block_on(dialer);
947 async_std::task::block_on(listener);
948 }
949
950 #[cfg(feature = "tokio")]
951 {
952 let (ready_tx, ready_rx) = mpsc::channel(1);
953 let listener = listener::<tokio::Tcp>(addr, ready_tx);
954 let dialer = dialer::<tokio::Tcp>(ready_rx);
955 let rt = ::tokio::runtime::Builder::new_current_thread()
956 .enable_io()
957 .build()
958 .unwrap();
959 let tasks = ::tokio::task::LocalSet::new();
960 let listener = tasks.spawn_local(listener);
961 tasks.block_on(&rt, dialer);
962 tasks.block_on(&rt, listener).unwrap();
963 }
964 }
965
966 test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
967 test("/ip6/::1/tcp/0".parse().unwrap());
968 }
969
970 #[test]
971 fn wildcard_expansion() {
972 env_logger::try_init().ok();
973
974 async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
975 let mut tcp = Transport::<T>::default().boxed();
976 tcp.listen_on(ListenerId::next(), addr).unwrap();
977
978 loop {
979 match tcp.select_next_some().await {
980 TransportEvent::NewAddress { listen_addr, .. } => {
981 let mut iter = listen_addr.iter();
982 match iter.next().expect("ip address") {
983 Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
984 Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
985 other => panic!("Unexpected protocol: {other}"),
986 }
987 if let Protocol::Tcp(port) = iter.next().expect("port") {
988 assert_ne!(0, port)
989 } else {
990 panic!("No TCP port in address: {listen_addr}")
991 }
992 ready_tx.send(listen_addr).await.ok();
993 }
994 TransportEvent::Incoming { .. } => {
995 return;
996 }
997 _ => {}
998 }
999 }
1000 }
1001
1002 async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
1003 let dest_addr = ready_rx.next().await.unwrap();
1004 let mut tcp = Transport::<T>::default();
1005 tcp.dial(dest_addr).unwrap().await.unwrap();
1006 }
1007
1008 fn test(addr: Multiaddr) {
1009 #[cfg(feature = "async-io")]
1010 {
1011 let (ready_tx, ready_rx) = mpsc::channel(1);
1012 let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
1013 let dialer = dialer::<async_io::Tcp>(ready_rx);
1014 let listener = async_std::task::spawn(listener);
1015 async_std::task::block_on(dialer);
1016 async_std::task::block_on(listener);
1017 }
1018
1019 #[cfg(feature = "tokio")]
1020 {
1021 let (ready_tx, ready_rx) = mpsc::channel(1);
1022 let listener = listener::<tokio::Tcp>(addr, ready_tx);
1023 let dialer = dialer::<tokio::Tcp>(ready_rx);
1024 let rt = ::tokio::runtime::Builder::new_current_thread()
1025 .enable_io()
1026 .build()
1027 .unwrap();
1028 let tasks = ::tokio::task::LocalSet::new();
1029 let listener = tasks.spawn_local(listener);
1030 tasks.block_on(&rt, dialer);
1031 tasks.block_on(&rt, listener).unwrap();
1032 }
1033 }
1034
1035 test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
1036 test("/ip6/::1/tcp/0".parse().unwrap());
1037 }
1038
1039 #[test]
1040 fn port_reuse_dialing() {
1041 env_logger::try_init().ok();
1042
1043 async fn listener<T: Provider>(
1044 addr: Multiaddr,
1045 mut ready_tx: mpsc::Sender<Multiaddr>,
1046 port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
1047 ) {
1048 let mut tcp = Transport::<T>::new(Config::new()).boxed();
1049 tcp.listen_on(ListenerId::next(), addr).unwrap();
1050 loop {
1051 match tcp.select_next_some().await {
1052 TransportEvent::NewAddress { listen_addr, .. } => {
1053 ready_tx.send(listen_addr).await.ok();
1054 }
1055 TransportEvent::Incoming {
1056 upgrade,
1057 mut send_back_addr,
1058 ..
1059 } => {
1060 let remote_port_reuse = port_reuse_rx.await.unwrap();
1062 assert_eq!(send_back_addr.pop().unwrap(), remote_port_reuse);
1064
1065 let mut upgrade = upgrade.await.unwrap();
1066 let mut buf = [0u8; 3];
1067 upgrade.read_exact(&mut buf).await.unwrap();
1068 assert_eq!(buf, [1, 2, 3]);
1069 upgrade.write_all(&[4, 5, 6]).await.unwrap();
1070 return;
1071 }
1072 e => panic!("Unexpected event: {e:?}"),
1073 }
1074 }
1075 }
1076
1077 async fn dialer<T: Provider>(
1078 addr: Multiaddr,
1079 mut ready_rx: mpsc::Receiver<Multiaddr>,
1080 port_reuse_tx: oneshot::Sender<Protocol<'_>>,
1081 ) {
1082 let dest_addr = ready_rx.next().await.unwrap();
1083 let mut tcp = Transport::<T>::new(Config::new().port_reuse(true));
1084 tcp.listen_on(ListenerId::next(), addr).unwrap();
1085 match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
1086 TransportEvent::NewAddress { .. } => {
1087 let listener = tcp.listeners.iter().next().unwrap();
1089 let port_reuse_tcp = tcp.port_reuse.local_dial_addr(&listener.listen_addr.ip());
1090 let port_reuse_listener = listener
1091 .port_reuse
1092 .local_dial_addr(&listener.listen_addr.ip());
1093 assert!(port_reuse_tcp.is_some());
1094 assert_eq!(port_reuse_tcp, port_reuse_listener);
1095
1096 port_reuse_tx
1098 .send(Protocol::Tcp(port_reuse_tcp.unwrap().port()))
1099 .ok();
1100
1101 let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap();
1103 socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
1104 let mut buf = [0u8; 3];
1106 socket.read_exact(&mut buf).await.unwrap();
1107 assert_eq!(buf, [4, 5, 6]);
1108 }
1109 e => panic!("Unexpected transport event: {e:?}"),
1110 }
1111 }
1112
1113 fn test(addr: Multiaddr) {
1114 #[cfg(feature = "async-io")]
1115 {
1116 let (ready_tx, ready_rx) = mpsc::channel(1);
1117 let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
1118 let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
1119 let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx, port_reuse_tx);
1120 let listener = async_std::task::spawn(listener);
1121 async_std::task::block_on(dialer);
1122 async_std::task::block_on(listener);
1123 }
1124
1125 #[cfg(feature = "tokio")]
1126 {
1127 let (ready_tx, ready_rx) = mpsc::channel(1);
1128 let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
1129 let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
1130 let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
1131 let rt = ::tokio::runtime::Builder::new_current_thread()
1132 .enable_io()
1133 .build()
1134 .unwrap();
1135 let tasks = ::tokio::task::LocalSet::new();
1136 let listener = tasks.spawn_local(listener);
1137 tasks.block_on(&rt, dialer);
1138 tasks.block_on(&rt, listener).unwrap();
1139 }
1140 }
1141
1142 test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1143 test("/ip6/::1/tcp/0".parse().unwrap());
1144 }
1145
1146 #[test]
1147 fn port_reuse_listening() {
1148 env_logger::try_init().ok();
1149
1150 async fn listen_twice<T: Provider>(addr: Multiaddr) {
1151 let mut tcp = Transport::<T>::new(Config::new().port_reuse(true));
1152 tcp.listen_on(ListenerId::next(), addr).unwrap();
1153 match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
1154 TransportEvent::NewAddress {
1155 listen_addr: addr1, ..
1156 } => {
1157 let listener1 = tcp.listeners.iter().next().unwrap();
1158 let port_reuse_tcp =
1159 tcp.port_reuse.local_dial_addr(&listener1.listen_addr.ip());
1160 let port_reuse_listener1 = listener1
1161 .port_reuse
1162 .local_dial_addr(&listener1.listen_addr.ip());
1163 assert!(port_reuse_tcp.is_some());
1164 assert_eq!(port_reuse_tcp, port_reuse_listener1);
1165
1166 tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap();
1168 match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
1169 TransportEvent::NewAddress {
1170 listen_addr: addr2, ..
1171 } => assert_eq!(addr1, addr2),
1172 e => panic!("Unexpected transport event: {e:?}"),
1173 }
1174 }
1175 e => panic!("Unexpected transport event: {e:?}"),
1176 }
1177 }
1178
1179 fn test(addr: Multiaddr) {
1180 #[cfg(feature = "async-io")]
1181 {
1182 let listener = listen_twice::<async_io::Tcp>(addr.clone());
1183 async_std::task::block_on(listener);
1184 }
1185
1186 #[cfg(feature = "tokio")]
1187 {
1188 let listener = listen_twice::<tokio::Tcp>(addr);
1189 let rt = ::tokio::runtime::Builder::new_current_thread()
1190 .enable_io()
1191 .build()
1192 .unwrap();
1193 rt.block_on(listener);
1194 }
1195 }
1196
1197 test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1198 }
1199
1200 #[test]
1201 fn listen_port_0() {
1202 env_logger::try_init().ok();
1203
1204 async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
1205 let mut tcp = Transport::<T>::default().boxed();
1206 tcp.listen_on(ListenerId::next(), addr).unwrap();
1207 tcp.select_next_some()
1208 .await
1209 .into_new_address()
1210 .expect("listen address")
1211 }
1212
1213 fn test(addr: Multiaddr) {
1214 #[cfg(feature = "async-io")]
1215 {
1216 let new_addr = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
1217 assert!(!new_addr.to_string().contains("tcp/0"));
1218 }
1219
1220 #[cfg(feature = "tokio")]
1221 {
1222 let rt = ::tokio::runtime::Builder::new_current_thread()
1223 .enable_io()
1224 .build()
1225 .unwrap();
1226 let new_addr = rt.block_on(listen::<tokio::Tcp>(addr));
1227 assert!(!new_addr.to_string().contains("tcp/0"));
1228 }
1229 }
1230
1231 test("/ip6/::1/tcp/0".parse().unwrap());
1232 test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1233 }
1234
1235 #[test]
1236 fn listen_invalid_addr() {
1237 env_logger::try_init().ok();
1238
1239 fn test(addr: Multiaddr) {
1240 #[cfg(feature = "async-io")]
1241 {
1242 let mut tcp = async_io::Transport::default();
1243 assert!(tcp.listen_on(ListenerId::next(), addr.clone()).is_err());
1244 }
1245
1246 #[cfg(feature = "tokio")]
1247 {
1248 let mut tcp = tokio::Transport::default();
1249 assert!(tcp.listen_on(ListenerId::next(), addr).is_err());
1250 }
1251 }
1252
1253 test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
1254 }
1255
1256 #[cfg(feature = "async-io")]
1257 #[test]
1258 fn test_address_translation_async_io() {
1259 test_address_translation::<async_io::Transport>()
1260 }
1261
1262 #[cfg(feature = "tokio")]
1263 #[test]
1264 fn test_address_translation_tokio() {
1265 test_address_translation::<tokio::Transport>()
1266 }
1267
1268 fn test_address_translation<T>()
1269 where
1270 T: Default + libp2p_core::Transport,
1271 {
1272 let transport = T::default();
1273
1274 let port = 42;
1275 let tcp_listen_addr = Multiaddr::empty()
1276 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
1277 .with(Protocol::Tcp(port));
1278 let observed_ip = Ipv4Addr::new(123, 45, 67, 8);
1279 let tcp_observed_addr = Multiaddr::empty()
1280 .with(Protocol::Ip4(observed_ip))
1281 .with(Protocol::Tcp(1))
1282 .with(Protocol::P2p(PeerId::random()));
1283
1284 let translated = transport
1285 .address_translation(&tcp_listen_addr, &tcp_observed_addr)
1286 .unwrap();
1287 let mut iter = translated.iter();
1288 assert_eq!(iter.next(), Some(Protocol::Ip4(observed_ip)));
1289 assert_eq!(iter.next(), Some(Protocol::Tcp(port)));
1290 assert_eq!(iter.next(), None);
1291
1292 let quic_addr = Multiaddr::empty()
1293 .with(Protocol::Ip4(Ipv4Addr::new(87, 65, 43, 21)))
1294 .with(Protocol::Udp(1))
1295 .with(Protocol::QuicV1);
1296
1297 assert!(transport
1298 .address_translation(&tcp_listen_addr, &quic_addr)
1299 .is_none());
1300 assert!(transport
1301 .address_translation(&quic_addr, &tcp_observed_addr)
1302 .is_none());
1303 }
1304
1305 #[test]
1306 fn test_remove_listener() {
1307 env_logger::try_init().ok();
1308
1309 async fn cycle_listeners<T: Provider>() -> bool {
1310 let mut tcp = Transport::<T>::default().boxed();
1311 let listener_id = ListenerId::next();
1312 tcp.listen_on(listener_id, "/ip4/127.0.0.1/tcp/0".parse().unwrap())
1313 .unwrap();
1314 tcp.remove_listener(listener_id)
1315 }
1316
1317 #[cfg(feature = "async-io")]
1318 {
1319 assert!(async_std::task::block_on(cycle_listeners::<async_io::Tcp>()));
1320 }
1321
1322 #[cfg(feature = "tokio")]
1323 {
1324 let rt = ::tokio::runtime::Builder::new_current_thread()
1325 .enable_io()
1326 .build()
1327 .unwrap();
1328 assert!(rt.block_on(cycle_listeners::<tokio::Tcp>()));
1329 }
1330 }
1331
1332 #[test]
1333 fn test_listens_ipv4_ipv6_separately() {
1334 fn test<T: Provider>() {
1335 let port = {
1336 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1337 listener.local_addr().unwrap().port()
1338 };
1339 let mut tcp = Transport::<T>::default().boxed();
1340 let listener_id = ListenerId::next();
1341 tcp.listen_on(
1342 listener_id,
1343 format!("/ip4/0.0.0.0/tcp/{port}").parse().unwrap(),
1344 )
1345 .unwrap();
1346 tcp.listen_on(
1347 ListenerId::next(),
1348 format!("/ip6/::/tcp/{port}").parse().unwrap(),
1349 )
1350 .unwrap();
1351 }
1352 #[cfg(feature = "async-io")]
1353 {
1354 async_std::task::block_on(async {
1355 test::<async_io::Tcp>();
1356 })
1357 }
1358 #[cfg(feature = "tokio")]
1359 {
1360 let rt = ::tokio::runtime::Builder::new_current_thread()
1361 .enable_io()
1362 .build()
1363 .unwrap();
1364 rt.block_on(async {
1365 test::<async_io::Tcp>();
1366 });
1367 }
1368 }
1369}