litep2p/transport/common/
listener.rs

1// Copyright 2024 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Shared socket listener between TCP and WebSocket.
22
23use crate::{
24    error::{AddressError, DnsError},
25    PeerId,
26};
27
28use futures::Stream;
29use hickory_resolver::{
30    config::{ResolverConfig, ResolverOpts},
31    TokioAsyncResolver,
32};
33use multiaddr::{Multiaddr, Protocol};
34use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig};
35use socket2::{Domain, Socket, Type};
36use tokio::net::{TcpListener as TokioTcpListener, TcpStream};
37
38use std::{
39    io,
40    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
41    pin::Pin,
42    sync::Arc,
43    task::{Context, Poll},
44};
45
/// `tracing` target attached to every log event emitted from this file.
const LOG_TARGET: &str = "litep2p::transport::listener";
48
/// Address extracted from a `Multiaddr`: either an already concrete socket address
/// or a DNS name that still has to be resolved with [`AddressType::lookup_ip`].
#[derive(Debug)]
pub enum AddressType {
    /// Concrete socket address (IP and port); no resolution needed.
    Socket(SocketAddr),

    /// DNS address that must be resolved to an IP before it can be used.
    Dns {
        /// Host name to resolve.
        address: String,
        /// TCP port paired with the resolved IP.
        port: u16,
        /// Which IP versions are acceptable for the resolved address.
        dns_type: DnsType,
    },
}
62
/// The DNS type of the address.
///
/// Restricts which resolved IP version [`AddressType::lookup_ip`] accepts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DnsType {
    /// `/dns`: any resolved address is accepted, IPv4 or IPv6.
    Dns,
    /// `/dns4`: only resolved IPv4 addresses are accepted.
    Dns4,
    /// `/dns6`: only resolved IPv6 addresses are accepted.
    Dns6,
}
73
74impl AddressType {
75    /// Resolve the address to a concrete IP.
76    pub async fn lookup_ip(self) -> Result<SocketAddr, DnsError> {
77        let (url, port, dns_type) = match self {
78            // We already have the IP address.
79            AddressType::Socket(address) => return Ok(address),
80            AddressType::Dns {
81                address,
82                port,
83                dns_type,
84            } => (address, port, dns_type),
85        };
86
87        let lookup =
88            match TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default())
89                .lookup_ip(url.clone())
90                .await
91            {
92                Ok(lookup) => lookup,
93                Err(error) => {
94                    tracing::debug!(
95                        target: LOG_TARGET,
96                        ?error,
97                        "failed to resolve DNS address `{}`",
98                        url
99                    );
100
101                    return Err(DnsError::ResolveError(url));
102                }
103            };
104
105        let Some(ip) = lookup.iter().find(|ip| match dns_type {
106            DnsType::Dns => true,
107            DnsType::Dns4 => ip.is_ipv4(),
108            DnsType::Dns6 => ip.is_ipv6(),
109        }) else {
110            tracing::debug!(
111                target: LOG_TARGET,
112                "Multiaddr DNS type does not match IP version `{}`",
113                url
114            );
115            return Err(DnsError::IpVersionMismatch);
116        };
117
118        Ok(SocketAddr::new(ip, port))
119    }
120}
121
/// Local addresses to use for outbound connections.
#[derive(Clone, Default)]
pub enum DialAddresses {
    /// Reuse port from listen addresses.
    Reuse {
        /// Addresses the node listens on; their ports are candidates for outbound binds.
        listen_addresses: Arc<Vec<SocketAddr>>,
    },
    /// Do not reuse port.
    #[default]
    NoReuse,
}

impl DialAddresses {
    /// Pick the local address an outbound connection to `remote_address` should bind to.
    ///
    /// - `Ok(None)`: ports are not reused, no explicit local bind address is selected.
    /// - `Ok(Some(address))`: the unspecified address of the remote's IP family combined
    ///   with the port of the first listen address that has the same IP family and the
    ///   same loopback-ness as `remote_address`.
    /// - `Err(())`: ports are reused but no listen address matches `remote_address`.
    pub fn local_dial_address(&self, remote_address: &IpAddr) -> Result<Option<SocketAddr>, ()> {
        let listen_addresses = match self {
            Self::NoReuse => return Ok(None),
            Self::Reuse { listen_addresses } => listen_addresses,
        };

        // Only the port is taken from the listen address; the IP is always the wildcard.
        let wildcard = match remote_address {
            IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
            IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
        };

        listen_addresses
            .iter()
            .find(|candidate| {
                candidate.is_ipv4() == remote_address.is_ipv4()
                    && candidate.ip().is_loopback() == remote_address.is_loopback()
            })
            .map(|candidate| Some(SocketAddr::new(wildcard, candidate.port())))
            .ok_or(())
    }
}
163
/// Socket listening to zero or more addresses.
///
/// Incoming connections are produced through the `Stream` implementation of this type.
pub struct SocketListener {
    /// One bound TCP listener per successfully bound listen address.
    listeners: Vec<TokioTcpListener>,
    /// The index in the listeners from which the polling is resumed.
    ///
    /// Advanced on every poll so that polling does not always start from the same listener.
    poll_index: usize,
}
171
/// Trait to convert between `Multiaddr` and `SocketAddr`.
///
/// Implemented by transport-specific marker types (see [`TcpAddress`] and
/// [`WebSocketAddress`]) so that [`SocketListener`] can be shared between transports.
pub trait GetSocketAddr {
    /// Convert `Multiaddr` to `SocketAddr`.
    ///
    /// # Note
    ///
    /// This method is called from two main code paths:
    ///  - When creating a new `SocketListener` to bind to a specific address.
    ///  - When dialing a new connection to a remote address.
    ///
    /// The returned `AddressType` is either a concrete `Socket` address or a `Dns` address.
    /// A `Dns` address is not resolved here; the concrete IP address is resolved later
    /// (see [`AddressType::lookup_ip`]).
    ///
    /// The `PeerId` is optional and may not be present.
    fn multiaddr_to_socket_address(
        address: &Multiaddr,
    ) -> Result<(AddressType, Option<PeerId>), AddressError>;

    /// Convert concrete `SocketAddr` to `Multiaddr`.
    fn socket_address_to_multiaddr(address: &SocketAddr) -> Multiaddr;
}
193
194/// TCP helper to convert between `Multiaddr` and `SocketAddr`.
195pub struct TcpAddress;
196
197impl GetSocketAddr for TcpAddress {
198    fn multiaddr_to_socket_address(
199        address: &Multiaddr,
200    ) -> Result<(AddressType, Option<PeerId>), AddressError> {
201        multiaddr_to_socket_address(address, SocketListenerType::Tcp)
202    }
203
204    fn socket_address_to_multiaddr(address: &SocketAddr) -> Multiaddr {
205        Multiaddr::empty()
206            .with(Protocol::from(address.ip()))
207            .with(Protocol::Tcp(address.port()))
208    }
209}
210
211/// WebSocket helper to convert between `Multiaddr` and `SocketAddr`.
212pub struct WebSocketAddress;
213
214impl GetSocketAddr for WebSocketAddress {
215    fn multiaddr_to_socket_address(
216        address: &Multiaddr,
217    ) -> Result<(AddressType, Option<PeerId>), AddressError> {
218        multiaddr_to_socket_address(address, SocketListenerType::WebSocket)
219    }
220
221    fn socket_address_to_multiaddr(address: &SocketAddr) -> Multiaddr {
222        Multiaddr::empty()
223            .with(Protocol::from(address.ip()))
224            .with(Protocol::Tcp(address.port()))
225            .with(Protocol::Ws(std::borrow::Cow::Borrowed("/")))
226    }
227}
228
229impl SocketListener {
230    /// Create new [`SocketListener`]
231    pub fn new<T: GetSocketAddr>(
232        addresses: Vec<Multiaddr>,
233        reuse_port: bool,
234        nodelay: bool,
235    ) -> (Self, Vec<Multiaddr>, DialAddresses) {
236        let (listeners, listen_addresses): (_, Vec<Vec<_>>) = addresses
237            .into_iter()
238            .filter_map(|address| {
239                let address = match T::multiaddr_to_socket_address(&address).ok()?.0 {
240                    AddressType::Dns { address, port, .. } => {
241                        tracing::debug!(
242                            target: LOG_TARGET,
243                            ?address,
244                            ?port,
245                            "dns not supported as bind address"
246                        );
247
248                        return None;
249                    }
250                    AddressType::Socket(address) => address,
251                };
252
253                let socket = if address.is_ipv4() {
254                    Socket::new(Domain::IPV4, Type::STREAM, Some(socket2::Protocol::TCP)).ok()?
255                } else {
256                    let socket =
257                        Socket::new(Domain::IPV6, Type::STREAM, Some(socket2::Protocol::TCP))
258                            .ok()?;
259                    socket.set_only_v6(true).ok()?;
260                    socket
261                };
262
263                socket.set_nodelay(nodelay).ok()?;
264                socket.set_nonblocking(true).ok()?;
265                socket.set_reuse_address(true).ok()?;
266                #[cfg(unix)]
267                if reuse_port {
268                    socket.set_reuse_port(true).ok()?;
269                }
270                socket.bind(&address.into()).ok()?;
271                socket.listen(1024).ok()?;
272
273                let socket: std::net::TcpListener = socket.into();
274                let listener = TokioTcpListener::from_std(socket).ok()?;
275                let local_address = listener.local_addr().ok()?;
276
277                let listen_addresses = if address.ip().is_unspecified() {
278                    match NetworkInterface::show() {
279                        Ok(ifaces) => ifaces
280                            .into_iter()
281                            .flat_map(|record| {
282                                record.addr.into_iter().filter_map(|iface_address| {
283                                    match (iface_address, address.is_ipv4()) {
284                                        (Addr::V4(inner), true) => Some(SocketAddr::new(
285                                            IpAddr::V4(inner.ip),
286                                            local_address.port(),
287                                        )),
288                                        (Addr::V6(inner), false) =>
289                                            match inner.ip.segments().first() {
290                                                Some(0xfe80) => None,
291                                                _ => Some(SocketAddr::new(
292                                                    IpAddr::V6(inner.ip),
293                                                    local_address.port(),
294                                                )),
295                                            },
296                                        _ => None,
297                                    }
298                                })
299                            })
300                            .collect(),
301                        Err(error) => {
302                            tracing::warn!(
303                                target: LOG_TARGET,
304                                ?error,
305                                "failed to fetch network interfaces",
306                            );
307
308                            return None;
309                        }
310                    }
311                } else {
312                    vec![local_address]
313                };
314
315                Some((listener, listen_addresses))
316            })
317            .unzip();
318
319        let listen_addresses = listen_addresses.into_iter().flatten().collect::<Vec<_>>();
320        let listen_multi_addresses =
321            listen_addresses.iter().map(T::socket_address_to_multiaddr).collect();
322
323        let dial_addresses = if reuse_port {
324            DialAddresses::Reuse {
325                listen_addresses: Arc::new(listen_addresses),
326            }
327        } else {
328            DialAddresses::NoReuse
329        };
330
331        (
332            Self {
333                listeners,
334                poll_index: 0,
335            },
336            listen_multi_addresses,
337            dial_addresses,
338        )
339    }
340}
341
/// The type of the socket listener.
///
/// Selects which multiaddress layout `multiaddr_to_socket_address` expects.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SocketListenerType {
    /// Listener for TCP: no `/ws` or `/wss` component is expected after the port.
    Tcp,
    /// Listener for WebSocket: a `/ws` or `/wss` component must follow the port.
    WebSocket,
}
350
351/// Extract socket address and `PeerId`, if found, from `address`.
352fn multiaddr_to_socket_address(
353    address: &Multiaddr,
354    ty: SocketListenerType,
355) -> Result<(AddressType, Option<PeerId>), AddressError> {
356    tracing::trace!(target: LOG_TARGET, ?address, "parse multi address");
357
358    let mut iter = address.iter();
359    // Small helper to handle DNS types.
360    let handle_dns_type =
361        |address: String, dns_type: DnsType, protocol: Option<Protocol>| match protocol {
362            Some(Protocol::Tcp(port)) => Ok(AddressType::Dns {
363                address,
364                port,
365                dns_type,
366            }),
367            protocol => {
368                tracing::error!(
369                    target: LOG_TARGET,
370                    ?protocol,
371                    "invalid transport protocol, expected `Tcp`",
372                );
373                Err(AddressError::InvalidProtocol)
374            }
375        };
376
377    let socket_address = match iter.next() {
378        Some(Protocol::Ip6(address)) => match iter.next() {
379            Some(Protocol::Tcp(port)) =>
380                AddressType::Socket(SocketAddr::new(IpAddr::V6(address), port)),
381            protocol => {
382                tracing::error!(
383                    target: LOG_TARGET,
384                    ?protocol,
385                    "invalid transport protocol, expected `Tcp`",
386                );
387                return Err(AddressError::InvalidProtocol);
388            }
389        },
390        Some(Protocol::Ip4(address)) => match iter.next() {
391            Some(Protocol::Tcp(port)) =>
392                AddressType::Socket(SocketAddr::new(IpAddr::V4(address), port)),
393            protocol => {
394                tracing::error!(
395                    target: LOG_TARGET,
396                    ?protocol,
397                    "invalid transport protocol, expected `Tcp`",
398                );
399                return Err(AddressError::InvalidProtocol);
400            }
401        },
402        Some(Protocol::Dns(address)) => handle_dns_type(address.into(), DnsType::Dns, iter.next())?,
403        Some(Protocol::Dns4(address)) =>
404            handle_dns_type(address.into(), DnsType::Dns4, iter.next())?,
405        Some(Protocol::Dns6(address)) =>
406            handle_dns_type(address.into(), DnsType::Dns6, iter.next())?,
407        protocol => {
408            tracing::error!(target: LOG_TARGET, ?protocol, "invalid transport protocol");
409            return Err(AddressError::InvalidProtocol);
410        }
411    };
412
413    match ty {
414        SocketListenerType::Tcp => (),
415        SocketListenerType::WebSocket => {
416            // verify that `/ws`/`/wss` is part of the multi address
417            match iter.next() {
418                Some(Protocol::Ws(_address)) => {}
419                Some(Protocol::Wss(_address)) => {}
420                protocol => {
421                    tracing::error!(
422                        target: LOG_TARGET,
423                        ?protocol,
424                        "invalid protocol, expected `Ws` or `Wss`"
425                    );
426                    return Err(AddressError::InvalidProtocol);
427                }
428            };
429        }
430    }
431
432    let maybe_peer = match iter.next() {
433        Some(Protocol::P2p(multihash)) =>
434            Some(PeerId::from_multihash(multihash).map_err(AddressError::InvalidPeerId)?),
435        None => None,
436        protocol => {
437            tracing::error!(
438                target: LOG_TARGET,
439                ?protocol,
440                "invalid protocol, expected `P2p` or `None`"
441            );
442            return Err(AddressError::InvalidProtocol);
443        }
444    };
445
446    Ok((socket_address, maybe_peer))
447}
448
449impl Stream for SocketListener {
450    type Item = io::Result<(TcpStream, SocketAddr)>;
451
452    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
453        if self.listeners.is_empty() {
454            return Poll::Pending;
455        }
456
457        let len = self.listeners.len();
458        for index in 0..len {
459            let current = (self.poll_index + index) % len;
460            let listener = &mut self.listeners[current];
461
462            match listener.poll_accept(cx) {
463                Poll::Pending => {}
464                Poll::Ready(Err(error)) => {
465                    self.poll_index = (self.poll_index + 1) % len;
466                    return Poll::Ready(Some(Err(error)));
467                }
468                Poll::Ready(Ok((stream, address))) => {
469                    self.poll_index = (self.poll_index + 1) % len;
470                    return Poll::Ready(Some(Ok((stream, address))));
471                }
472            }
473        }
474
475        self.poll_index = (self.poll_index + 1) % len;
476        Poll::Pending
477    }
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    /// Parse `address` as a multiaddress and run it through `multiaddr_to_socket_address`.
    fn parse(
        address: &str,
        ty: SocketListenerType,
    ) -> Result<(AddressType, Option<PeerId>), AddressError> {
        let address: Multiaddr = address.parse().expect("valid multiaddress");
        multiaddr_to_socket_address(&address, ty)
    }

    /// Extract the TCP port, expected to be the second component of `address`.
    fn tcp_port(address: &Multiaddr) -> u16 {
        match address.iter().nth(1) {
            Some(Protocol::Tcp(port)) => port,
            protocol => panic!("invalid address, expected `Tcp`, got {protocol:?}"),
        }
    }

    #[test]
    fn parse_multiaddresses_tcp() {
        let ty = SocketListenerType::Tcp;

        assert!(parse("/ip6/::1/tcp/8888", ty).is_ok());
        assert!(parse("/ip4/127.0.0.1/tcp/8888", ty).is_ok());
        assert!(parse(
            "/ip6/::1/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_ok());
        assert!(parse(
            "/ip4/127.0.0.1/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_ok());
        assert!(parse(
            "/ip6/::1/udp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_err());
        assert!(parse(
            "/ip4/127.0.0.1/udp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_err());

        // DNS addresses are accepted by the parser; they are resolved later.
        assert!(parse("/dns/hello.world/tcp/8888", ty).is_ok());
        assert!(parse(
            "/dns4/hello.world/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_ok());
        assert!(parse("/dns6/hello.world/udp/8888", ty).is_err());

        // A WebSocket component is not part of a plain TCP address.
        assert!(parse("/ip4/127.0.0.1/tcp/8888/ws", ty).is_err());
    }

    #[test]
    fn parse_multiaddresses_websocket() {
        let ty = SocketListenerType::WebSocket;

        assert!(parse("/ip6/::1/tcp/8888/ws", ty).is_ok());
        assert!(parse("/ip4/127.0.0.1/tcp/8888/ws", ty).is_ok());
        assert!(parse(
            "/ip6/::1/tcp/8888/ws/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_ok());
        assert!(parse(
            "/ip4/127.0.0.1/tcp/8888/ws/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_ok());
        assert!(parse(
            "/ip6/::1/udp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_err());
        assert!(parse(
            "/ip4/127.0.0.1/udp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_err());
        assert!(parse("/ip4/127.0.0.1/tcp/8888/ws/utp", ty).is_err());
        assert!(parse(
            "/ip6/::1/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_err());
        assert!(parse("/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q", ty).is_err());
        assert!(parse(
            "/dns/hello.world/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_err());

        // All three DNS flavours are accepted when followed by `/ws`.
        assert!(parse(
            "/dns/hello.world/tcp/8888/ws/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_ok());
        assert!(parse(
            "/dns4/hello.world/tcp/8888/ws/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_ok());
        assert!(parse(
            "/dns6/hello.world/tcp/8888/ws/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q",
            ty,
        )
        .is_ok());
    }

    #[tokio::test]
    async fn no_listeners_tcp() {
        let (mut listener, _, _) = SocketListener::new::<TcpAddress>(Vec::new(), true, false);

        futures::future::poll_fn(|cx| match listener.poll_next_unpin(cx) {
            Poll::Pending => Poll::Ready(()),
            event => panic!("unexpected event: {event:?}"),
        })
        .await;
    }

    #[tokio::test]
    async fn no_listeners_websocket() {
        let (mut listener, _, _) = SocketListener::new::<WebSocketAddress>(Vec::new(), true, false);

        futures::future::poll_fn(|cx| match listener.poll_next_unpin(cx) {
            Poll::Pending => Poll::Ready(()),
            event => panic!("unexpected event: {event:?}"),
        })
        .await;
    }

    #[tokio::test]
    async fn one_listener_tcp() {
        let address: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap();
        let (mut listener, listen_addresses, _) =
            SocketListener::new::<TcpAddress>(vec![address], true, false);
        let port = tcp_port(&listen_addresses[0]);

        let (res1, res2) =
            tokio::join!(listener.next(), TcpStream::connect(format!("[::1]:{port}")));

        assert!(res1.unwrap().is_ok() && res2.is_ok());
    }

    #[tokio::test]
    async fn one_listener_websocket() {
        let address: Multiaddr = "/ip6/::1/tcp/0/ws".parse().unwrap();
        let (mut listener, listen_addresses, _) =
            SocketListener::new::<WebSocketAddress>(vec![address], true, false);
        let port = tcp_port(&listen_addresses[0]);

        let (res1, res2) =
            tokio::join!(listener.next(), TcpStream::connect(format!("[::1]:{port}")));

        assert!(res1.unwrap().is_ok() && res2.is_ok());
    }

    #[tokio::test]
    async fn two_listeners_tcp() {
        let address1: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap();
        let address2: Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
        let (mut listener, listen_addresses, _) =
            SocketListener::new::<TcpAddress>(vec![address1, address2], true, false);
        let port1 = tcp_port(&listen_addresses[0]);
        let port2 = tcp_port(&listen_addresses[1]);

        // Keep accepting in the background so both connects can complete.
        tokio::spawn(async move { while listener.next().await.is_some() {} });

        let (res1, res2) = tokio::join!(
            TcpStream::connect(format!("[::1]:{port1}")),
            TcpStream::connect(format!("127.0.0.1:{port2}"))
        );

        assert!(res1.is_ok() && res2.is_ok());
    }

    #[tokio::test]
    async fn two_listeners_websocket() {
        let address1: Multiaddr = "/ip6/::1/tcp/0/ws".parse().unwrap();
        let address2: Multiaddr = "/ip4/127.0.0.1/tcp/0/ws".parse().unwrap();
        let (mut listener, listen_addresses, _) =
            SocketListener::new::<WebSocketAddress>(vec![address1, address2], true, false);
        let port1 = tcp_port(&listen_addresses[0]);
        let port2 = tcp_port(&listen_addresses[1]);

        // Keep accepting in the background so both connects can complete.
        tokio::spawn(async move { while listener.next().await.is_some() {} });

        let (res1, res2) = tokio::join!(
            TcpStream::connect(format!("[::1]:{port1}")),
            TcpStream::connect(format!("127.0.0.1:{port2}"))
        );

        assert!(res1.is_ok() && res2.is_ok());
    }

    // Purely synchronous, no runtime needed.
    #[test]
    fn local_dial_address() {
        let dial_addresses = DialAddresses::Reuse {
            listen_addresses: Arc::new(vec![
                "[2001:7d0:84aa:3900:2a5d:9e85::]:8888".parse().unwrap(),
                "92.168.127.1:9999".parse().unwrap(),
            ]),
        };

        assert_eq!(
            dial_addresses.local_dial_address(&IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1))),
            Ok(Some(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::UNSPECIFIED),
                9999
            ))),
        );

        assert_eq!(
            dial_addresses.local_dial_address(&IpAddr::V6(Ipv6Addr::new(0, 1, 2, 3, 4, 5, 6, 7))),
            Ok(Some(SocketAddr::new(
                IpAddr::V6(Ipv6Addr::UNSPECIFIED),
                8888
            ))),
        );

        // No loopback listen address is available for a loopback remote.
        assert_eq!(
            dial_addresses.local_dial_address(&IpAddr::V4(Ipv4Addr::LOCALHOST)),
            Err(()),
        );

        // Without port reuse no local address is selected.
        assert_eq!(
            DialAddresses::NoReuse.local_dial_address(&IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1))),
            Ok(None),
        );
    }
}