1use crate::{protocol, PROTOCOL_NAME};
22use futures::future::{BoxFuture, Either};
23use futures::prelude::*;
24use futures_timer::Delay;
25use libp2p_core::upgrade::ReadyUpgrade;
26use libp2p_identity::PeerId;
27use libp2p_swarm::handler::{
28 ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
29};
30use libp2p_swarm::{
31 ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol,
32 StreamUpgradeError, SubstreamProtocol,
33};
34use std::collections::VecDeque;
35use std::{
36 error::Error,
37 fmt, io,
38 task::{Context, Poll},
39 time::Duration,
40};
41use void::Void;
42
/// Configuration of the ping handler.
#[derive(Debug, Clone)]
pub struct Config {
    /// Deadline handed to `send_ping` for a single outbound ping.
    timeout: Duration,
    /// Value the handler's interval timer is reset to after every outbound
    /// ping attempt, successful or not.
    interval: Duration,
}

impl Config {
    /// Creates a configuration with the built-in defaults:
    ///
    /// * `timeout`: 20 seconds
    /// * `interval`: 15 seconds
    pub fn new() -> Self {
        Config {
            timeout: Duration::from_secs(20),
            interval: Duration::from_secs(15),
        }
    }

    /// Returns the configuration with the ping timeout replaced by `d`.
    ///
    /// The interval is left untouched.
    pub fn with_timeout(self, d: Duration) -> Self {
        Config { timeout: d, ..self }
    }

    /// Returns the configuration with the ping interval replaced by `d`.
    ///
    /// The timeout is left untouched.
    pub fn with_interval(self, d: Duration) -> Self {
        Config {
            interval: d,
            ..self
        }
    }
}

impl Default for Config {
    /// Same values as [`Config::new`].
    fn default() -> Self {
        Config::new()
    }
}
88
/// Reasons an outbound ping can fail.
#[derive(Debug)]
pub enum Failure {
    /// The ping did not complete before the configured timeout elapsed.
    Timeout,
    /// Negotiation of the ping protocol with the remote failed.
    Unsupported,
    /// Any other failure, carrying the underlying error.
    Other {
        /// The wrapped cause; exposed through [`Error::source`].
        error: Box<dyn Error + Send + 'static>,
    },
}

impl Failure {
    /// Boxes `e` into a [`Failure::Other`].
    fn other(e: impl Error + Send + 'static) -> Self {
        Self::Other { error: Box::new(e) }
    }
}

impl fmt::Display for Failure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("Ping timeout"),
            Self::Unsupported => f.write_str("Ping protocol not supported"),
            Self::Other { error } => write!(f, "Ping error: {error}"),
        }
    }
}

impl Error for Failure {
    /// Only [`Failure::Other`] has an underlying cause.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Other { error } => Some(&**error),
            Self::Timeout | Self::Unsupported => None,
        }
    }
}
128
129pub struct Handler {
132 config: Config,
134 interval: Delay,
136 pending_errors: VecDeque<Failure>,
138 failures: u32,
142 outbound: Option<OutboundState>,
144 inbound: Option<PongFuture>,
148 state: State,
150 peer: PeerId,
152}
153
/// Activity state of a [`Handler`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Entered when negotiating the ping protocol for an outbound substream
    /// failed; `poll` does no further work in this state.
    Inactive {
        /// Whether `poll` has already emitted `Failure::Unsupported` to the
        /// behaviour. It is emitted exactly once.
        reported: bool,
    },
    /// Normal operation: inbound pings are answered and outbound pings sent.
    Active,
}
166
167impl Handler {
168 pub fn new(config: Config, peer: PeerId) -> Self {
170 Handler {
171 peer,
172 config,
173 interval: Delay::new(Duration::new(0, 0)),
174 pending_errors: VecDeque::with_capacity(2),
175 failures: 0,
176 outbound: None,
177 inbound: None,
178 state: State::Active,
179 }
180 }
181
182 fn on_dial_upgrade_error(
183 &mut self,
184 DialUpgradeError { error, .. }: DialUpgradeError<
185 <Self as ConnectionHandler>::OutboundOpenInfo,
186 <Self as ConnectionHandler>::OutboundProtocol,
187 >,
188 ) {
189 self.outbound = None; let error = match error {
192 StreamUpgradeError::NegotiationFailed => {
193 debug_assert_eq!(self.state, State::Active);
194
195 self.state = State::Inactive { reported: false };
196 return;
197 }
198 StreamUpgradeError::Timeout => Failure::Other {
200 error: Box::new(std::io::Error::new(
201 std::io::ErrorKind::TimedOut,
202 "ping protocol negotiation timed out",
203 )),
204 },
205 StreamUpgradeError::Apply(e) => void::unreachable(e),
206 StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) },
207 };
208
209 self.pending_errors.push_front(error);
210 }
211}
212
213impl ConnectionHandler for Handler {
214 type FromBehaviour = Void;
215 type ToBehaviour = Result<Duration, Failure>;
216 type Error = Void;
217 type InboundProtocol = ReadyUpgrade<StreamProtocol>;
218 type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
219 type OutboundOpenInfo = ();
220 type InboundOpenInfo = ();
221
222 fn listen_protocol(&self) -> SubstreamProtocol<ReadyUpgrade<StreamProtocol>, ()> {
223 SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
224 }
225
226 fn on_behaviour_event(&mut self, _: Void) {}
227
228 fn connection_keep_alive(&self) -> KeepAlive {
229 KeepAlive::No
230 }
231
232 fn poll(
233 &mut self,
234 cx: &mut Context<'_>,
235 ) -> Poll<
236 ConnectionHandlerEvent<
237 ReadyUpgrade<StreamProtocol>,
238 (),
239 Result<Duration, Failure>,
240 Self::Error,
241 >,
242 > {
243 match self.state {
244 State::Inactive { reported: true } => {
245 return Poll::Pending; }
247 State::Inactive { reported: false } => {
248 self.state = State::Inactive { reported: true };
249 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
250 Failure::Unsupported,
251 )));
252 }
253 State::Active => {}
254 }
255
256 if let Some(fut) = self.inbound.as_mut() {
258 match fut.poll_unpin(cx) {
259 Poll::Pending => {}
260 Poll::Ready(Err(e)) => {
261 log::debug!("Inbound ping error: {:?}", e);
262 self.inbound = None;
263 }
264 Poll::Ready(Ok(stream)) => {
265 log::trace!("answered inbound ping from {}", self.peer);
266
267 self.inbound = Some(protocol::recv_ping(stream).boxed());
269 }
270 }
271 }
272
273 loop {
274 if let Some(error) = self.pending_errors.pop_back() {
276 log::debug!("Ping failure: {:?}", error);
277
278 self.failures += 1;
279
280 if self.failures > 1 {
286 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
287 }
288 }
289
290 match self.outbound.take() {
292 Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) {
293 Poll::Pending => {
294 self.outbound = Some(OutboundState::Ping(ping));
295 break;
296 }
297 Poll::Ready(Ok((stream, rtt))) => {
298 log::debug!("latency to {} is {}ms", self.peer, rtt.as_millis());
299
300 self.failures = 0;
301 self.interval.reset(self.config.interval);
302 self.outbound = Some(OutboundState::Idle(stream));
303 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt)));
304 }
305 Poll::Ready(Err(e)) => {
306 self.interval.reset(self.config.interval);
307 self.pending_errors.push_front(e);
308 }
309 },
310 Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) {
311 Poll::Pending => {
312 self.outbound = Some(OutboundState::Idle(stream));
313 break;
314 }
315 Poll::Ready(()) => {
316 self.outbound = Some(OutboundState::Ping(
317 send_ping(stream, self.config.timeout).boxed(),
318 ));
319 }
320 },
321 Some(OutboundState::OpenStream) => {
322 self.outbound = Some(OutboundState::OpenStream);
323 break;
324 }
325 None => match self.interval.poll_unpin(cx) {
326 Poll::Pending => break,
327 Poll::Ready(()) => {
328 self.outbound = Some(OutboundState::OpenStream);
329 let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());
330 return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
331 protocol,
332 });
333 }
334 },
335 }
336 }
337
338 Poll::Pending
339 }
340
341 fn on_connection_event(
342 &mut self,
343 event: ConnectionEvent<
344 Self::InboundProtocol,
345 Self::OutboundProtocol,
346 Self::InboundOpenInfo,
347 Self::OutboundOpenInfo,
348 >,
349 ) {
350 match event {
351 ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
352 protocol: stream,
353 ..
354 }) => {
355 self.inbound = Some(protocol::recv_ping(stream).boxed());
356 }
357 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
358 protocol: stream,
359 ..
360 }) => {
361 self.outbound = Some(OutboundState::Ping(
362 send_ping(stream, self.config.timeout).boxed(),
363 ));
364 }
365 ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
366 self.on_dial_upgrade_error(dial_upgrade_error)
367 }
368 ConnectionEvent::AddressChange(_)
369 | ConnectionEvent::ListenUpgradeError(_)
370 | ConnectionEvent::LocalProtocolsChange(_)
371 | ConnectionEvent::RemoteProtocolsChange(_) => {}
372 }
373 }
374}
375
376type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>;
377type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
378
379enum OutboundState {
381 OpenStream,
383 Idle(Stream),
385 Ping(PingFuture),
387}
388
389async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> {
391 let ping = protocol::send_ping(stream);
392 futures::pin_mut!(ping);
393
394 match future::select(ping, Delay::new(timeout)).await {
395 Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)),
396 Either::Left((Err(e), _)) => Err(Failure::other(e)),
397 Either::Right(((), _)) => Err(Failure::Timeout),
398 }
399}