libp2p_mdns/behaviour/
socket.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    io::Error,
23    marker::Unpin,
24    net::{SocketAddr, UdpSocket},
25    task::{Context, Poll},
26};
27
/// Runtime abstraction over a non-blocking [`UdpSocket`].
///
/// Each supported async runtime provides an implementation so the rest of the
/// crate can drive UDP I/O through plain `poll_*` calls.
#[allow(unreachable_pub)] // Users should not depend on this.
pub trait AsyncSocket: Unpin + Send + 'static {
    /// Builds the runtime-specific socket out of a [`std::net::UdpSocket`].
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the runtime reports while adopting the socket.
    fn from_std(socket: UdpSocket) -> std::io::Result<Self>
    where
        Self: Sized;

    /// Attempts to receive a single packet into `buf`.
    ///
    /// On success yields the number of bytes written to `buf` together with
    /// the address the packet was received from.
    fn poll_read(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<(usize, SocketAddr), Error>>;

    /// Attempts to send `packet` on the socket to the address `to`.
    fn poll_write(
        &mut self,
        cx: &mut Context<'_>,
        packet: &[u8],
        to: SocketAddr,
    ) -> Poll<Result<(), Error>>;
}
51
#[cfg(feature = "async-io")]
pub(crate) mod asio {
    use super::*;
    use async_io::Async;
    use futures::FutureExt;

    /// UDP socket driven by the `async-io` reactor.
    pub(crate) type AsyncUdpSocket = Async<UdpSocket>;

    impl AsyncSocket for AsyncUdpSocket {
        /// Hands the std socket to [`Async::new`], propagating its I/O error.
        fn from_std(socket: UdpSocket) -> std::io::Result<Self> {
            Async::new(socket)
        }

        /// Attempts to receive one packet into `buf`.
        ///
        /// Returns the number of bytes received and the sender's address, an
        /// I/O error, or `Poll::Pending` once the waker in `cx` has been
        /// registered through `poll_readable`.
        fn poll_read(
            &mut self,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<(usize, SocketAddr), Error>> {
            loop {
                // Returns `Pending` (with `cx`'s waker registered) unless the
                // reactor has reported the socket readable.
                futures::ready!(self.poll_readable(cx))?;

                // `now_or_never` polls with a no-op waker. If the receive is
                // not ready after all (e.g. stale or spurious readiness), no
                // wake-up is scheduled for this task, so returning `Pending`
                // here could stall it. Go back to `poll_readable` instead so
                // it can register the waker or report fresh readiness.
                if let Some(received) = self.recv_from(buf).now_or_never() {
                    return Poll::Ready(received);
                }
            }
        }

        /// Attempts to send `packet` to the address `to`.
        ///
        /// The number of bytes sent is discarded; only success or the I/O
        /// error is reported.
        fn poll_write(
            &mut self,
            cx: &mut Context<'_>,
            packet: &[u8],
            to: SocketAddr,
        ) -> Poll<Result<(), Error>> {
            loop {
                futures::ready!(self.poll_writable(cx))?;

                // Same reasoning as in `poll_read`: never return `Pending`
                // without a waker registered through `poll_writable`.
                if let Some(sent) = self.send_to(packet, to).now_or_never() {
                    return Poll::Ready(sent.map(|_len| ()));
                }
            }
        }
    }
}
93
#[cfg(feature = "tokio")]
pub(crate) mod tokio {
    use super::*;
    use ::tokio::{io::ReadBuf, net::UdpSocket as TkUdpSocket};

    /// UDP socket driven by the Tokio runtime.
    pub(crate) type TokioUdpSocket = TkUdpSocket;

    impl AsyncSocket for TokioUdpSocket {
        /// Switches the std socket to non-blocking mode and then converts it
        /// with Tokio's own `UdpSocket::from_std`.
        fn from_std(socket: UdpSocket) -> std::io::Result<Self> {
            socket.set_nonblocking(true)?;
            // Inherent associated functions take precedence over trait ones,
            // so this resolves to Tokio's constructor, not to this method.
            TkUdpSocket::from_std(socket)
        }

        /// Attempts to receive one packet into `buf`, yielding the number of
        /// bytes filled and the sender's address.
        fn poll_read(
            &mut self,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<(usize, SocketAddr), Error>> {
            let mut window = ReadBuf::new(buf);
            let polled = self.poll_recv_from(cx, &mut window);
            // Pair the sender address with the count of bytes Tokio filled in.
            polled.map_ok(|sender| (window.filled().len(), sender))
        }

        /// Attempts to send `packet` to the address `to`; the number of bytes
        /// sent is discarded.
        fn poll_write(
            &mut self,
            cx: &mut Context<'_>,
            packet: &[u8],
            to: SocketAddr,
        ) -> Poll<Result<(), Error>> {
            self.poll_send_to(cx, packet, to).map_ok(|_sent| ())
        }
    }
}
133}