litep2p/protocol/libp2p/ping/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`/ipfs/ping/1.0.0`](https://github.com/libp2p/specs/blob/master/ping/ping.md) implementation.
22
23use crate::{
24    error::SubstreamError,
25    protocol::{Direction, TransportEvent, TransportService},
26    substream::Substream,
27    types::SubstreamId,
28    PeerId,
29};
30
31use bytes::Bytes;
32use futures::{
33    stream::{self, BoxStream},
34    FutureExt, StreamExt,
35};
36use rand::Rng as _;
37use std::{
38    collections::HashSet,
39    time::{Duration, Instant},
40};
41use tokio::sync::mpsc;
42use tokio_stream::StreamMap;
43
44pub use config::{Config, ConfigBuilder};
45mod config;
46
47// TODO: https://github.com/paritytech/litep2p/issues/132 let the user handle max failures
48
/// `tracing` target attached to every log record emitted by this module.
const LOG_TARGET: &str = "litep2p::ipfs::ping";
51
52/// Events emitted by the ping protocol.
53#[derive(Debug)]
54pub enum PingEvent {
55    /// Ping time with remote peer.
56    Ping {
57        /// Peer ID.
58        peer: PeerId,
59
60        /// Measured ping time with the peer.
61        ping: Duration,
62    },
63}
64
65/// Ping protocol.
66pub(crate) struct Ping {
67    /// Maximum failures before the peer is considered unreachable.
68    /// This must be at least 1 until <https://github.com/paritytech/litep2p/pull/416> is adopted
69    /// by the network. (With older litep2p every other ping fails.)
70    // TODO: use this to disconnect peers.
71    _max_failures: usize,
72
73    /// Connection service.
74    service: TransportService,
75
76    /// TX channel for sending events to the user protocol.
77    tx: mpsc::Sender<PingEvent>,
78
79    /// Local pingers per peer.
80    pingers: StreamMap<PeerId, BoxStream<'static, Result<Duration, PingError>>>,
81
82    /// Substreams on which we retry pings after failure. Used for rate-limiting.
83    retries: HashSet<SubstreamId>,
84
85    /// Ping responders per peer.
86    responders: StreamMap<PeerId, BoxStream<'static, Result<(), SubstreamError>>>,
87
88    /// Interval between outbound pings.
89    ping_interval: Duration,
90}
91
92impl Ping {
93    /// Create new [`Ping`] protocol.
94    pub fn new(service: TransportService, config: Config) -> Self {
95        Self {
96            service,
97            tx: config.tx_event,
98            ping_interval: config.ping_interval,
99            pingers: StreamMap::new(),
100            retries: HashSet::new(),
101            responders: StreamMap::new(),
102            _max_failures: config.max_failures,
103        }
104    }
105
106    /// Connection established to remote peer.
107    fn on_connection_established(&mut self, peer: PeerId) {
108        tracing::debug!(target: LOG_TARGET, ?peer, "connection established");
109
110        if let Err(error) = self.service.open_substream(peer) {
111            tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream");
112        }
113    }
114
115    /// Connection closed to remote peer.
116    fn on_connection_closed(&mut self, peer: PeerId) {
117        tracing::debug!(target: LOG_TARGET, ?peer, "connection closed");
118    }
119
120    /// Handle outbound substream.
121    fn on_outbound_substream(
122        &mut self,
123        peer: PeerId,
124        substream_id: SubstreamId,
125        substream: Substream,
126    ) {
127        tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");
128        let interval = self.ping_interval;
129        let should_wait = self.retries.remove(&substream_id);
130
131        let pinger_stream = stream::unfold(
132            (substream, should_wait),
133            move |(mut substream, should_wait)| async move {
134                if should_wait {
135                    tokio::time::sleep(interval).await;
136                }
137
138                let payload = Bytes::from(Vec::from(rand::thread_rng().gen::<[u8; 32]>()));
139
140                let ping = async {
141                    let now = Instant::now();
142
143                    substream.send_framed(payload.clone()).await?;
144                    let received = substream.next().await.ok_or(PingError::SubstreamError(
145                        SubstreamError::ReadFailure(Some(substream_id)),
146                    ))??;
147
148                    if received == payload {
149                        Ok(now.elapsed())
150                    } else {
151                        Err(PingError::InvalidPayload)
152                    }
153                };
154
155                match tokio::time::timeout(Duration::from_secs(20), ping).await {
156                    Ok(Ok(elapsed)) => Some((Ok(elapsed), (substream, true))),
157                    Ok(Err(error)) => Some((Err(error), (substream, false))),
158                    Err(timeout) => Some((Err(timeout.into()), (substream, false))),
159                }
160            },
161        );
162
163        // We could overwrite the old pinger here if connection was closed then opened before the
164        // ping failed.
165        let _ = self.pingers.insert(peer, pinger_stream.boxed());
166    }
167
168    /// Handle inbound substream.
169    fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
170        tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream");
171
172        let responder_future = async move {
173            loop {
174                if let Some(payload) = substream.next().await {
175                    substream.send_framed(payload?.freeze()).await?;
176                } else {
177                    return Ok(());
178                }
179            }
180        };
181
182        if self.responders.insert(peer, responder_future.into_stream().boxed()).is_some() {
183            tracing::trace!(
184                target: LOG_TARGET,
185                ?peer,
186                "discarding ping substream as remote opened a new one",
187            );
188        }
189    }
190
191    /// Start [`Ping`] event loop.
192    pub async fn run(mut self) {
193        tracing::debug!(target: LOG_TARGET, "starting ping event loop");
194
195        loop {
196            tokio::select! {
197                event = self.service.next() => match event {
198                    Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
199                        self.on_connection_established(peer);
200                    }
201                    Some(TransportEvent::ConnectionClosed { peer }) => {
202                        self.on_connection_closed(peer);
203                    }
204                    Some(TransportEvent::SubstreamOpened {
205                        peer,
206                        substream,
207                        direction,
208                        ..
209                    }) => match direction {
210                        Direction::Inbound => {
211                            self.on_inbound_substream(peer, substream);
212                        }
213                        Direction::Outbound(substream_id) => {
214                            self.on_outbound_substream(peer, substream_id, substream);
215                        }
216                    }
217                    Some(TransportEvent::SubstreamOpenFailure {
218                        substream,
219                        ..
220                    }) => {
221                        self.retries.remove(&substream);
222                    }
223                    Some(_) => {}
224                    None => return,
225                },
226                Some((peer, result)) = self.responders.next(), if !self.responders.is_empty() => {
227                    // Remove the future from `StreamMap` to not wait untill it is polled again and
228                    // removes it itself getting `None`. Otherwise we can get a confusing log
229                    // message when try to insert a new responder for the same peer.
230                    self.responders.remove(&peer);
231
232                    tracing::trace!(
233                        target: LOG_TARGET,
234                        ?peer,
235                        ?result,
236                        "inbound ping responder terminated",
237                    );
238                }
239                Some((peer, result)) = self.pingers.next(), if !self.pingers.is_empty() => {
240                    match result {
241                        Ok(elapsed) => {
242                            tracing::debug!(
243                                target: LOG_TARGET,
244                                ?peer,
245                                time_us = elapsed.as_micros(),
246                                "pong",
247                            );
248
249                            let _ = self.tx.send(PingEvent::Ping { peer, ping: elapsed }).await;
250                        }
251                        Err(error) => {
252                            self.pingers.remove(&peer);
253
254                            tracing::debug!(
255                                target: LOG_TARGET,
256                                ?peer,
257                                ?error,
258                                "ping failed",
259                            );
260
261                            match self.service.open_substream(peer) {
262                                Ok(substream_id) => {
263                                    self.retries.insert(substream_id);
264                                }
265                                Err(error) => tracing::debug!(
266                                    target: LOG_TARGET,
267                                    ?peer,
268                                    ?error,
269                                    "failed to open substream after ping failed",
270                                ),
271                            }
272                        }
273                    }
274                }
275            }
276        }
277    }
278}
279
280/// Possible error of the outbound ping.
281#[derive(Debug, thiserror::Error)]
282enum PingError {
283    #[error("Substream error: {0}")]
284    SubstreamError(#[from] SubstreamError),
285    #[error("Invalid payload received")]
286    InvalidPayload,
287    #[error("Timeout")]
288    Timeout(#[from] tokio::time::error::Elapsed),
289}