libp2p_tcp/provider/
tokio.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::{Incoming, Provider};
22
23use futures::{
24    future::{BoxFuture, FutureExt},
25    prelude::*,
26};
27use std::convert::TryFrom;
28use std::io;
29use std::net;
30use std::pin::Pin;
31use std::task::{Context, Poll};
32
/// A TCP [`Transport`](libp2p_core::Transport) that works with the `tokio` ecosystem.
///
/// This is an alias for `crate::Transport` parameterised with the `Tcp` provider
/// declared in this module.
///
/// # Example
///
/// The example starts listening on the loopback interface with port `0`, then polls the
/// transport for its first event and unwraps it as the newly reported listen address.
///
/// ```rust
/// # use libp2p_tcp as tcp;
/// # use libp2p_core::{Transport, transport::ListenerId};
/// # use futures::future;
/// # use std::pin::Pin;
/// #
/// # #[tokio::main]
/// # async fn main() {
/// let mut transport = tcp::tokio::Transport::new(tcp::Config::default());
/// let id = transport.listen_on(ListenerId::next(), "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
///
/// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)).await.into_new_address().unwrap();
///
/// println!("Listening on {addr}");
/// # }
/// ```
pub type Transport = crate::Transport<Tcp>;
54
/// Type-level marker that selects the `tokio`-backed `Provider` implementation.
///
/// The enum has no variants, so no value of it can ever be constructed; it only
/// appears as a type parameter (see the `Transport` alias in this module).
#[derive(Copy, Clone)]
#[doc(hidden)]
pub enum Tcp {}
58
59impl Provider for Tcp {
60    type Stream = TcpStream;
61    type Listener = tokio::net::TcpListener;
62    type IfWatcher = if_watch::tokio::IfWatcher;
63
64    fn new_if_watcher() -> io::Result<Self::IfWatcher> {
65        Self::IfWatcher::new()
66    }
67
68    fn addrs(if_watcher: &Self::IfWatcher) -> Vec<if_watch::IpNet> {
69        if_watcher.iter().copied().collect()
70    }
71
72    fn new_listener(l: net::TcpListener) -> io::Result<Self::Listener> {
73        tokio::net::TcpListener::try_from(l)
74    }
75
76    fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result<Self::Stream>> {
77        async move {
78            // Taken from [`tokio::net::TcpStream::connect_mio`].
79
80            let stream = tokio::net::TcpStream::try_from(s)?;
81
82            // Once we've connected, wait for the stream to be writable as
83            // that's when the actual connection has been initiated. Once we're
84            // writable we check for `take_socket_error` to see if the connect
85            // actually hit an error or not.
86            //
87            // If all that succeeded then we ship everything on up.
88            stream.writable().await?;
89
90            if let Some(e) = stream.take_error()? {
91                return Err(e);
92            }
93
94            Ok(TcpStream(stream))
95        }
96        .boxed()
97    }
98
99    fn poll_accept(
100        l: &mut Self::Listener,
101        cx: &mut Context<'_>,
102    ) -> Poll<io::Result<Incoming<Self::Stream>>> {
103        let (stream, remote_addr) = match l.poll_accept(cx) {
104            Poll::Pending => return Poll::Pending,
105            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
106            Poll::Ready(Ok((stream, remote_addr))) => (stream, remote_addr),
107        };
108
109        let local_addr = stream.local_addr()?;
110        let stream = TcpStream(stream);
111
112        Poll::Ready(Ok(Incoming {
113            stream,
114            local_addr,
115            remote_addr,
116        }))
117    }
118}
119
/// A [`tokio::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`].
///
/// This is a thin newtype: the wrapped `tokio` stream is the single public field,
/// and the trait implementations in this module forward to the equivalent
/// `tokio::io` trait methods on it.
#[derive(Debug)]
pub struct TcpStream(pub tokio::net::TcpStream);
123
124impl From<TcpStream> for tokio::net::TcpStream {
125    fn from(t: TcpStream) -> tokio::net::TcpStream {
126        t.0
127    }
128}
129
130impl AsyncRead for TcpStream {
131    fn poll_read(
132        mut self: Pin<&mut Self>,
133        cx: &mut Context,
134        buf: &mut [u8],
135    ) -> Poll<Result<usize, io::Error>> {
136        let mut read_buf = tokio::io::ReadBuf::new(buf);
137        futures::ready!(tokio::io::AsyncRead::poll_read(
138            Pin::new(&mut self.0),
139            cx,
140            &mut read_buf
141        ))?;
142        Poll::Ready(Ok(read_buf.filled().len()))
143    }
144}
145
146impl AsyncWrite for TcpStream {
147    fn poll_write(
148        mut self: Pin<&mut Self>,
149        cx: &mut Context,
150        buf: &[u8],
151    ) -> Poll<Result<usize, io::Error>> {
152        tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
153    }
154
155    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
156        tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
157    }
158
159    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
160        tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
161    }
162
163    fn poll_write_vectored(
164        mut self: Pin<&mut Self>,
165        cx: &mut Context<'_>,
166        bufs: &[io::IoSlice<'_>],
167    ) -> Poll<io::Result<usize>> {
168        tokio::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs)
169    }
170}