libp2p_ping/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use futures::prelude::*;
22use instant::Instant;
23use libp2p_swarm::StreamProtocol;
24use rand::{distributions, prelude::*};
25use std::{io, time::Duration};
26
27pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/ping/1.0.0");
28
/// Unit type representing the `Ping` protocol upgrade.
///
/// A ping consists of 32 random bytes written to a single outbound
/// substream at configurable intervals; the remote is expected to echo
/// exactly those bytes back. Concurrently, pings arriving on inbound
/// substreams are answered by returning the bytes that were received.
///
/// No more than one inbound and one outbound substream are kept open at
/// any given time. A substream is dropped if a ping times out or any
/// other error occurs on it.
///
/// Every successful ping reports its round-trip time.
///
/// > **Note**: The measured round-trip time can include delays added by
/// >           the underlying transport. With TCP, for example, Nagle's
/// >           algorithm, delayed acks and related configuration options
/// >           can affect latencies, especially on connections that
/// >           otherwise carry little traffic.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct Ping;
/// Number of payload bytes carried by a single ping and echoed in its pong.
const PING_SIZE: usize = 32;
50
51/// Sends a ping and waits for the pong.
52pub(crate) async fn send_ping<S>(mut stream: S) -> io::Result<(S, Duration)>
53where
54    S: AsyncRead + AsyncWrite + Unpin,
55{
56    let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard);
57    stream.write_all(&payload).await?;
58    stream.flush().await?;
59    let started = Instant::now();
60    let mut recv_payload = [0u8; PING_SIZE];
61    stream.read_exact(&mut recv_payload).await?;
62    if recv_payload == payload {
63        Ok((stream, started.elapsed()))
64    } else {
65        Err(io::Error::new(
66            io::ErrorKind::InvalidData,
67            "Ping payload mismatch",
68        ))
69    }
70}
71
72/// Waits for a ping and sends a pong.
73pub(crate) async fn recv_ping<S>(mut stream: S) -> io::Result<S>
74where
75    S: AsyncRead + AsyncWrite + Unpin,
76{
77    let mut payload = [0u8; PING_SIZE];
78    stream.read_exact(&mut payload).await?;
79    stream.write_all(&payload).await?;
80    stream.flush().await?;
81    Ok(stream)
82}
83
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use libp2p_core::{
        multiaddr::multiaddr,
        transport::{memory::MemoryTransport, ListenerId, Transport},
    };
    use rand::{thread_rng, Rng};
    use std::time::Duration;

    /// Exchanges a single ping over an in-memory transport: a spawned task
    /// answers with `recv_ping` while the test body drives `send_ping` and
    /// checks that a non-zero round-trip time is reported.
    #[test]
    fn ping_pong() {
        // Start listening on a randomly chosen in-memory address.
        let requested_addr = multiaddr![Memory(thread_rng().gen::<u64>())];
        let mut listener = MemoryTransport::new().boxed();
        listener
            .listen_on(ListenerId::next(), requested_addr)
            .unwrap();

        // The first transport event must be available immediately and must
        // announce the address the listener is bound to.
        let dial_addr = listener
            .select_next_some()
            .now_or_never()
            .and_then(|event| event.into_new_address())
            .expect("MemoryTransport not listening on an address!");

        // Responder: accept the incoming connection and answer one ping.
        async_std::task::spawn(async move {
            let incoming = listener.next().await.unwrap();
            let (upgrade, _) = incoming.into_incoming().unwrap();
            let inbound = upgrade.await.unwrap();
            recv_ping(inbound).await.unwrap();
        });

        // Initiator: dial the listener and send one ping.
        async_std::task::block_on(async move {
            let outbound = MemoryTransport::new()
                .dial(dial_addr)
                .unwrap()
                .await
                .unwrap();
            let (_, rtt) = send_ping(outbound).await.unwrap();
            assert!(rtt > Duration::ZERO);
        });
    }
}