litep2p/protocol/libp2p/ping/
mod.rs1use crate::{
24 error::{Error, SubstreamError},
25 protocol::{Direction, TransportEvent, TransportService},
26 substream::Substream,
27 types::SubstreamId,
28 PeerId,
29};
30
31use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
32use tokio::sync::mpsc::Sender;
33
34use std::{
35 collections::{HashMap, HashSet},
36 time::{Duration, Instant},
37};
38
39pub use config::{Config, ConfigBuilder};
40
41mod config;
42
/// Logging target passed as `target:` to the `tracing` macros used in this file.
43const LOG_TARGET: &str = "litep2p::ipfs::ping";
47
48#[derive(Debug)]
50pub enum PingEvent {
51 Ping {
53 peer: PeerId,
55
56 ping: Duration,
58 },
59}
60
61pub(crate) struct Ping {
63 _max_failures: usize,
65
66 service: TransportService,
68
69 tx: Sender<PingEvent>,
71
72 peers: HashSet<PeerId>,
74
75 pending_opens: HashMap<SubstreamId, PeerId>,
77
78 pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Duration)>>>,
80
81 pending_inbound: FuturesUnordered<BoxFuture<'static, crate::Result<()>>>,
83}
84
85impl Ping {
86 pub fn new(service: TransportService, config: Config) -> Self {
88 Self {
89 service,
90 tx: config.tx_event,
91 peers: HashSet::new(),
92 pending_opens: HashMap::new(),
93 pending_outbound: FuturesUnordered::new(),
94 pending_inbound: FuturesUnordered::new(),
95 _max_failures: config.max_failures,
96 }
97 }
98
99 fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
101 tracing::trace!(target: LOG_TARGET, ?peer, "connection established");
102
103 let substream_id = self.service.open_substream(peer)?;
104 self.pending_opens.insert(substream_id, peer);
105 self.peers.insert(peer);
106
107 Ok(())
108 }
109
110 fn on_connection_closed(&mut self, peer: PeerId) {
112 tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
113
114 self.peers.remove(&peer);
115 }
116
117 fn on_outbound_substream(
119 &mut self,
120 peer: PeerId,
121 substream_id: SubstreamId,
122 mut substream: Substream,
123 ) {
124 tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");
125
126 self.pending_outbound.push(Box::pin(async move {
127 let future = async move {
128 substream.send_framed(vec![0u8; 32].into()).await?;
130 let now = Instant::now();
131 let _ = substream.next().await.ok_or(Error::SubstreamError(
132 SubstreamError::ReadFailure(Some(substream_id)),
133 ))?;
134 let _ = substream.close().await;
135
136 Ok(now.elapsed())
137 };
138
139 match tokio::time::timeout(Duration::from_secs(10), future).await {
140 Err(_) => Err(Error::Timeout),
141 Ok(Err(error)) => Err(error),
142 Ok(Ok(elapsed)) => Ok((peer, elapsed)),
143 }
144 }));
145 }
146
147 fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
149 tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream");
150
151 self.pending_inbound.push(Box::pin(async move {
152 let future = async move {
153 let payload = substream
154 .next()
155 .await
156 .ok_or(Error::SubstreamError(SubstreamError::ReadFailure(None)))??;
157 substream.send_framed(payload.freeze()).await?;
158 let _ = substream.next().await.map(|_| ());
159
160 Ok(())
161 };
162
163 match tokio::time::timeout(Duration::from_secs(10), future).await {
164 Err(_) => Err(Error::Timeout),
165 Ok(Err(error)) => Err(error),
166 Ok(Ok(())) => Ok(()),
167 }
168 }));
169 }
170
171 pub async fn run(mut self) {
173 tracing::debug!(target: LOG_TARGET, "starting ping event loop");
174
175 loop {
176 tokio::select! {
177 event = self.service.next() => match event {
178 Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
179 let _ = self.on_connection_established(peer);
180 }
181 Some(TransportEvent::ConnectionClosed { peer }) => {
182 self.on_connection_closed(peer);
183 }
184 Some(TransportEvent::SubstreamOpened {
185 peer,
186 substream,
187 direction,
188 ..
189 }) => match direction {
190 Direction::Inbound => {
191 self.on_inbound_substream(peer, substream);
192 }
193 Direction::Outbound(substream_id) => {
194 match self.pending_opens.remove(&substream_id) {
195 Some(stored_peer) => {
196 debug_assert!(peer == stored_peer);
197 self.on_outbound_substream(peer, substream_id, substream);
198 }
199 None => {
200 tracing::warn!(
201 target: LOG_TARGET,
202 ?substream_id,
203 "outbound ping substream ID does not exist",
204 );
205 }
206 }
207 }
208 },
209 Some(_) => {}
210 None => return,
211 },
212 _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
213 event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => {
214 match event {
215 Some(Ok((peer, elapsed))) => {
216 let _ = self
217 .tx
218 .send(PingEvent::Ping {
219 peer,
220 ping: elapsed,
221 })
222 .await;
223 }
224 event => tracing::debug!(target: LOG_TARGET, "failed to handle ping for an outbound peer: {event:?}"),
225 }
226 }
227 }
228 }
229 }
230}