1mod dns;
22mod query;
23
24use self::dns::{build_query, build_query_response, build_service_discovery_response};
25use self::query::MdnsPacket;
26use crate::behaviour::{socket::AsyncSocket, timer::Builder};
27use crate::Config;
28use libp2p_core::Multiaddr;
29use libp2p_identity::PeerId;
30use libp2p_swarm::ListenAddresses;
31use socket2::{Domain, Socket, Type};
32use std::{
33 collections::VecDeque,
34 io,
35 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
36 pin::Pin,
37 task::{Context, Poll},
38 time::{Duration, Instant},
39};
40
41const INITIAL_TIMEOUT_INTERVAL: Duration = Duration::from_millis(500);
43
44#[derive(Debug, Clone)]
45enum ProbeState {
46 Probing(Duration),
47 Finished(Duration),
48}
49
50impl Default for ProbeState {
51 fn default() -> Self {
52 ProbeState::Probing(INITIAL_TIMEOUT_INTERVAL)
53 }
54}
55
56impl ProbeState {
57 fn interval(&self) -> &Duration {
58 match self {
59 ProbeState::Probing(query_interval) => query_interval,
60 ProbeState::Finished(query_interval) => query_interval,
61 }
62 }
63}
64
65#[derive(Debug)]
68pub(crate) struct InterfaceState<U, T> {
69 addr: IpAddr,
71 recv_socket: U,
73 send_socket: U,
75 recv_buffer: [u8; 4096],
82 send_buffer: VecDeque<Vec<u8>>,
84 query_interval: Duration,
86 timeout: T,
88 multicast_addr: IpAddr,
90 discovered: VecDeque<(PeerId, Multiaddr, Instant)>,
92 ttl: Duration,
94 probe_state: ProbeState,
95 local_peer_id: PeerId,
96}
97
98impl<U, T> InterfaceState<U, T>
99where
100 U: AsyncSocket,
101 T: Builder + futures::Stream,
102{
103 pub(crate) fn new(addr: IpAddr, config: Config, local_peer_id: PeerId) -> io::Result<Self> {
105 log::info!("creating instance on iface {}", addr);
106 let recv_socket = match addr {
107 IpAddr::V4(addr) => {
108 let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(socket2::Protocol::UDP))?;
109 socket.set_reuse_address(true)?;
110 #[cfg(unix)]
111 socket.set_reuse_port(true)?;
112 socket.bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 5353).into())?;
113 socket.set_multicast_loop_v4(true)?;
114 socket.set_multicast_ttl_v4(255)?;
115 socket.join_multicast_v4(&crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?;
116 U::from_std(UdpSocket::from(socket))?
117 }
118 IpAddr::V6(_) => {
119 let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?;
120 socket.set_reuse_address(true)?;
121 #[cfg(unix)]
122 socket.set_reuse_port(true)?;
123 socket.bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 5353).into())?;
124 socket.set_multicast_loop_v6(true)?;
125 socket.join_multicast_v6(&crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?;
127 U::from_std(UdpSocket::from(socket))?
128 }
129 };
130 let bind_addr = match addr {
131 IpAddr::V4(_) => SocketAddr::new(addr, 0),
132 IpAddr::V6(_addr) => {
133 SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
138 }
139 };
140 let send_socket = U::from_std(UdpSocket::bind(bind_addr)?)?;
141
142 let query_interval = {
144 use rand::Rng;
145 let mut rng = rand::thread_rng();
146 let jitter = rng.gen_range(0..100);
147 config.query_interval + Duration::from_millis(jitter)
148 };
149 let multicast_addr = match addr {
150 IpAddr::V4(_) => IpAddr::V4(crate::IPV4_MDNS_MULTICAST_ADDRESS),
151 IpAddr::V6(_) => IpAddr::V6(crate::IPV6_MDNS_MULTICAST_ADDRESS),
152 };
153 Ok(Self {
154 addr,
155 recv_socket,
156 send_socket,
157 recv_buffer: [0; 4096],
158 send_buffer: Default::default(),
159 discovered: Default::default(),
160 query_interval,
161 timeout: T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL),
162 multicast_addr,
163 ttl: config.ttl,
164 probe_state: Default::default(),
165 local_peer_id,
166 })
167 }
168
169 pub(crate) fn reset_timer(&mut self) {
170 log::trace!("reset timer on {:#?} {:#?}", self.addr, self.probe_state);
171 let interval = *self.probe_state.interval();
172 self.timeout = T::interval(interval);
173 }
174
175 pub(crate) fn fire_timer(&mut self) {
176 self.timeout = T::interval_at(Instant::now(), INITIAL_TIMEOUT_INTERVAL);
177 }
178
179 pub(crate) fn poll(
180 &mut self,
181 cx: &mut Context,
182 listen_addresses: &ListenAddresses,
183 ) -> Poll<(PeerId, Multiaddr, Instant)> {
184 loop {
185 if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
187 log::trace!("sending query on iface {}", self.addr);
188 self.send_buffer.push_back(build_query());
189 log::trace!("tick on {:#?} {:#?}", self.addr, self.probe_state);
190
191 if let ProbeState::Probing(interval) = self.probe_state {
193 let interval = interval * 2;
194 self.probe_state = if interval >= self.query_interval {
195 ProbeState::Finished(self.query_interval)
196 } else {
197 ProbeState::Probing(interval)
198 };
199 }
200
201 self.reset_timer();
202 }
203
204 if let Some(packet) = self.send_buffer.pop_front() {
206 match Pin::new(&mut self.send_socket).poll_write(
207 cx,
208 &packet,
209 SocketAddr::new(self.multicast_addr, 5353),
210 ) {
211 Poll::Ready(Ok(_)) => {
212 log::trace!("sent packet on iface {}", self.addr);
213 continue;
214 }
215 Poll::Ready(Err(err)) => {
216 log::error!("error sending packet on iface {} {}", self.addr, err);
217 continue;
218 }
219 Poll::Pending => {
220 self.send_buffer.push_front(packet);
221 }
222 }
223 }
224
225 if let Some(discovered) = self.discovered.pop_front() {
227 return Poll::Ready(discovered);
228 }
229
230 match Pin::new(&mut self.recv_socket)
232 .poll_read(cx, &mut self.recv_buffer)
233 .map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from))
234 {
235 Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
236 log::trace!(
237 "received query from {} on {}",
238 query.remote_addr(),
239 self.addr
240 );
241
242 self.send_buffer.extend(build_query_response(
243 query.query_id(),
244 self.local_peer_id,
245 listen_addresses.iter(),
246 self.ttl,
247 ));
248 continue;
249 }
250 Poll::Ready(Ok(Ok(Some(MdnsPacket::Response(response))))) => {
251 log::trace!(
252 "received response from {} on {}",
253 response.remote_addr(),
254 self.addr
255 );
256
257 self.discovered
258 .extend(response.extract_discovered(Instant::now(), self.local_peer_id));
259
260 if !self.discovered.is_empty() {
262 self.probe_state = ProbeState::Finished(self.query_interval);
263 self.reset_timer();
264 }
265 continue;
266 }
267 Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {
268 log::trace!(
269 "received service discovery from {} on {}",
270 disc.remote_addr(),
271 self.addr
272 );
273
274 self.send_buffer
275 .push_back(build_service_discovery_response(disc.query_id(), self.ttl));
276 continue;
277 }
278 Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
279 }
281 Poll::Ready(Err(err)) => {
282 log::error!("failed reading datagram: {}", err);
283 }
284 Poll::Ready(Ok(Err(err))) => {
285 log::debug!("Parsing mdns packet failed: {:?}", err);
286 }
287 Poll::Ready(Ok(Ok(None))) | Poll::Pending => {}
288 }
289
290 return Poll::Pending;
291 }
292 }
293}