litep2p/protocol/libp2p/ping/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`/ipfs/ping/1.0.0`](https://github.com/libp2p/specs/blob/master/ping/ping.md) implementation.
22
23use crate::{
24    error::{Error, SubstreamError},
25    protocol::{Direction, TransportEvent, TransportService},
26    substream::Substream,
27    types::SubstreamId,
28    PeerId,
29};
30
31use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
32use tokio::sync::mpsc::Sender;
33
34use std::{
35    collections::HashSet,
36    time::{Duration, Instant},
37};
38
39pub use config::{Config, ConfigBuilder};
40
41mod config;
42
43// TODO: https://github.com/paritytech/litep2p/issues/132 let the user handle max failures
44
/// Log target attached to the `tracing` output emitted from this file.
const LOG_TARGET: &str = "litep2p::ipfs::ping";
47
/// Events emitted by the ping protocol.
///
/// Events are sent to the user over the channel supplied as `Config::tx_event`.
#[derive(Debug)]
pub enum PingEvent {
    /// A ping exchange with a remote peer completed successfully.
    Ping {
        /// Peer ID of the remote that answered the ping.
        peer: PeerId,

        /// Measured ping time with the peer: the time that elapsed between the ping payload
        /// having been sent and the remote's response having been received.
        ping: Duration,
    },
}
60
/// Ping protocol.
///
/// Owns the transport service handle, the set of connected peers and the in-flight ping
/// exchanges (one collection per substream direction).
pub(crate) struct Ping {
    /// Maximum failures before the peer is considered unreachable.
    ///
    /// Stored from the configuration but not read anywhere in this file yet; see
    /// <https://github.com/paritytech/litep2p/issues/132>.
    _max_failures: usize,

    /// Connection service.
    service: TransportService,

    /// TX channel for sending events to the user protocol.
    tx: Sender<PingEvent>,

    /// Connected peers.
    peers: HashSet<PeerId>,

    /// Pending outbound substreams.
    ///
    /// Each future resolves to the remote peer and the measured ping time, or to an error.
    pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Duration)>>>,

    /// Pending inbound substreams.
    ///
    /// Each future resolves once the remote's ping has been answered, or to an error.
    pending_inbound: FuturesUnordered<BoxFuture<'static, crate::Result<()>>>,
}
81
82impl Ping {
83    /// Create new [`Ping`] protocol.
84    pub fn new(service: TransportService, config: Config) -> Self {
85        Self {
86            service,
87            tx: config.tx_event,
88            peers: HashSet::new(),
89            pending_outbound: FuturesUnordered::new(),
90            pending_inbound: FuturesUnordered::new(),
91            _max_failures: config.max_failures,
92        }
93    }
94
95    /// Connection established to remote peer.
96    fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
97        tracing::trace!(target: LOG_TARGET, ?peer, "connection established");
98
99        self.service.open_substream(peer)?;
100        self.peers.insert(peer);
101
102        Ok(())
103    }
104
105    /// Connection closed to remote peer.
106    fn on_connection_closed(&mut self, peer: PeerId) {
107        tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
108
109        self.peers.remove(&peer);
110    }
111
112    /// Handle outbound substream.
113    fn on_outbound_substream(
114        &mut self,
115        peer: PeerId,
116        substream_id: SubstreamId,
117        mut substream: Substream,
118    ) {
119        tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");
120
121        self.pending_outbound.push(Box::pin(async move {
122            let future = async move {
123                // TODO: https://github.com/paritytech/litep2p/issues/134 generate random payload and verify it
124                substream.send_framed(vec![0u8; 32].into()).await?;
125                let now = Instant::now();
126                let _ = substream.next().await.ok_or(Error::SubstreamError(
127                    SubstreamError::ReadFailure(Some(substream_id)),
128                ))?;
129                let _ = substream.close().await;
130
131                Ok(now.elapsed())
132            };
133
134            match tokio::time::timeout(Duration::from_secs(10), future).await {
135                Err(_) => Err(Error::Timeout),
136                Ok(Err(error)) => Err(error),
137                Ok(Ok(elapsed)) => Ok((peer, elapsed)),
138            }
139        }));
140    }
141
142    /// Substream opened to remote peer.
143    fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
144        tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream");
145
146        self.pending_inbound.push(Box::pin(async move {
147            let future = async move {
148                let payload = substream
149                    .next()
150                    .await
151                    .ok_or(Error::SubstreamError(SubstreamError::ReadFailure(None)))??;
152                substream.send_framed(payload.freeze()).await?;
153                let _ = substream.next().await.map(|_| ());
154
155                Ok(())
156            };
157
158            match tokio::time::timeout(Duration::from_secs(10), future).await {
159                Err(_) => Err(Error::Timeout),
160                Ok(Err(error)) => Err(error),
161                Ok(Ok(())) => Ok(()),
162            }
163        }));
164    }
165
166    /// Start [`Ping`] event loop.
167    pub async fn run(mut self) {
168        tracing::debug!(target: LOG_TARGET, "starting ping event loop");
169
170        loop {
171            tokio::select! {
172                event = self.service.next() => match event {
173                    Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
174                        let _ = self.on_connection_established(peer);
175                    }
176                    Some(TransportEvent::ConnectionClosed { peer }) => {
177                        self.on_connection_closed(peer);
178                    }
179                    Some(TransportEvent::SubstreamOpened {
180                        peer,
181                        substream,
182                        direction,
183                        ..
184                    }) => match direction {
185                        Direction::Inbound => {
186                            self.on_inbound_substream(peer, substream);
187                        }
188                        Direction::Outbound(substream_id) => {
189                            self.on_outbound_substream(peer, substream_id, substream);
190                        }
191                    },
192                    Some(_) => {}
193                    None => return,
194                },
195                _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
196                event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => {
197                    match event {
198                        Some(Ok((peer, elapsed))) => {
199                            let _ = self
200                                .tx
201                                .send(PingEvent::Ping {
202                                    peer,
203                                    ping: elapsed,
204                                })
205                                .await;
206                        }
207                        event => tracing::debug!(target: LOG_TARGET, "failed to handle ping for an outbound peer: {event:?}"),
208                    }
209                }
210            }
211        }
212    }
213}