libp2p_ping/
handler.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{protocol, PROTOCOL_NAME};
22use futures::future::{BoxFuture, Either};
23use futures::prelude::*;
24use futures_timer::Delay;
25use libp2p_core::upgrade::ReadyUpgrade;
26use libp2p_swarm::handler::{
27    ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
28};
29use libp2p_swarm::{
30    ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
31    SubstreamProtocol,
32};
33use std::collections::VecDeque;
34use std::{
35    error::Error,
36    fmt, io,
37    task::{Context, Poll},
38    time::Duration,
39};
40use void::Void;
41
42/// The configuration for outbound pings.
43#[derive(Debug, Clone)]
44pub struct Config {
45    /// The timeout of an outbound ping.
46    timeout: Duration,
47    /// The duration between outbound pings.
48    interval: Duration,
49}
50
51impl Config {
52    /// Creates a new [`Config`] with the following default settings:
53    ///
54    ///   * [`Config::with_interval`] 15s
55    ///   * [`Config::with_timeout`] 20s
56    ///
57    /// These settings have the following effect:
58    ///
59    ///   * A ping is sent every 15 seconds on a healthy connection.
60    ///   * Every ping sent must yield a response within 20 seconds in order to
61    ///     be successful.
62    pub fn new() -> Self {
63        Self {
64            timeout: Duration::from_secs(20),
65            interval: Duration::from_secs(15),
66        }
67    }
68
69    /// Sets the ping timeout.
70    pub fn with_timeout(mut self, d: Duration) -> Self {
71        self.timeout = d;
72        self
73    }
74
75    /// Sets the ping interval.
76    pub fn with_interval(mut self, d: Duration) -> Self {
77        self.interval = d;
78        self
79    }
80}
81
82impl Default for Config {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88/// An outbound ping failure.
89#[derive(Debug)]
90pub enum Failure {
91    /// The ping timed out, i.e. no response was received within the
92    /// configured ping timeout.
93    Timeout,
94    /// The peer does not support the ping protocol.
95    Unsupported,
96    /// The ping failed for reasons other than a timeout.
97    Other {
98        error: Box<dyn std::error::Error + Send + Sync + 'static>,
99    },
100}
101
102impl Failure {
103    fn other(e: impl std::error::Error + Send + Sync + 'static) -> Self {
104        Self::Other { error: Box::new(e) }
105    }
106}
107
108impl fmt::Display for Failure {
109    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110        match self {
111            Failure::Timeout => f.write_str("Ping timeout"),
112            Failure::Other { error } => write!(f, "Ping error: {error}"),
113            Failure::Unsupported => write!(f, "Ping protocol not supported"),
114        }
115    }
116}
117
118impl Error for Failure {
119    fn source(&self) -> Option<&(dyn Error + 'static)> {
120        match self {
121            Failure::Timeout => None,
122            Failure::Other { error } => Some(&**error),
123            Failure::Unsupported => None,
124        }
125    }
126}
127
128/// Protocol handler that handles pinging the remote at a regular period
129/// and answering ping queries.
130pub struct Handler {
131    /// Configuration options.
132    config: Config,
133    /// The timer used for the delay to the next ping.
134    interval: Delay,
135    /// Outbound ping failures that are pending to be processed by `poll()`.
136    pending_errors: VecDeque<Failure>,
137    /// The number of consecutive ping failures that occurred.
138    ///
139    /// Each successful ping resets this counter to 0.
140    failures: u32,
141    /// The outbound ping state.
142    outbound: Option<OutboundState>,
143    /// The inbound pong handler, i.e. if there is an inbound
144    /// substream, this is always a future that waits for the
145    /// next inbound ping to be answered.
146    inbound: Option<PongFuture>,
147    /// Tracks the state of our handler.
148    state: State,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152enum State {
153    /// We are inactive because the other peer doesn't support ping.
154    Inactive {
155        /// Whether or not we've reported the missing support yet.
156        ///
157        /// This is used to avoid repeated events being emitted for a specific connection.
158        reported: bool,
159    },
160    /// We are actively pinging the other peer.
161    Active,
162}
163
164impl Handler {
165    /// Builds a new [`Handler`] with the given configuration.
166    pub fn new(config: Config) -> Self {
167        Handler {
168            config,
169            interval: Delay::new(Duration::new(0, 0)),
170            pending_errors: VecDeque::with_capacity(2),
171            failures: 0,
172            outbound: None,
173            inbound: None,
174            state: State::Active,
175        }
176    }
177
178    fn on_dial_upgrade_error(
179        &mut self,
180        DialUpgradeError { error, .. }: DialUpgradeError<
181            <Self as ConnectionHandler>::OutboundOpenInfo,
182            <Self as ConnectionHandler>::OutboundProtocol,
183        >,
184    ) {
185        self.outbound = None; // Request a new substream on the next `poll`.
186
187        // Timer is already polled and expired before substream request is initiated
188        // and will be polled again later on in our `poll` because we reset `self.outbound`.
189        //
190        // `futures-timer` allows an expired timer to be polled again and returns
191        // immediately `Poll::Ready`. However in its WASM implementation there is
192        // a bug that causes the expired timer to panic.
193        // This is a workaround until a proper fix is merged and released.
194        // See libp2p/rust-libp2p#5447 for more info.
195        //
196        // TODO: remove when async-rs/futures-timer#74 gets merged.
197        self.interval.reset(Duration::new(0, 0));
198
199        let error = match error {
200            StreamUpgradeError::NegotiationFailed => {
201                debug_assert_eq!(self.state, State::Active);
202
203                self.state = State::Inactive { reported: false };
204                return;
205            }
206            // Note: This timeout only covers protocol negotiation.
207            StreamUpgradeError::Timeout => Failure::Other {
208                error: Box::new(std::io::Error::new(
209                    std::io::ErrorKind::TimedOut,
210                    "ping protocol negotiation timed out",
211                )),
212            },
213            StreamUpgradeError::Apply(e) => void::unreachable(e),
214            StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) },
215        };
216
217        self.pending_errors.push_front(error);
218    }
219}
220
221impl ConnectionHandler for Handler {
222    type FromBehaviour = Void;
223    type ToBehaviour = Result<Duration, Failure>;
224    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
225    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
226    type OutboundOpenInfo = ();
227    type InboundOpenInfo = ();
228
229    fn listen_protocol(&self) -> SubstreamProtocol<ReadyUpgrade<StreamProtocol>, ()> {
230        SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
231    }
232
233    fn on_behaviour_event(&mut self, _: Void) {}
234
235    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
236    fn poll(
237        &mut self,
238        cx: &mut Context<'_>,
239    ) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
240    {
241        match self.state {
242            State::Inactive { reported: true } => {
243                return Poll::Pending; // nothing to do on this connection
244            }
245            State::Inactive { reported: false } => {
246                self.state = State::Inactive { reported: true };
247                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
248                    Failure::Unsupported,
249                )));
250            }
251            State::Active => {}
252        }
253
254        // Respond to inbound pings.
255        if let Some(fut) = self.inbound.as_mut() {
256            match fut.poll_unpin(cx) {
257                Poll::Pending => {}
258                Poll::Ready(Err(e)) => {
259                    tracing::debug!("Inbound ping error: {:?}", e);
260                    self.inbound = None;
261                }
262                Poll::Ready(Ok(stream)) => {
263                    tracing::trace!("answered inbound ping from peer");
264
265                    // A ping from a remote peer has been answered, wait for the next.
266                    self.inbound = Some(protocol::recv_ping(stream).boxed());
267                }
268            }
269        }
270
271        loop {
272            // Check for outbound ping failures.
273            if let Some(error) = self.pending_errors.pop_back() {
274                tracing::debug!("Ping failure: {:?}", error);
275
276                self.failures += 1;
277
278                // Note: For backward-compatibility the first failure is always "free"
279                // and silent. This allows peers who use a new substream
280                // for each ping to have successful ping exchanges with peers
281                // that use a single substream, since every successful ping
282                // resets `failures` to `0`.
283                if self.failures > 1 {
284                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
285                }
286            }
287
288            // Continue outbound pings.
289            match self.outbound.take() {
290                Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) {
291                    Poll::Pending => {
292                        self.outbound = Some(OutboundState::Ping(ping));
293                        break;
294                    }
295                    Poll::Ready(Ok((stream, rtt))) => {
296                        tracing::debug!(?rtt, "ping succeeded");
297                        self.failures = 0;
298                        self.interval.reset(self.config.interval);
299                        self.outbound = Some(OutboundState::Idle(stream));
300                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt)));
301                    }
302                    Poll::Ready(Err(e)) => {
303                        self.interval.reset(self.config.interval);
304                        self.pending_errors.push_front(e);
305                    }
306                },
307                Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) {
308                    Poll::Pending => {
309                        self.outbound = Some(OutboundState::Idle(stream));
310                        break;
311                    }
312                    Poll::Ready(()) => {
313                        self.outbound = Some(OutboundState::Ping(
314                            send_ping(stream, self.config.timeout).boxed(),
315                        ));
316                    }
317                },
318                Some(OutboundState::OpenStream) => {
319                    self.outbound = Some(OutboundState::OpenStream);
320                    break;
321                }
322                None => match self.interval.poll_unpin(cx) {
323                    Poll::Pending => break,
324                    Poll::Ready(()) => {
325                        self.outbound = Some(OutboundState::OpenStream);
326                        let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());
327                        return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
328                            protocol,
329                        });
330                    }
331                },
332            }
333        }
334
335        Poll::Pending
336    }
337
338    fn on_connection_event(
339        &mut self,
340        event: ConnectionEvent<
341            Self::InboundProtocol,
342            Self::OutboundProtocol,
343            Self::InboundOpenInfo,
344            Self::OutboundOpenInfo,
345        >,
346    ) {
347        match event {
348            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
349                protocol: mut stream,
350                ..
351            }) => {
352                stream.ignore_for_keep_alive();
353                self.inbound = Some(protocol::recv_ping(stream).boxed());
354            }
355            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
356                protocol: mut stream,
357                ..
358            }) => {
359                stream.ignore_for_keep_alive();
360                self.outbound = Some(OutboundState::Ping(
361                    send_ping(stream, self.config.timeout).boxed(),
362                ));
363            }
364            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
365                self.on_dial_upgrade_error(dial_upgrade_error)
366            }
367            _ => {}
368        }
369    }
370}
371
372type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>;
373type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
374
375/// The current state w.r.t. outbound pings.
376enum OutboundState {
377    /// A new substream is being negotiated for the ping protocol.
378    OpenStream,
379    /// The substream is idle, waiting to send the next ping.
380    Idle(Stream),
381    /// A ping is being sent and the response awaited.
382    Ping(PingFuture),
383}
384
385/// A wrapper around [`protocol::send_ping`] that enforces a time out.
386async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> {
387    let ping = protocol::send_ping(stream);
388    futures::pin_mut!(ping);
389
390    match future::select(ping, Delay::new(timeout)).await {
391        Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)),
392        Either::Left((Err(e), _)) => Err(Failure::other(e)),
393        Either::Right(((), _)) => Err(Failure::Timeout),
394    }
395}