libp2p_ping/
handler.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{protocol, PROTOCOL_NAME};
22use futures::future::{BoxFuture, Either};
23use futures::prelude::*;
24use futures_timer::Delay;
25use libp2p_core::upgrade::ReadyUpgrade;
26use libp2p_identity::PeerId;
27use libp2p_swarm::handler::{
28    ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
29};
30use libp2p_swarm::{
31    ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol,
32    StreamUpgradeError, SubstreamProtocol,
33};
34use std::collections::VecDeque;
35use std::{
36    error::Error,
37    fmt, io,
38    task::{Context, Poll},
39    time::Duration,
40};
41use void::Void;
42
/// Settings that govern how outbound pings are issued.
#[derive(Debug, Clone)]
pub struct Config {
    /// How long a single outbound ping may take before it counts as failed.
    timeout: Duration,
    /// How long to wait between two consecutive outbound pings.
    interval: Duration,
}

impl Config {
    /// Builds a [`Config`] populated with the defaults:
    ///
    ///   * [`Config::with_interval`] 15s
    ///   * [`Config::with_timeout`] 20s
    ///
    /// In practice this means:
    ///
    ///   * On a healthy connection one ping goes out every 15 seconds.
    ///   * A ping only succeeds if its response arrives within 20 seconds.
    pub fn new() -> Self {
        let timeout = Duration::from_secs(20);
        let interval = Duration::from_secs(15);

        Self { timeout, interval }
    }

    /// Returns the configuration with the ping timeout replaced by `d`.
    pub fn with_timeout(self, d: Duration) -> Self {
        Self { timeout: d, ..self }
    }

    /// Returns the configuration with the ping interval replaced by `d`.
    pub fn with_interval(self, d: Duration) -> Self {
        Self {
            interval: d,
            ..self
        }
    }
}

impl Default for Config {
    /// Same as [`Config::new`].
    fn default() -> Self {
        Config::new()
    }
}
88
/// The ways in which an outbound ping can fail.
#[derive(Debug)]
pub enum Failure {
    /// No response arrived within the configured ping timeout.
    Timeout,
    /// The remote peer does not speak the ping protocol.
    Unsupported,
    /// Any failure that is not a timeout; `error` carries the underlying cause.
    Other {
        error: Box<dyn std::error::Error + Send + 'static>,
    },
}

impl Failure {
    /// Wraps an arbitrary error into [`Failure::Other`].
    fn other(e: impl std::error::Error + Send + 'static) -> Self {
        let error: Box<dyn std::error::Error + Send + 'static> = Box::new(e);
        Failure::Other { error }
    }
}

impl fmt::Display for Failure {
    /// Writes a short human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Other { error } => write!(f, "Ping error: {error}"),
            Self::Timeout => f.write_str("Ping timeout"),
            Self::Unsupported => f.write_str("Ping protocol not supported"),
        }
    }
}

impl Error for Failure {
    /// Only [`Failure::Other`] has an underlying cause to expose.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        if let Self::Other { error } = self {
            let cause: &(dyn Error + 'static) = &**error;
            Some(cause)
        } else {
            None
        }
    }
}
128
/// Protocol handler that handles pinging the remote at a regular period
/// and answering ping queries.
///
/// One outbound ping is in flight at a time (tracked in `outbound`), and at
/// most one inbound substream is served at a time (tracked in `inbound`).
pub struct Handler {
    /// Configuration options (ping timeout and ping interval).
    config: Config,
    /// The timer used for the delay to the next ping.
    ///
    /// It is reset to `config.interval` whenever an outbound ping completes,
    /// whether it succeeded or failed.
    interval: Delay,
    /// Outbound ping failures that are pending to be processed by `poll()`.
    ///
    /// Errors are pushed at the front and popped from the back, so they are
    /// processed in the order in which they occurred.
    pending_errors: VecDeque<Failure>,
    /// The number of consecutive ping failures that occurred.
    ///
    /// Each successful ping resets this counter to 0. `poll()` only reports a
    /// failure to the behaviour once this counter exceeds 1.
    failures: u32,
    /// The outbound ping state.
    ///
    /// `None` means there is currently no outbound substream; `poll()` requests
    /// a new one once `interval` has elapsed.
    outbound: Option<OutboundState>,
    /// The inbound pong handler, i.e. if there is an inbound
    /// substream, this is always a future that waits for the
    /// next inbound ping to be answered.
    inbound: Option<PongFuture>,
    /// Tracks the state of our handler, i.e. whether pings are still being
    /// exchanged or the remote turned out not to support the protocol.
    state: State,
    /// The peer we are connected to. Only used in log messages.
    peer: PeerId,
}
153
/// Whether this handler still takes part in the ping protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Pings are actively being exchanged with the remote peer.
    Active,
    /// The remote peer does not support ping, so the handler is dormant.
    Inactive {
        /// Set once the missing protocol support has been reported.
        ///
        /// Guards against emitting the same event more than once for a
        /// given connection.
        reported: bool,
    },
}
166
167impl Handler {
168    /// Builds a new [`Handler`] with the given configuration.
169    pub fn new(config: Config, peer: PeerId) -> Self {
170        Handler {
171            peer,
172            config,
173            interval: Delay::new(Duration::new(0, 0)),
174            pending_errors: VecDeque::with_capacity(2),
175            failures: 0,
176            outbound: None,
177            inbound: None,
178            state: State::Active,
179        }
180    }
181
182    fn on_dial_upgrade_error(
183        &mut self,
184        DialUpgradeError { error, .. }: DialUpgradeError<
185            <Self as ConnectionHandler>::OutboundOpenInfo,
186            <Self as ConnectionHandler>::OutboundProtocol,
187        >,
188    ) {
189        self.outbound = None; // Request a new substream on the next `poll`.
190
191        let error = match error {
192            StreamUpgradeError::NegotiationFailed => {
193                debug_assert_eq!(self.state, State::Active);
194
195                self.state = State::Inactive { reported: false };
196                return;
197            }
198            // Note: This timeout only covers protocol negotiation.
199            StreamUpgradeError::Timeout => Failure::Other {
200                error: Box::new(std::io::Error::new(
201                    std::io::ErrorKind::TimedOut,
202                    "ping protocol negotiation timed out",
203                )),
204            },
205            StreamUpgradeError::Apply(e) => void::unreachable(e),
206            StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) },
207        };
208
209        self.pending_errors.push_front(error);
210    }
211}
212
/// Drives the ping protocol on a single connection: answers inbound pings
/// and periodically sends outbound pings, reporting each outcome to the
/// behaviour as `Result<Duration, Failure>`.
impl ConnectionHandler for Handler {
    // `Void` is uninhabited: the behaviour never sends events to this handler.
    type FromBehaviour = Void;
    // `Ok(rtt)` for a successful outbound ping, `Err(failure)` otherwise.
    type ToBehaviour = Result<Duration, Failure>;
    // `Void` is uninhabited: `poll` never yields a handler-level error.
    type Error = Void;
    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
    type OutboundOpenInfo = ();
    type InboundOpenInfo = ();

    /// Advertises the ping protocol (`PROTOCOL_NAME`) for inbound substreams.
    fn listen_protocol(&self) -> SubstreamProtocol<ReadyUpgrade<StreamProtocol>, ()> {
        SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
    }

    /// No-op: the event type is `Void`, so there is nothing to handle.
    fn on_behaviour_event(&mut self, _: Void) {}

    /// Always `KeepAlive::No`: the ping handler itself never requests that
    /// the connection be kept alive.
    fn connection_keep_alive(&self) -> KeepAlive {
        KeepAlive::No
    }

    /// Advances the handler.
    ///
    /// In order, this:
    ///
    /// 1. Short-circuits if the remote does not support ping, emitting
    ///    `Failure::Unsupported` exactly once.
    /// 2. Polls the inbound future so that a received ping gets answered.
    /// 3. Loops over queued outbound failures and the outbound state machine
    ///    until an event is ready or nothing more can make progress.
    ///
    /// Returns `NotifyBehaviour(Ok(rtt))` after a successful ping,
    /// `NotifyBehaviour(Err(_))` for a reportable failure,
    /// `OutboundSubstreamRequest` when a new ping substream is needed, and
    /// `Poll::Pending` otherwise.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<
        ConnectionHandlerEvent<
            ReadyUpgrade<StreamProtocol>,
            (),
            Result<Duration, Failure>,
            Self::Error,
        >,
    > {
        // Once inactive, neither inbound nor outbound pings are processed:
        // both early returns below happen before the inbound future is polled.
        match self.state {
            State::Inactive { reported: true } => {
                return Poll::Pending; // nothing to do on this connection
            }
            State::Inactive { reported: false } => {
                // Flip the flag first so the event is emitted only once.
                self.state = State::Inactive { reported: true };
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
                    Failure::Unsupported,
                )));
            }
            State::Active => {}
        }

        // Respond to inbound pings.
        if let Some(fut) = self.inbound.as_mut() {
            match fut.poll_unpin(cx) {
                Poll::Pending => {}
                Poll::Ready(Err(e)) => {
                    // Inbound errors are only logged; they are not reported
                    // to the behaviour and do not count towards `failures`.
                    log::debug!("Inbound ping error: {:?}", e);
                    self.inbound = None;
                }
                Poll::Ready(Ok(stream)) => {
                    log::trace!("answered inbound ping from {}", self.peer);

                    // A ping from a remote peer has been answered, wait for the next.
                    // NOTE(review): the new future is not polled within this call;
                    // it is first polled the next time `poll` runs.
                    self.inbound = Some(protocol::recv_ping(stream).boxed());
                }
            }
        }

        loop {
            // Check for outbound ping failures.
            if let Some(error) = self.pending_errors.pop_back() {
                log::debug!("Ping failure: {:?}", error);

                self.failures += 1;

                // Note: For backward-compatibility the first failure is always "free"
                // and silent. This allows peers who use a new substream
                // for each ping to have successful ping exchanges with peers
                // that use a single substream, since every successful ping
                // resets `failures` to `0`.
                if self.failures > 1 {
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
                }
            }

            // Continue outbound pings.
            // `take()` leaves `None` behind, so every arm that wants to keep
            // the current state has to store it back explicitly.
            match self.outbound.take() {
                Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) {
                    Poll::Pending => {
                        self.outbound = Some(OutboundState::Ping(ping));
                        break;
                    }
                    Poll::Ready(Ok((stream, rtt))) => {
                        log::debug!("latency to {} is {}ms", self.peer, rtt.as_millis());

                        self.failures = 0;
                        self.interval.reset(self.config.interval);
                        // Keep the substream around for the next ping.
                        self.outbound = Some(OutboundState::Idle(stream));
                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt)));
                    }
                    Poll::Ready(Err(e)) => {
                        // `outbound` stays `None`, so a new substream is requested
                        // once the interval elapses. The error is picked up at the
                        // top of the next loop iteration.
                        self.interval.reset(self.config.interval);
                        self.pending_errors.push_front(e);
                    }
                },
                Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) {
                    Poll::Pending => {
                        self.outbound = Some(OutboundState::Idle(stream));
                        break;
                    }
                    Poll::Ready(()) => {
                        // Interval elapsed: reuse the idle substream for the next
                        // ping; the next loop iteration polls the new future.
                        self.outbound = Some(OutboundState::Ping(
                            send_ping(stream, self.config.timeout).boxed(),
                        ));
                    }
                },
                Some(OutboundState::OpenStream) => {
                    // A substream has been requested but not yet delivered;
                    // `on_connection_event` will replace this state.
                    self.outbound = Some(OutboundState::OpenStream);
                    break;
                }
                None => match self.interval.poll_unpin(cx) {
                    Poll::Pending => break,
                    Poll::Ready(()) => {
                        // No substream yet: ask for one.
                        self.outbound = Some(OutboundState::OpenStream);
                        let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());
                        return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
                            protocol,
                        });
                    }
                },
            }
        }

        Poll::Pending
    }

    /// Handles substream-level events for this connection.
    ///
    /// * A negotiated inbound substream replaces any previous inbound future
    ///   with one that waits for a ping on the new substream.
    /// * A negotiated outbound substream immediately starts an outbound ping.
    /// * A dial upgrade error is forwarded to `on_dial_upgrade_error`.
    /// * All other events are ignored.
    fn on_connection_event(
        &mut self,
        event: ConnectionEvent<
            Self::InboundProtocol,
            Self::OutboundProtocol,
            Self::InboundOpenInfo,
            Self::OutboundOpenInfo,
        >,
    ) {
        match event {
            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
                protocol: stream,
                ..
            }) => {
                // Overwrites (and thereby drops) any inbound future already in place.
                self.inbound = Some(protocol::recv_ping(stream).boxed());
            }
            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
                protocol: stream,
                ..
            }) => {
                // The ping timeout starts counting when this future is first polled.
                self.outbound = Some(OutboundState::Ping(
                    send_ping(stream, self.config.timeout).boxed(),
                ));
            }
            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
                self.on_dial_upgrade_error(dial_upgrade_error)
            }
            ConnectionEvent::AddressChange(_)
            | ConnectionEvent::ListenUpgradeError(_)
            | ConnectionEvent::LocalProtocolsChange(_)
            | ConnectionEvent::RemoteProtocolsChange(_) => {}
        }
    }
}
375
376type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>;
377type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
378
379/// The current state w.r.t. outbound pings.
380enum OutboundState {
381    /// A new substream is being negotiated for the ping protocol.
382    OpenStream,
383    /// The substream is idle, waiting to send the next ping.
384    Idle(Stream),
385    /// A ping is being sent and the response awaited.
386    Ping(PingFuture),
387}
388
389/// A wrapper around [`protocol::send_ping`] that enforces a time out.
390async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> {
391    let ping = protocol::send_ping(stream);
392    futures::pin_mut!(ping);
393
394    match future::select(ping, Delay::new(timeout)).await {
395        Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)),
396        Either::Left((Err(e), _)) => Err(Failure::other(e)),
397        Either::Right(((), _)) => Err(Failure::Timeout),
398    }
399}