litep2p/protocol/libp2p/ping/
mod.rs

// Copyright 2023 litep2p developers
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
20
//! [`/ipfs/ping/1.0.0`](https://github.com/libp2p/specs/blob/master/ping/ping.md) implementation.
22
23use crate::{
24    error::{Error, SubstreamError},
25    protocol::{Direction, TransportEvent, TransportService},
26    substream::Substream,
27    types::SubstreamId,
28    PeerId,
29};
30
31use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
32use tokio::sync::mpsc::Sender;
33
34use std::{
35    collections::{HashMap, HashSet},
36    time::{Duration, Instant},
37};
38
39pub use config::{Config, ConfigBuilder};
40
41mod config;
42
// TODO: handle max failures (`Ping::_max_failures` is stored but never checked).
44
/// Log target attached to every `tracing` event emitted from this file.
const LOG_TARGET: &str = "litep2p::ipfs::ping";
47
48/// Events emitted by the ping protocol.
49#[derive(Debug)]
50pub enum PingEvent {
51    /// Ping time with remote peer.
52    Ping {
53        /// Peer ID.
54        peer: PeerId,
55
56        /// Measured ping time with the peer.
57        ping: Duration,
58    },
59}
60
/// Ping protocol.
///
/// Holds the transport service handle, the event channel towards the user and the in-flight
/// ping exchanges that are polled by [`Ping::run()`].
pub(crate) struct Ping {
    /// Maximum failures before the peer is considered unreachable.
    ///
    /// Copied from the configuration; nothing in this file reads it yet.
    _max_failures: usize,

    /// Connection service.
    service: TransportService,

    /// TX channel for sending events to the user protocol.
    tx: Sender<PingEvent>,

    /// Connected peers.
    peers: HashSet<PeerId>,

    /// Outbound substreams that have been requested but not yet reported as opened, keyed by
    /// substream ID and mapped to the peer the substream was requested for.
    pending_opens: HashMap<SubstreamId, PeerId>,

    /// In-flight outbound ping exchanges; each resolves to the peer and the measured ping time.
    pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Duration)>>>,

    /// In-flight inbound ping exchanges.
    pending_inbound: FuturesUnordered<BoxFuture<'static, crate::Result<()>>>,
}
84
85impl Ping {
86    /// Create new [`Ping`] protocol.
87    pub fn new(service: TransportService, config: Config) -> Self {
88        Self {
89            service,
90            tx: config.tx_event,
91            peers: HashSet::new(),
92            pending_opens: HashMap::new(),
93            pending_outbound: FuturesUnordered::new(),
94            pending_inbound: FuturesUnordered::new(),
95            _max_failures: config.max_failures,
96        }
97    }
98
99    /// Connection established to remote peer.
100    fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
101        tracing::trace!(target: LOG_TARGET, ?peer, "connection established");
102
103        let substream_id = self.service.open_substream(peer)?;
104        self.pending_opens.insert(substream_id, peer);
105        self.peers.insert(peer);
106
107        Ok(())
108    }
109
110    /// Connection closed to remote peer.
111    fn on_connection_closed(&mut self, peer: PeerId) {
112        tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
113
114        self.peers.remove(&peer);
115    }
116
117    /// Handle outbound substream.
118    fn on_outbound_substream(
119        &mut self,
120        peer: PeerId,
121        substream_id: SubstreamId,
122        mut substream: Substream,
123    ) {
124        tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");
125
126        self.pending_outbound.push(Box::pin(async move {
127            let future = async move {
128                // TODO: generate random payload and verify it
129                substream.send_framed(vec![0u8; 32].into()).await?;
130                let now = Instant::now();
131                let _ = substream.next().await.ok_or(Error::SubstreamError(
132                    SubstreamError::ReadFailure(Some(substream_id)),
133                ))?;
134                let _ = substream.close().await;
135
136                Ok(now.elapsed())
137            };
138
139            match tokio::time::timeout(Duration::from_secs(10), future).await {
140                Err(_) => Err(Error::Timeout),
141                Ok(Err(error)) => Err(error),
142                Ok(Ok(elapsed)) => Ok((peer, elapsed)),
143            }
144        }));
145    }
146
147    /// Substream opened to remote peer.
148    fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
149        tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream");
150
151        self.pending_inbound.push(Box::pin(async move {
152            let future = async move {
153                let payload = substream
154                    .next()
155                    .await
156                    .ok_or(Error::SubstreamError(SubstreamError::ReadFailure(None)))??;
157                substream.send_framed(payload.freeze()).await?;
158                let _ = substream.next().await.map(|_| ());
159
160                Ok(())
161            };
162
163            match tokio::time::timeout(Duration::from_secs(10), future).await {
164                Err(_) => Err(Error::Timeout),
165                Ok(Err(error)) => Err(error),
166                Ok(Ok(())) => Ok(()),
167            }
168        }));
169    }
170
171    /// Start [`Ping`] event loop.
172    pub async fn run(mut self) {
173        tracing::debug!(target: LOG_TARGET, "starting ping event loop");
174
175        loop {
176            tokio::select! {
177                event = self.service.next() => match event {
178                    Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
179                        let _ = self.on_connection_established(peer);
180                    }
181                    Some(TransportEvent::ConnectionClosed { peer }) => {
182                        self.on_connection_closed(peer);
183                    }
184                    Some(TransportEvent::SubstreamOpened {
185                        peer,
186                        substream,
187                        direction,
188                        ..
189                    }) => match direction {
190                        Direction::Inbound => {
191                            self.on_inbound_substream(peer, substream);
192                        }
193                        Direction::Outbound(substream_id) => {
194                            match self.pending_opens.remove(&substream_id) {
195                                Some(stored_peer) => {
196                                    debug_assert!(peer == stored_peer);
197                                    self.on_outbound_substream(peer, substream_id, substream);
198                                }
199                                None => {
200                                    tracing::warn!(
201                                        target: LOG_TARGET,
202                                        ?substream_id,
203                                        "outbound ping substream ID does not exist",
204                                    );
205                                }
206                            }
207                        }
208                    },
209                    Some(_) => {}
210                    None => return,
211                },
212                _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
213                event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => {
214                    match event {
215                        Some(Ok((peer, elapsed))) => {
216                            let _ = self
217                                .tx
218                                .send(PingEvent::Ping {
219                                    peer,
220                                    ping: elapsed,
221                                })
222                                .await;
223                        }
224                        event => tracing::debug!(target: LOG_TARGET, "failed to handle ping for an outbound peer: {event:?}"),
225                    }
226                }
227            }
228        }
229    }
230}