1use crate::{
24 codec::ProtocolCodec,
25 crypto::PublicKey,
26 error::{Error, SubstreamError},
27 protocol::{Direction, TransportEvent, TransportService},
28 substream::Substream,
29 transport::Endpoint,
30 types::{protocol::ProtocolName, SubstreamId},
31 PeerId, DEFAULT_CHANNEL_SIZE,
32};
33
34use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
35use multiaddr::Multiaddr;
36use prost::Message;
37use tokio::sync::mpsc::{channel, Sender};
38use tokio_stream::wrappers::ReceiverStream;
39
40use std::{
41 collections::{HashMap, HashSet},
42 time::Duration,
43};
44
45const LOG_TARGET: &str = "litep2p::ipfs::identify";
47
48const PROTOCOL_NAME: &str = "/ipfs/id/1.0.0";
50
51const _PUSH_PROTOCOL_NAME: &str = "/ipfs/id/push/1.0.0";
53
54const DEFAULT_AGENT: &str = "litep2p/1.0.0";
56
57const IDENTIFY_PAYLOAD_SIZE: usize = 4096;
60
61mod identify_schema {
62 include!(concat!(env!("OUT_DIR"), "/identify.rs"));
63}
64
65pub struct Config {
67 pub(crate) protocol: ProtocolName,
69
70 pub(crate) codec: ProtocolCodec,
72
73 tx_event: Sender<IdentifyEvent>,
75
76 pub(crate) public: Option<PublicKey>,
78
79 pub(crate) protocols: Vec<ProtocolName>,
81
82 pub(crate) protocol_version: String,
84
85 pub(crate) user_agent: Option<String>,
87}
88
89impl Config {
90 pub fn new(
95 protocol_version: String,
96 user_agent: Option<String>,
97 ) -> (Self, Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>) {
98 let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE);
99
100 (
101 Self {
102 tx_event,
103 public: None,
104 protocol_version,
105 user_agent,
106 codec: ProtocolCodec::UnsignedVarint(Some(IDENTIFY_PAYLOAD_SIZE)),
107 protocols: Vec::new(),
108 protocol: ProtocolName::from(PROTOCOL_NAME),
109 },
110 Box::new(ReceiverStream::new(rx_event)),
111 )
112 }
113}
114
115#[derive(Debug)]
117pub enum IdentifyEvent {
118 PeerIdentified {
120 peer: PeerId,
122
123 protocol_version: Option<String>,
125
126 user_agent: Option<String>,
128
129 supported_protocols: HashSet<ProtocolName>,
131
132 observed_address: Multiaddr,
134
135 listen_addresses: Vec<Multiaddr>,
137 },
138}
139
140struct IdentifyResponse {
142 peer: PeerId,
144
145 protocol_version: Option<String>,
147
148 user_agent: Option<String>,
150
151 supported_protocols: HashSet<String>,
153
154 listen_addresses: Vec<Multiaddr>,
156
157 observed_address: Option<Multiaddr>,
159}
160
161pub(crate) struct Identify {
162 service: TransportService,
164
165 tx: Sender<IdentifyEvent>,
167
168 peers: HashMap<PeerId, Endpoint>,
170
171 public: PublicKey,
173
174 protocol_version: String,
176
177 user_agent: String,
179
180 protocols: Vec<String>,
182
183 pending_opens: HashMap<SubstreamId, PeerId>,
185
186 pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<IdentifyResponse>>>,
188
189 pending_inbound: FuturesUnordered<BoxFuture<'static, ()>>,
191}
192
193impl Identify {
194 pub(crate) fn new(service: TransportService, config: Config) -> Self {
196 Self {
197 service,
198 tx: config.tx_event,
199 peers: HashMap::new(),
200 public: config.public.expect("public key to be supplied"),
201 protocol_version: config.protocol_version,
202 user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()),
203 pending_opens: HashMap::new(),
204 pending_inbound: FuturesUnordered::new(),
205 pending_outbound: FuturesUnordered::new(),
206 protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(),
207 }
208 }
209
210 fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> {
212 tracing::trace!(target: LOG_TARGET, ?peer, ?endpoint, "connection established");
213
214 let substream_id = self.service.open_substream(peer)?;
215 self.pending_opens.insert(substream_id, peer);
216 self.peers.insert(peer, endpoint);
217
218 Ok(())
219 }
220
221 fn on_connection_closed(&mut self, peer: PeerId) {
223 tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
224
225 self.peers.remove(&peer);
226 }
227
228 fn on_inbound_substream(
230 &mut self,
231 peer: PeerId,
232 protocol: ProtocolName,
233 mut substream: Substream,
234 ) {
235 tracing::trace!(
236 target: LOG_TARGET,
237 ?peer,
238 ?protocol,
239 "inbound substream opened"
240 );
241
242 let observed_addr = match self.peers.get(&peer) {
243 Some(endpoint) => Some(endpoint.address().to_vec()),
244 None => {
245 tracing::warn!(
246 target: LOG_TARGET,
247 ?peer,
248 %protocol,
249 "inbound identify substream opened for peer who doesn't exist",
250 );
251 None
252 }
253 };
254
255 let mut listen_addr: HashSet<_> =
256 self.service.listen_addresses().into_iter().map(|addr| addr.to_vec()).collect();
257 listen_addr
258 .extend(self.service.public_addresses().inner.read().iter().map(|addr| addr.to_vec()));
259
260 let identify = identify_schema::Identify {
261 protocol_version: Some(self.protocol_version.clone()),
262 agent_version: Some(self.user_agent.clone()),
263 public_key: Some(self.public.to_protobuf_encoding()),
264 listen_addrs: listen_addr.into_iter().collect(),
265 observed_addr,
266 protocols: self.protocols.clone(),
267 };
268
269 tracing::trace!(
270 target: LOG_TARGET,
271 ?peer,
272 ?identify,
273 "sending identify response",
274 );
275
276 let mut msg = Vec::with_capacity(identify.encoded_len());
277 identify.encode(&mut msg).expect("`msg` to have enough capacity");
278
279 self.pending_inbound.push(Box::pin(async move {
280 match tokio::time::timeout(Duration::from_secs(10), substream.send_framed(msg.into()))
281 .await
282 {
283 Err(error) => {
284 tracing::debug!(
285 target: LOG_TARGET,
286 ?peer,
287 ?error,
288 "timed out while sending ipfs identify response",
289 );
290 }
291 Ok(Err(error)) => {
292 tracing::debug!(
293 target: LOG_TARGET,
294 ?peer,
295 ?error,
296 "failed to send ipfs identify response",
297 );
298 }
299 Ok(_) => {}
300 }
301 }))
302 }
303
304 fn on_outbound_substream(
306 &mut self,
307 peer: PeerId,
308 protocol: ProtocolName,
309 substream_id: SubstreamId,
310 mut substream: Substream,
311 ) {
312 tracing::trace!(
313 target: LOG_TARGET,
314 ?peer,
315 ?protocol,
316 ?substream_id,
317 "outbound substream opened"
318 );
319
320 self.pending_outbound.push(Box::pin(async move {
321 let payload =
322 match tokio::time::timeout(Duration::from_secs(10), substream.next()).await {
323 Err(_) => return Err(Error::Timeout),
324 Ok(None) =>
325 return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some(
326 substream_id,
327 )))),
328 Ok(Some(Err(error))) => return Err(error.into()),
329 Ok(Some(Ok(payload))) => payload,
330 };
331
332 let info = identify_schema::Identify::decode(payload.to_vec().as_slice())?;
333
334 tracing::trace!(target: LOG_TARGET, ?peer, ?info, "peer identified");
335
336 let listen_addresses = info
337 .listen_addrs
338 .iter()
339 .filter_map(|address| Multiaddr::try_from(address.clone()).ok())
340 .collect();
341 let observed_address =
342 info.observed_addr.and_then(|address| Multiaddr::try_from(address).ok());
343 let protocol_version = info.protocol_version;
344 let user_agent = info.agent_version;
345
346 Ok(IdentifyResponse {
347 peer,
348 protocol_version,
349 user_agent,
350 supported_protocols: HashSet::from_iter(info.protocols),
351 observed_address,
352 listen_addresses,
353 })
354 }));
355 }
356
357 pub async fn run(mut self) {
359 tracing::debug!(target: LOG_TARGET, "starting identify event loop");
360
361 loop {
362 tokio::select! {
363 event = self.service.next() => match event {
364 None => return,
365 Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
366 let _ = self.on_connection_established(peer, endpoint);
367 }
368 Some(TransportEvent::ConnectionClosed { peer }) => {
369 self.on_connection_closed(peer);
370 }
371 Some(TransportEvent::SubstreamOpened {
372 peer,
373 protocol,
374 direction,
375 substream,
376 ..
377 }) => match direction {
378 Direction::Inbound => self.on_inbound_substream(peer, protocol, substream),
379 Direction::Outbound(substream_id) => self.on_outbound_substream(peer, protocol, substream_id, substream),
380 },
381 _ => {}
382 },
383 _ = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
384 event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => match event {
385 Some(Ok(response)) => {
386 let _ = self.tx
387 .send(IdentifyEvent::PeerIdentified {
388 peer: response.peer,
389 protocol_version: response.protocol_version,
390 user_agent: response.user_agent,
391 supported_protocols: response.supported_protocols.into_iter().map(From::from).collect(),
392 observed_address: response.observed_address.map_or(Multiaddr::empty(), |address| address),
393 listen_addresses: response.listen_addresses,
394 })
395 .await;
396 }
397 Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"),
398 None => return,
399 }
400 }
401 }
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408 use crate::{config::ConfigBuilder, transport::tcp::config::Config as TcpConfig, Litep2p};
409 use multiaddr::{Multiaddr, Protocol};
410
411 fn create_litep2p() -> (
412 Litep2p,
413 Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
414 PeerId,
415 ) {
416 let (identify_config, identify) =
417 Config::new("1.0.0".to_string(), Some("litep2p/1.0.0".to_string()));
418
419 let keypair = crate::crypto::ed25519::Keypair::generate();
420 let peer = PeerId::from_public_key(&crate::crypto::PublicKey::Ed25519(keypair.public()));
421 let config = ConfigBuilder::new()
422 .with_keypair(keypair)
423 .with_tcp(TcpConfig {
424 listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
425 ..Default::default()
426 })
427 .with_libp2p_identify(identify_config)
428 .build();
429
430 (Litep2p::new(config).unwrap(), identify, peer)
431 }
432
433 #[tokio::test]
434 async fn update_identify_addresses() {
435 let (mut litep2p1, mut event_stream1, peer1) = create_litep2p();
437 let (mut litep2p2, mut event_stream2, _peer2) = create_litep2p();
438 let litep2p1_address = litep2p1.listen_addresses().into_iter().next().unwrap();
439
440 let multiaddr: Multiaddr = "/ip6/::9/tcp/111".parse().unwrap();
441 assert!(litep2p1.public_addresses().add_address(multiaddr.clone()).unwrap());
443
444 litep2p2.dial_address(litep2p1_address.clone()).await.unwrap();
446
447 let expected_multiaddr = multiaddr.with(Protocol::P2p(peer1.into()));
448
449 tokio::spawn(async move {
450 loop {
451 tokio::select! {
452 _ = litep2p1.next_event() => {}
453 _event = event_stream1.next() => {}
454 }
455 }
456 });
457
458 loop {
459 tokio::select! {
460 _ = litep2p2.next_event() => {}
461 event = event_stream2.next() => match event {
462 Some(IdentifyEvent::PeerIdentified {
463 listen_addresses,
464 ..
465 }) => {
466 assert!(listen_addresses.iter().any(|address| address == &expected_multiaddr));
467 break;
468 }
469 _ => {}
470 }
471 }
472 }
473 }
474}