libp2p_mdns/behaviour/
iface.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod dns;
22mod query;
23
24use self::dns::{build_query, build_query_response, build_service_discovery_response};
25use self::query::MdnsPacket;
26use crate::behaviour::{socket::AsyncSocket, timer::Builder};
27use crate::Config;
28use libp2p_core::Multiaddr;
29use libp2p_identity::PeerId;
30use libp2p_swarm::ListenAddresses;
31use socket2::{Domain, Socket, Type};
32use std::{
33    collections::VecDeque,
34    io,
35    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
36    pin::Pin,
37    task::{Context, Poll},
38    time::{Duration, Instant},
39};
40
/// Interval used for the very first probe.
///
/// A fresh [`ProbeState`] starts with this interval, and it is also the period handed to
/// `T::interval_at` whenever the discovery timer is armed in `new` and `fire_timer`.
const INITIAL_TIMEOUT_INTERVAL: Duration = Duration::from_millis(500);
43
/// Probing progress of an interface, carrying the interval the discovery timer should use.
#[derive(Debug, Clone)]
enum ProbeState {
    /// Still probing. Holds the current interval, which `poll` doubles on every timer tick
    /// until it would reach the query interval.
    Probing(Duration),
    /// Probing is over. Holds the regular query interval.
    Finished(Duration),
}
49
50impl Default for ProbeState {
51    fn default() -> Self {
52        ProbeState::Probing(INITIAL_TIMEOUT_INTERVAL)
53    }
54}
55
56impl ProbeState {
57    fn interval(&self) -> &Duration {
58        match self {
59            ProbeState::Probing(query_interval) => query_interval,
60            ProbeState::Finished(query_interval) => query_interval,
61        }
62    }
63}
64
/// An mDNS instance for a networking interface. To discover all peers when having multiple
/// interfaces an [`InterfaceState`] is required for each interface.
///
/// `U` is the socket type and `T` the timer type; the bounds they must satisfy are declared
/// on the `impl` block rather than on the struct.
#[derive(Debug)]
pub(crate) struct InterfaceState<U, T> {
    /// Address this instance is bound to.
    addr: IpAddr,
    /// Receive socket. Bound to port 5353 and joined to the mDNS multicast group.
    recv_socket: U,
    /// Send socket. Bound to an ephemeral port and used for all outgoing packets.
    send_socket: U,
    /// Buffer used for receiving data from the receive socket.
    /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000
    /// bytes, if it can be ensured that all participating devices can handle such large packets.
    /// For computers with several interfaces and IP addresses responses can easily reach sizes in
    /// the range of 3000 bytes, so 4096 seems sensible for now. For more information see
    /// [rfc6762](https://tools.ietf.org/html/rfc6762#page-46).
    recv_buffer: [u8; 4096],
    /// Packets queued for sending on the send socket, oldest first.
    send_buffer: VecDeque<Vec<u8>>,
    /// Discovery interval: the configured query interval plus a small random jitter that is
    /// chosen once at construction.
    query_interval: Duration,
    /// Discovery timer. Each tick queues a new query.
    timeout: T,
    /// Multicast address (IPv4 or IPv6, matching `addr`) that outgoing packets are sent to.
    multicast_addr: IpAddr,
    /// Discovered addresses that have not yet been returned from `poll`.
    /// The `Instant` is produced by `extract_discovered`; presumably the record's expiry,
    /// TODO confirm against the query module.
    discovered: VecDeque<(PeerId, Multiaddr, Instant)>,
    /// TTL taken from the configuration and passed to the response builders.
    ttl: Duration,
    /// Whether the interface is still probing, and the interval the timer is reset to.
    probe_state: ProbeState,
    /// Our own peer id, used when building query responses and when extracting discovered
    /// peers from incoming responses.
    local_peer_id: PeerId,
}
97
98impl<U, T> InterfaceState<U, T>
99where
100    U: AsyncSocket,
101    T: Builder + futures::Stream,
102{
103    /// Builds a new [`InterfaceState`].
104    pub(crate) fn new(addr: IpAddr, config: Config, local_peer_id: PeerId) -> io::Result<Self> {
105        log::info!("creating instance on iface {}", addr);
106        let recv_socket = match addr {
107            IpAddr::V4(addr) => {
108                let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(socket2::Protocol::UDP))?;
109                socket.set_reuse_address(true)?;
110                #[cfg(unix)]
111                socket.set_reuse_port(true)?;
112                socket.bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 5353).into())?;
113                socket.set_multicast_loop_v4(true)?;
114                socket.set_multicast_ttl_v4(255)?;
115                socket.join_multicast_v4(&crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?;
116                U::from_std(UdpSocket::from(socket))?
117            }
118            IpAddr::V6(_) => {
119                let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?;
120                socket.set_reuse_address(true)?;
121                #[cfg(unix)]
122                socket.set_reuse_port(true)?;
123                socket.bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 5353).into())?;
124                socket.set_multicast_loop_v6(true)?;
125                // TODO: find interface matching addr.
126                socket.join_multicast_v6(&crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?;
127                U::from_std(UdpSocket::from(socket))?
128            }
129        };
130        let bind_addr = match addr {
131            IpAddr::V4(_) => SocketAddr::new(addr, 0),
132            IpAddr::V6(_addr) => {
133                // TODO: if-watch should return the scope_id of an address
134                // as a workaround we bind to unspecified, which means that
135                // this probably won't work when using multiple interfaces.
136                // SocketAddr::V6(SocketAddrV6::new(addr, 0, 0, scope_id))
137                SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
138            }
139        };
140        let send_socket = U::from_std(UdpSocket::bind(bind_addr)?)?;
141
142        // randomize timer to prevent all converging and firing at the same time.
143        let query_interval = {
144            use rand::Rng;
145            let mut rng = rand::thread_rng();
146            let jitter = rng.gen_range(0..100);
147            config.query_interval + Duration::from_millis(jitter)
148        };
149        let multicast_addr = match addr {
150            IpAddr::V4(_) => IpAddr::V4(crate::IPV4_MDNS_MULTICAST_ADDRESS),
151            IpAddr::V6(_) => IpAddr::V6(crate::IPV6_MDNS_MULTICAST_ADDRESS),
152        };
153        Ok(Self {
154            addr,
155            recv_socket,
156            send_socket,
157            recv_buffer: [0; 4096],
158            send_buffer: Default::default(),
159            discovered: Default::default(),
160            query_interval,
161            timeout: T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL),
162            multicast_addr,
163            ttl: config.ttl,
164            probe_state: Default::default(),
165            local_peer_id,
166        })
167    }
168
169    pub(crate) fn reset_timer(&mut self) {
170        log::trace!("reset timer on {:#?} {:#?}", self.addr, self.probe_state);
171        let interval = *self.probe_state.interval();
172        self.timeout = T::interval(interval);
173    }
174
175    pub(crate) fn fire_timer(&mut self) {
176        self.timeout = T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL);
177    }
178
179    pub(crate) fn poll(
180        &mut self,
181        cx: &mut Context,
182        listen_addresses: &ListenAddresses,
183    ) -> Poll<(PeerId, Multiaddr, Instant)> {
184        loop {
185            // 1st priority: Low latency: Create packet ASAP after timeout.
186            if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
187                log::trace!("sending query on iface {}", self.addr);
188                self.send_buffer.push_back(build_query());
189                log::trace!("tick on {:#?} {:#?}", self.addr, self.probe_state);
190
191                // Stop to probe when the initial interval reach the query interval
192                if let ProbeState::Probing(interval) = self.probe_state {
193                    let interval = interval * 2;
194                    self.probe_state = if interval >= self.query_interval {
195                        ProbeState::Finished(self.query_interval)
196                    } else {
197                        ProbeState::Probing(interval)
198                    };
199                }
200
201                self.reset_timer();
202            }
203
204            // 2nd priority: Keep local buffers small: Send packets to remote.
205            if let Some(packet) = self.send_buffer.pop_front() {
206                match Pin::new(&mut self.send_socket).poll_write(
207                    cx,
208                    &packet,
209                    SocketAddr::new(self.multicast_addr, 5353),
210                ) {
211                    Poll::Ready(Ok(_)) => {
212                        log::trace!("sent packet on iface {}", self.addr);
213                        continue;
214                    }
215                    Poll::Ready(Err(err)) => {
216                        log::error!("error sending packet on iface {} {}", self.addr, err);
217                        continue;
218                    }
219                    Poll::Pending => {
220                        self.send_buffer.push_front(packet);
221                    }
222                }
223            }
224
225            // 3rd priority: Keep local buffers small: Return discovered addresses.
226            if let Some(discovered) = self.discovered.pop_front() {
227                return Poll::Ready(discovered);
228            }
229
230            // 4th priority: Remote work: Answer incoming requests.
231            match Pin::new(&mut self.recv_socket)
232                .poll_read(cx, &mut self.recv_buffer)
233                .map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from))
234            {
235                Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
236                    log::trace!(
237                        "received query from {} on {}",
238                        query.remote_addr(),
239                        self.addr
240                    );
241
242                    self.send_buffer.extend(build_query_response(
243                        query.query_id(),
244                        self.local_peer_id,
245                        listen_addresses.iter(),
246                        self.ttl,
247                    ));
248                    continue;
249                }
250                Poll::Ready(Ok(Ok(Some(MdnsPacket::Response(response))))) => {
251                    log::trace!(
252                        "received response from {} on {}",
253                        response.remote_addr(),
254                        self.addr
255                    );
256
257                    self.discovered
258                        .extend(response.extract_discovered(Instant::now(), self.local_peer_id));
259
260                    // Stop probing when we have a valid response
261                    if !self.discovered.is_empty() {
262                        self.probe_state = ProbeState::Finished(self.query_interval);
263                        self.reset_timer();
264                    }
265                    continue;
266                }
267                Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {
268                    log::trace!(
269                        "received service discovery from {} on {}",
270                        disc.remote_addr(),
271                        self.addr
272                    );
273
274                    self.send_buffer
275                        .push_back(build_service_discovery_response(disc.query_id(), self.ttl));
276                    continue;
277                }
278                Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
279                    // No more bytes available on the socket to read
280                }
281                Poll::Ready(Err(err)) => {
282                    log::error!("failed reading datagram: {}", err);
283                }
284                Poll::Ready(Ok(Err(err))) => {
285                    log::debug!("Parsing mdns packet failed: {:?}", err);
286                }
287                Poll::Ready(Ok(Ok(None))) | Poll::Pending => {}
288            }
289
290            return Poll::Pending;
291        }
292    }
293}