1use crate::{
24 codec::ProtocolCodec,
25 crypto::PublicKey,
26 error::{Error, SubstreamError},
27 protocol::{Direction, TransportEvent, TransportService},
28 substream::Substream,
29 transport::Endpoint,
30 types::{protocol::ProtocolName, SubstreamId},
31 utils::futures_stream::FuturesStream,
32 PeerId, DEFAULT_CHANNEL_SIZE,
33};
34
35use futures::{future::BoxFuture, Stream, StreamExt};
36use multiaddr::Multiaddr;
37use prost::Message;
38use tokio::sync::mpsc::{channel, Sender};
39use tokio_stream::wrappers::ReceiverStream;
40
41use std::{
42 collections::{HashMap, HashSet},
43 time::Duration,
44};
45
46const LOG_TARGET: &str = "litep2p::ipfs::identify";
48
49const PROTOCOL_NAME: &str = "/ipfs/id/1.0.0";
51
52const _PUSH_PROTOCOL_NAME: &str = "/ipfs/id/push/1.0.0";
54
55const DEFAULT_AGENT: &str = "litep2p/1.0.0";
57
58const IDENTIFY_PAYLOAD_SIZE: usize = 4096;
61
62mod identify_schema {
63 include!(concat!(env!("OUT_DIR"), "/identify.rs"));
64}
65
66pub struct Config {
68 pub(crate) protocol: ProtocolName,
70
71 pub(crate) codec: ProtocolCodec,
73
74 tx_event: Sender<IdentifyEvent>,
76
77 pub(crate) public: Option<PublicKey>,
79
80 pub(crate) protocols: Vec<ProtocolName>,
82
83 pub(crate) protocol_version: String,
85
86 pub(crate) user_agent: Option<String>,
88}
89
90impl Config {
91 pub fn new(
96 protocol_version: String,
97 user_agent: Option<String>,
98 ) -> (Self, Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>) {
99 let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE);
100
101 (
102 Self {
103 tx_event,
104 public: None,
105 protocol_version,
106 user_agent,
107 codec: ProtocolCodec::UnsignedVarint(Some(IDENTIFY_PAYLOAD_SIZE)),
108 protocols: Vec::new(),
109 protocol: ProtocolName::from(PROTOCOL_NAME),
110 },
111 Box::new(ReceiverStream::new(rx_event)),
112 )
113 }
114}
115
116#[derive(Debug)]
118pub enum IdentifyEvent {
119 PeerIdentified {
121 peer: PeerId,
123
124 protocol_version: Option<String>,
126
127 user_agent: Option<String>,
129
130 supported_protocols: HashSet<ProtocolName>,
132
133 observed_address: Multiaddr,
135
136 listen_addresses: Vec<Multiaddr>,
138 },
139}
140
141struct IdentifyResponse {
143 peer: PeerId,
145
146 protocol_version: Option<String>,
148
149 user_agent: Option<String>,
151
152 supported_protocols: HashSet<String>,
154
155 listen_addresses: Vec<Multiaddr>,
157
158 observed_address: Option<Multiaddr>,
160}
161
162pub(crate) struct Identify {
163 service: TransportService,
165
166 tx: Sender<IdentifyEvent>,
168
169 peers: HashMap<PeerId, Endpoint>,
171
172 public: PublicKey,
174
175 local_peer_id: PeerId,
177
178 protocol_version: String,
180
181 user_agent: String,
183
184 protocols: Vec<String>,
186
187 pending_outbound: FuturesStream<BoxFuture<'static, crate::Result<IdentifyResponse>>>,
189
190 pending_inbound: FuturesStream<BoxFuture<'static, ()>>,
192}
193
194impl Identify {
195 pub(crate) fn new(service: TransportService, config: Config) -> Self {
197 let public = config.public.expect("public key to always be supplied by litep2p; qed");
201 let local_peer_id = public.to_peer_id();
202
203 Self {
204 service,
205 tx: config.tx_event,
206 peers: HashMap::new(),
207 public,
208 local_peer_id,
209 protocol_version: config.protocol_version,
210 user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()),
211 pending_inbound: FuturesStream::new(),
212 pending_outbound: FuturesStream::new(),
213 protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(),
214 }
215 }
216
217 fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> {
219 tracing::trace!(target: LOG_TARGET, ?peer, ?endpoint, "connection established");
220
221 self.service.open_substream(peer)?;
222 self.peers.insert(peer, endpoint);
223
224 Ok(())
225 }
226
227 fn on_connection_closed(&mut self, peer: PeerId) {
229 tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
230
231 self.peers.remove(&peer);
232 }
233
234 fn on_inbound_substream(
236 &mut self,
237 peer: PeerId,
238 protocol: ProtocolName,
239 mut substream: Substream,
240 ) {
241 tracing::trace!(
242 target: LOG_TARGET,
243 ?peer,
244 ?protocol,
245 "inbound substream opened"
246 );
247
248 let observed_addr = match self.peers.get(&peer) {
249 Some(endpoint) => Some(endpoint.address().to_vec()),
250 None => {
251 tracing::warn!(
252 target: LOG_TARGET,
253 ?peer,
254 %protocol,
255 "inbound identify substream opened for peer who doesn't exist",
256 );
257 None
258 }
259 };
260
261 let mut listen_addr: HashSet<_> =
262 self.service.listen_addresses().into_iter().map(|addr| addr.to_vec()).collect();
263 listen_addr
264 .extend(self.service.public_addresses().inner.read().iter().map(|addr| addr.to_vec()));
265
266 let identify = identify_schema::Identify {
267 protocol_version: Some(self.protocol_version.clone()),
268 agent_version: Some(self.user_agent.clone()),
269 public_key: Some(self.public.to_protobuf_encoding()),
270 listen_addrs: listen_addr.into_iter().collect(),
271 observed_addr,
272 protocols: self.protocols.clone(),
273 };
274
275 tracing::trace!(
276 target: LOG_TARGET,
277 ?peer,
278 ?identify,
279 "sending identify response",
280 );
281
282 let mut msg = Vec::with_capacity(identify.encoded_len());
283 identify.encode(&mut msg).expect("`msg` to have enough capacity");
284
285 self.pending_inbound.push(Box::pin(async move {
286 match tokio::time::timeout(Duration::from_secs(10), substream.send_framed(msg.into()))
287 .await
288 {
289 Err(error) => {
290 tracing::debug!(
291 target: LOG_TARGET,
292 ?peer,
293 ?error,
294 "timed out while sending ipfs identify response",
295 );
296 }
297 Ok(Err(error)) => {
298 tracing::debug!(
299 target: LOG_TARGET,
300 ?peer,
301 ?error,
302 "failed to send ipfs identify response",
303 );
304 }
305 Ok(_) => {
306 substream.close().await;
307 }
308 }
309 }))
310 }
311
312 fn on_outbound_substream(
314 &mut self,
315 peer: PeerId,
316 protocol: ProtocolName,
317 substream_id: SubstreamId,
318 mut substream: Substream,
319 ) {
320 tracing::trace!(
321 target: LOG_TARGET,
322 ?peer,
323 ?protocol,
324 ?substream_id,
325 "outbound substream opened"
326 );
327
328 let local_peer_id = self.local_peer_id;
329
330 self.pending_outbound.push(Box::pin(async move {
331 let payload =
332 match tokio::time::timeout(Duration::from_secs(10), substream.next()).await {
333 Err(_) => return Err(Error::Timeout),
334 Ok(None) =>
335 return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some(
336 substream_id,
337 )))),
338 Ok(Some(Err(error))) => return Err(error.into()),
339 Ok(Some(Ok(payload))) => payload,
340 };
341
342 let info = identify_schema::Identify::decode(payload.to_vec().as_slice()).map_err(
343 |err| {
344 tracing::debug!(target: LOG_TARGET, ?peer, ?err, "peer identified provided undecodable identify response");
345 err
346 })?;
347
348 tracing::trace!(target: LOG_TARGET, ?peer, ?info, "peer identified");
349
350 let listen_addresses = info
351 .listen_addrs
352 .iter()
353 .filter_map(|address| {
354 let address = Multiaddr::try_from(address.clone()).ok()?;
355
356 if address.is_empty() {
358 tracing::debug!(target: LOG_TARGET, ?peer, ?address, "peer identified provided empty listen address");
359 return None;
360 }
361 if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
362 if peer_id != peer.into() {
363 tracing::debug!(target: LOG_TARGET, ?peer, ?address, "peer identified provided listen address with incorrect peer ID; discarding the address");
364 return None;
365 }
366 }
367
368 Some(address)
369 })
370 .collect();
371
372 let observed_address =
373 info.observed_addr.and_then(|address| {
374 let address = Multiaddr::try_from(address).ok()?;
375
376 if address.is_empty() {
377 tracing::debug!(target: LOG_TARGET, ?peer, ?address, "peer identified provided empty observed address");
378 return None;
379 }
380
381 if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
382 if peer_id != local_peer_id.into() {
383 tracing::debug!(target: LOG_TARGET, ?peer, ?address, "peer identified provided observed address with peer ID not matching our peer ID; discarding address");
384 return None;
385 }
386 }
387
388 Some(address)
389 });
390
391 let protocol_version = info.protocol_version;
392 let user_agent = info.agent_version;
393
394 Ok(IdentifyResponse {
395 peer,
396 protocol_version,
397 user_agent,
398 supported_protocols: HashSet::from_iter(info.protocols),
399 observed_address,
400 listen_addresses,
401 })
402 }));
403 }
404
405 pub async fn run(mut self) {
407 tracing::debug!(target: LOG_TARGET, "starting identify event loop");
408
409 loop {
410 tokio::select! {
411 event = self.service.next() => match event {
412 None => {
413 tracing::warn!(target: LOG_TARGET, "transport service stream ended, terminating identify event loop");
414 return
415 },
416 Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
417 let _ = self.on_connection_established(peer, endpoint);
418 }
419 Some(TransportEvent::ConnectionClosed { peer }) => {
420 self.on_connection_closed(peer);
421 }
422 Some(TransportEvent::SubstreamOpened {
423 peer,
424 protocol,
425 direction,
426 substream,
427 ..
428 }) => match direction {
429 Direction::Inbound => self.on_inbound_substream(peer, protocol, substream),
430 Direction::Outbound(substream_id) => self.on_outbound_substream(peer, protocol, substream_id, substream),
431 },
432 _ => {}
433 },
434 _ = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
435 event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => match event {
436 Some(Ok(response)) => {
437 let _ = self.tx
438 .send(IdentifyEvent::PeerIdentified {
439 peer: response.peer,
440 protocol_version: response.protocol_version,
441 user_agent: response.user_agent,
442 supported_protocols: response.supported_protocols.into_iter().map(From::from).collect(),
443 observed_address: response.observed_address.map_or(Multiaddr::empty(), |address| address),
444 listen_addresses: response.listen_addresses,
445 })
446 .await;
447 }
448 Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"),
449 None => {}
450 }
451 }
452 }
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use super::*;
459 use crate::{config::ConfigBuilder, transport::tcp::config::Config as TcpConfig, Litep2p};
460 use multiaddr::{Multiaddr, Protocol};
461
462 fn create_litep2p() -> (
463 Litep2p,
464 Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
465 PeerId,
466 ) {
467 let (identify_config, identify) =
468 Config::new("1.0.0".to_string(), Some("litep2p/1.0.0".to_string()));
469
470 let keypair = crate::crypto::ed25519::Keypair::generate();
471 let peer = PeerId::from_public_key(&crate::crypto::PublicKey::Ed25519(keypair.public()));
472 let config = ConfigBuilder::new()
473 .with_keypair(keypair)
474 .with_tcp(TcpConfig {
475 listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
476 ..Default::default()
477 })
478 .with_libp2p_identify(identify_config)
479 .build();
480
481 (Litep2p::new(config).unwrap(), identify, peer)
482 }
483
484 #[tokio::test]
485 async fn update_identify_addresses() {
486 let (mut litep2p1, mut event_stream1, peer1) = create_litep2p();
488 let (mut litep2p2, mut event_stream2, _peer2) = create_litep2p();
489 let litep2p1_address = litep2p1.listen_addresses().next().unwrap();
490
491 let multiaddr: Multiaddr = "/ip6/::9/tcp/111".parse().unwrap();
492 assert!(litep2p1.public_addresses().add_address(multiaddr.clone()).unwrap());
494
495 litep2p2.dial_address(litep2p1_address.clone()).await.unwrap();
497
498 let expected_multiaddr = multiaddr.with(Protocol::P2p(peer1.into()));
499
500 tokio::spawn(async move {
501 loop {
502 tokio::select! {
503 _ = litep2p1.next_event() => {}
504 _event = event_stream1.next() => {}
505 }
506 }
507 });
508
509 loop {
510 tokio::select! {
511 _ = litep2p2.next_event() => {}
512 event = event_stream2.next() => match event {
513 Some(IdentifyEvent::PeerIdentified {
514 listen_addresses,
515 ..
516 }) => {
517 assert!(listen_addresses.iter().any(|address| address == &expected_multiaddr));
518 break;
519 }
520 _ => {}
521 }
522 }
523 }
524 }
525}