litep2p/protocol/libp2p/ping/
mod.rs1use crate::{
24 error::SubstreamError,
25 protocol::{Direction, TransportEvent, TransportService},
26 substream::Substream,
27 types::SubstreamId,
28 PeerId,
29};
30
31use bytes::Bytes;
32use futures::{
33 stream::{self, BoxStream},
34 FutureExt, StreamExt,
35};
36use rand::Rng as _;
37use std::{
38 collections::HashSet,
39 time::{Duration, Instant},
40};
41use tokio::sync::mpsc;
42use tokio_stream::StreamMap;
43
44pub use config::{Config, ConfigBuilder};
45mod config;
46
/// Logging target passed to every `tracing` call in this file.
const LOG_TARGET: &str = "litep2p::ipfs::ping";
51
52#[derive(Debug)]
54pub enum PingEvent {
55 Ping {
57 peer: PeerId,
59
60 ping: Duration,
62 },
63}
64
65pub(crate) struct Ping {
67 _max_failures: usize,
72
73 service: TransportService,
75
76 tx: mpsc::Sender<PingEvent>,
78
79 pingers: StreamMap<PeerId, BoxStream<'static, Result<Duration, PingError>>>,
81
82 retries: HashSet<SubstreamId>,
84
85 responders: StreamMap<PeerId, BoxStream<'static, Result<(), SubstreamError>>>,
87
88 ping_interval: Duration,
90}
91
92impl Ping {
93 pub fn new(service: TransportService, config: Config) -> Self {
95 Self {
96 service,
97 tx: config.tx_event,
98 ping_interval: config.ping_interval,
99 pingers: StreamMap::new(),
100 retries: HashSet::new(),
101 responders: StreamMap::new(),
102 _max_failures: config.max_failures,
103 }
104 }
105
106 fn on_connection_established(&mut self, peer: PeerId) {
108 tracing::debug!(target: LOG_TARGET, ?peer, "connection established");
109
110 if let Err(error) = self.service.open_substream(peer) {
111 tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream");
112 }
113 }
114
115 fn on_connection_closed(&mut self, peer: PeerId) {
117 tracing::debug!(target: LOG_TARGET, ?peer, "connection closed");
118 }
119
120 fn on_outbound_substream(
122 &mut self,
123 peer: PeerId,
124 substream_id: SubstreamId,
125 substream: Substream,
126 ) {
127 tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");
128 let interval = self.ping_interval;
129 let should_wait = self.retries.remove(&substream_id);
130
131 let pinger_stream = stream::unfold(
132 (substream, should_wait),
133 move |(mut substream, should_wait)| async move {
134 if should_wait {
135 tokio::time::sleep(interval).await;
136 }
137
138 let payload = Bytes::from(Vec::from(rand::thread_rng().gen::<[u8; 32]>()));
139
140 let ping = async {
141 let now = Instant::now();
142
143 substream.send_framed(payload.clone()).await?;
144 let received = substream.next().await.ok_or(PingError::SubstreamError(
145 SubstreamError::ReadFailure(Some(substream_id)),
146 ))??;
147
148 if received == payload {
149 Ok(now.elapsed())
150 } else {
151 Err(PingError::InvalidPayload)
152 }
153 };
154
155 match tokio::time::timeout(Duration::from_secs(20), ping).await {
156 Ok(Ok(elapsed)) => Some((Ok(elapsed), (substream, true))),
157 Ok(Err(error)) => Some((Err(error), (substream, false))),
158 Err(timeout) => Some((Err(timeout.into()), (substream, false))),
159 }
160 },
161 );
162
163 let _ = self.pingers.insert(peer, pinger_stream.boxed());
166 }
167
168 fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
170 tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream");
171
172 let responder_future = async move {
173 loop {
174 if let Some(payload) = substream.next().await {
175 substream.send_framed(payload?.freeze()).await?;
176 } else {
177 return Ok(());
178 }
179 }
180 };
181
182 if self.responders.insert(peer, responder_future.into_stream().boxed()).is_some() {
183 tracing::trace!(
184 target: LOG_TARGET,
185 ?peer,
186 "discarding ping substream as remote opened a new one",
187 );
188 }
189 }
190
191 pub async fn run(mut self) {
193 tracing::debug!(target: LOG_TARGET, "starting ping event loop");
194
195 loop {
196 tokio::select! {
197 event = self.service.next() => match event {
198 Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
199 self.on_connection_established(peer);
200 }
201 Some(TransportEvent::ConnectionClosed { peer }) => {
202 self.on_connection_closed(peer);
203 }
204 Some(TransportEvent::SubstreamOpened {
205 peer,
206 substream,
207 direction,
208 ..
209 }) => match direction {
210 Direction::Inbound => {
211 self.on_inbound_substream(peer, substream);
212 }
213 Direction::Outbound(substream_id) => {
214 self.on_outbound_substream(peer, substream_id, substream);
215 }
216 }
217 Some(TransportEvent::SubstreamOpenFailure {
218 substream,
219 ..
220 }) => {
221 self.retries.remove(&substream);
222 }
223 Some(_) => {}
224 None => return,
225 },
226 Some((peer, result)) = self.responders.next(), if !self.responders.is_empty() => {
227 self.responders.remove(&peer);
231
232 tracing::trace!(
233 target: LOG_TARGET,
234 ?peer,
235 ?result,
236 "inbound ping responder terminated",
237 );
238 }
239 Some((peer, result)) = self.pingers.next(), if !self.pingers.is_empty() => {
240 match result {
241 Ok(elapsed) => {
242 tracing::debug!(
243 target: LOG_TARGET,
244 ?peer,
245 time_us = elapsed.as_micros(),
246 "pong",
247 );
248
249 let _ = self.tx.send(PingEvent::Ping { peer, ping: elapsed }).await;
250 }
251 Err(error) => {
252 self.pingers.remove(&peer);
253
254 tracing::debug!(
255 target: LOG_TARGET,
256 ?peer,
257 ?error,
258 "ping failed",
259 );
260
261 match self.service.open_substream(peer) {
262 Ok(substream_id) => {
263 self.retries.insert(substream_id);
264 }
265 Err(error) => tracing::debug!(
266 target: LOG_TARGET,
267 ?peer,
268 ?error,
269 "failed to open substream after ping failed",
270 ),
271 }
272 }
273 }
274 }
275 }
276 }
277 }
278}
279
280#[derive(Debug, thiserror::Error)]
282enum PingError {
283 #[error("Substream error: {0}")]
284 SubstreamError(#[from] SubstreamError),
285 #[error("Invalid payload received")]
286 InvalidPayload,
287 #[error("Timeout")]
288 Timeout(#[from] tokio::time::error::Elapsed),
289}