litep2p/protocol/libp2p/ping/
mod.rs1use crate::{
24 error::{Error, SubstreamError},
25 protocol::{Direction, TransportEvent, TransportService},
26 substream::Substream,
27 types::SubstreamId,
28 PeerId,
29};
30
31use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
32use tokio::sync::mpsc::Sender;
33
34use std::{
35 collections::HashSet,
36 time::{Duration, Instant},
37};
38
39pub use config::{Config, ConfigBuilder};
40
41mod config;
42
43const LOG_TARGET: &str = "litep2p::ipfs::ping";
47
48#[derive(Debug)]
50pub enum PingEvent {
51 Ping {
53 peer: PeerId,
55
56 ping: Duration,
58 },
59}
60
61pub(crate) struct Ping {
63 _max_failures: usize,
65
66 service: TransportService,
68
69 tx: Sender<PingEvent>,
71
72 peers: HashSet<PeerId>,
74
75 pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Duration)>>>,
77
78 pending_inbound: FuturesUnordered<BoxFuture<'static, crate::Result<()>>>,
80}
81
82impl Ping {
83 pub fn new(service: TransportService, config: Config) -> Self {
85 Self {
86 service,
87 tx: config.tx_event,
88 peers: HashSet::new(),
89 pending_outbound: FuturesUnordered::new(),
90 pending_inbound: FuturesUnordered::new(),
91 _max_failures: config.max_failures,
92 }
93 }
94
95 fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
97 tracing::trace!(target: LOG_TARGET, ?peer, "connection established");
98
99 self.service.open_substream(peer)?;
100 self.peers.insert(peer);
101
102 Ok(())
103 }
104
105 fn on_connection_closed(&mut self, peer: PeerId) {
107 tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
108
109 self.peers.remove(&peer);
110 }
111
112 fn on_outbound_substream(
114 &mut self,
115 peer: PeerId,
116 substream_id: SubstreamId,
117 mut substream: Substream,
118 ) {
119 tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");
120
121 self.pending_outbound.push(Box::pin(async move {
122 let future = async move {
123 substream.send_framed(vec![0u8; 32].into()).await?;
125 let now = Instant::now();
126 let _ = substream.next().await.ok_or(Error::SubstreamError(
127 SubstreamError::ReadFailure(Some(substream_id)),
128 ))?;
129 let _ = substream.close().await;
130
131 Ok(now.elapsed())
132 };
133
134 match tokio::time::timeout(Duration::from_secs(10), future).await {
135 Err(_) => Err(Error::Timeout),
136 Ok(Err(error)) => Err(error),
137 Ok(Ok(elapsed)) => Ok((peer, elapsed)),
138 }
139 }));
140 }
141
142 fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
144 tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream");
145
146 self.pending_inbound.push(Box::pin(async move {
147 let future = async move {
148 let payload = substream
149 .next()
150 .await
151 .ok_or(Error::SubstreamError(SubstreamError::ReadFailure(None)))??;
152 substream.send_framed(payload.freeze()).await?;
153 let _ = substream.next().await.map(|_| ());
154
155 Ok(())
156 };
157
158 match tokio::time::timeout(Duration::from_secs(10), future).await {
159 Err(_) => Err(Error::Timeout),
160 Ok(Err(error)) => Err(error),
161 Ok(Ok(())) => Ok(()),
162 }
163 }));
164 }
165
166 pub async fn run(mut self) {
168 tracing::debug!(target: LOG_TARGET, "starting ping event loop");
169
170 loop {
171 tokio::select! {
172 event = self.service.next() => match event {
173 Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
174 let _ = self.on_connection_established(peer);
175 }
176 Some(TransportEvent::ConnectionClosed { peer }) => {
177 self.on_connection_closed(peer);
178 }
179 Some(TransportEvent::SubstreamOpened {
180 peer,
181 substream,
182 direction,
183 ..
184 }) => match direction {
185 Direction::Inbound => {
186 self.on_inbound_substream(peer, substream);
187 }
188 Direction::Outbound(substream_id) => {
189 self.on_outbound_substream(peer, substream_id, substream);
190 }
191 },
192 Some(_) => {}
193 None => return,
194 },
195 _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
196 event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => {
197 match event {
198 Some(Ok((peer, elapsed))) => {
199 let _ = self
200 .tx
201 .send(PingEvent::Ping {
202 peer,
203 ping: elapsed,
204 })
205 .await;
206 }
207 event => tracing::debug!(target: LOG_TARGET, "failed to handle ping for an outbound peer: {event:?}"),
208 }
209 }
210 }
211 }
212 }
213}