litep2p/protocol/libp2p/
identify.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
//! [`/ipfs/id/1.0.0`](https://github.com/libp2p/specs/blob/master/identify/README.md) implementation.
22
23use crate::{
24    codec::ProtocolCodec,
25    crypto::PublicKey,
26    error::{Error, SubstreamError},
27    protocol::{Direction, TransportEvent, TransportService},
28    substream::Substream,
29    transport::Endpoint,
30    types::{protocol::ProtocolName, SubstreamId},
31    utils::futures_stream::FuturesStream,
32    PeerId, DEFAULT_CHANNEL_SIZE,
33};
34
35use futures::{future::BoxFuture, Stream, StreamExt};
36use multiaddr::Multiaddr;
37use prost::Message;
38use tokio::sync::mpsc::{channel, Sender};
39use tokio_stream::wrappers::ReceiverStream;
40
41use std::{
42    collections::{HashMap, HashSet},
43    time::Duration,
44};
45
/// Target string attached to every log line emitted from this file.
const LOG_TARGET: &str = "litep2p::ipfs::identify";

/// Name of the IPFS Identify protocol.
const PROTOCOL_NAME: &str = "/ipfs/id/1.0.0";

/// Name of the IPFS Identify push protocol.
///
/// Not referenced by the code in this file, hence the leading underscore.
const _PUSH_PROTOCOL_NAME: &str = "/ipfs/id/push/1.0.0";

/// Agent version used when the user does not supply one.
const DEFAULT_AGENT: &str = "litep2p/1.0.0";

/// Payload size limit, in bytes, handed to the codec for `/ipfs/id/1.0.0` messages.
// TODO: https://github.com/paritytech/litep2p/issues/334 what is the max size?
const IDENTIFY_PAYLOAD_SIZE: usize = 4 * 1024;
61
62mod identify_schema {
63    include!(concat!(env!("OUT_DIR"), "/identify.rs"));
64}
65
66/// Identify configuration.
67pub struct Config {
68    /// Protocol name.
69    pub(crate) protocol: ProtocolName,
70
71    /// Codec used by the protocol.
72    pub(crate) codec: ProtocolCodec,
73
74    /// TX channel for sending events to the user protocol.
75    tx_event: Sender<IdentifyEvent>,
76
77    // Public key of the local node, filled by `Litep2p`.
78    pub(crate) public: Option<PublicKey>,
79
80    /// Protocols supported by the local node, filled by `Litep2p`.
81    pub(crate) protocols: Vec<ProtocolName>,
82
83    /// Protocol version.
84    pub(crate) protocol_version: String,
85
86    /// User agent.
87    pub(crate) user_agent: Option<String>,
88}
89
90impl Config {
91    /// Create new [`Config`].
92    ///
93    /// Returns a config that is given to `Litep2pConfig` and an event stream for
94    /// [`IdentifyEvent`]s.
95    pub fn new(
96        protocol_version: String,
97        user_agent: Option<String>,
98    ) -> (Self, Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>) {
99        let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE);
100
101        (
102            Self {
103                tx_event,
104                public: None,
105                protocol_version,
106                user_agent,
107                codec: ProtocolCodec::UnsignedVarint(Some(IDENTIFY_PAYLOAD_SIZE)),
108                protocols: Vec::new(),
109                protocol: ProtocolName::from(PROTOCOL_NAME),
110            },
111            Box::new(ReceiverStream::new(rx_event)),
112        )
113    }
114}
115
116/// Events emitted by Identify protocol.
117#[derive(Debug)]
118pub enum IdentifyEvent {
119    /// Peer identified.
120    PeerIdentified {
121        /// Peer ID.
122        peer: PeerId,
123
124        /// Protocol version.
125        protocol_version: Option<String>,
126
127        /// User agent.
128        user_agent: Option<String>,
129
130        /// Supported protocols.
131        supported_protocols: HashSet<ProtocolName>,
132
133        /// Observed address.
134        observed_address: Multiaddr,
135
136        /// Listen addresses.
137        listen_addresses: Vec<Multiaddr>,
138    },
139}
140
141/// Identify response received from remote.
142struct IdentifyResponse {
143    /// Remote peer ID.
144    peer: PeerId,
145
146    /// Protocol version.
147    protocol_version: Option<String>,
148
149    /// User agent.
150    user_agent: Option<String>,
151
152    /// Protocols supported by remote.
153    supported_protocols: HashSet<String>,
154
155    /// Remote's listen addresses.
156    listen_addresses: Vec<Multiaddr>,
157
158    /// Observed address.
159    observed_address: Option<Multiaddr>,
160}
161
162pub(crate) struct Identify {
163    // Connection service.
164    service: TransportService,
165
166    /// TX channel for sending events to the user protocol.
167    tx: Sender<IdentifyEvent>,
168
169    /// Connected peers and their observed addresses.
170    peers: HashMap<PeerId, Endpoint>,
171
172    // Public key of the local node, filled by `Litep2p`.
173    public: PublicKey,
174
175    /// Local peer ID.
176    local_peer_id: PeerId,
177
178    /// Protocol version.
179    protocol_version: String,
180
181    /// User agent.
182    user_agent: String,
183
184    /// Protocols supported by the local node, filled by `Litep2p`.
185    protocols: Vec<String>,
186
187    /// Pending outbound substreams.
188    pending_outbound: FuturesStream<BoxFuture<'static, crate::Result<IdentifyResponse>>>,
189
190    /// Pending inbound substreams.
191    pending_inbound: FuturesStream<BoxFuture<'static, ()>>,
192}
193
194impl Identify {
195    /// Create new [`Identify`] protocol.
196    pub(crate) fn new(service: TransportService, config: Config) -> Self {
197        // The public key is always supplied by litep2p and is the one
198        // used to identify the local peer. This is a similar story to the
199        // supported protocols.
200        let public = config.public.expect("public key to always be supplied by litep2p; qed");
201        let local_peer_id = public.to_peer_id();
202
203        Self {
204            service,
205            tx: config.tx_event,
206            peers: HashMap::new(),
207            public,
208            local_peer_id,
209            protocol_version: config.protocol_version,
210            user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()),
211            pending_inbound: FuturesStream::new(),
212            pending_outbound: FuturesStream::new(),
213            protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(),
214        }
215    }
216
217    /// Connection established to remote peer.
218    fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> {
219        tracing::trace!(target: LOG_TARGET, ?peer, ?endpoint, "connection established");
220
221        self.service.open_substream(peer)?;
222        self.peers.insert(peer, endpoint);
223
224        Ok(())
225    }
226
227    /// Connection closed to remote peer.
228    fn on_connection_closed(&mut self, peer: PeerId) {
229        tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
230
231        self.peers.remove(&peer);
232    }
233
234    /// Inbound substream opened.
235    fn on_inbound_substream(
236        &mut self,
237        peer: PeerId,
238        protocol: ProtocolName,
239        mut substream: Substream,
240    ) {
241        tracing::trace!(
242            target: LOG_TARGET,
243            ?peer,
244            ?protocol,
245            "inbound substream opened"
246        );
247
248        let observed_addr = match self.peers.get(&peer) {
249            Some(endpoint) => Some(endpoint.address().to_vec()),
250            None => {
251                tracing::warn!(
252                    target: LOG_TARGET,
253                    ?peer,
254                    %protocol,
255                    "inbound identify substream opened for peer who doesn't exist",
256                );
257                None
258            }
259        };
260
261        let mut listen_addr: HashSet<_> =
262            self.service.listen_addresses().into_iter().map(|addr| addr.to_vec()).collect();
263        listen_addr
264            .extend(self.service.public_addresses().inner.read().iter().map(|addr| addr.to_vec()));
265
266        let identify = identify_schema::Identify {
267            protocol_version: Some(self.protocol_version.clone()),
268            agent_version: Some(self.user_agent.clone()),
269            public_key: Some(self.public.to_protobuf_encoding()),
270            listen_addrs: listen_addr.into_iter().collect(),
271            observed_addr,
272            protocols: self.protocols.clone(),
273        };
274
275        tracing::trace!(
276            target: LOG_TARGET,
277            ?peer,
278            ?identify,
279            "sending identify response",
280        );
281
282        let mut msg = Vec::with_capacity(identify.encoded_len());
283        identify.encode(&mut msg).expect("`msg` to have enough capacity");
284
285        self.pending_inbound.push(Box::pin(async move {
286            match tokio::time::timeout(Duration::from_secs(10), substream.send_framed(msg.into()))
287                .await
288            {
289                Err(error) => {
290                    tracing::debug!(
291                        target: LOG_TARGET,
292                        ?peer,
293                        ?error,
294                        "timed out while sending ipfs identify response",
295                    );
296                }
297                Ok(Err(error)) => {
298                    tracing::debug!(
299                        target: LOG_TARGET,
300                        ?peer,
301                        ?error,
302                        "failed to send ipfs identify response",
303                    );
304                }
305                Ok(_) => {
306                    substream.close().await;
307                }
308            }
309        }))
310    }
311
312    /// Outbound substream opened.
313    fn on_outbound_substream(
314        &mut self,
315        peer: PeerId,
316        protocol: ProtocolName,
317        substream_id: SubstreamId,
318        mut substream: Substream,
319    ) {
320        tracing::trace!(
321            target: LOG_TARGET,
322            ?peer,
323            ?protocol,
324            ?substream_id,
325            "outbound substream opened"
326        );
327
328        let local_peer_id = self.local_peer_id;
329
330        self.pending_outbound.push(Box::pin(async move {
331            let payload =
332                match tokio::time::timeout(Duration::from_secs(10), substream.next()).await {
333                    Err(_) => return Err(Error::Timeout),
334                    Ok(None) =>
335                        return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some(
336                            substream_id,
337                        )))),
338                    Ok(Some(Err(error))) => return Err(error.into()),
339                    Ok(Some(Ok(payload))) => payload,
340                };
341
342            let info = identify_schema::Identify::decode(payload.to_vec().as_slice()).map_err(
343                |err| {
344                    tracing::debug!(target: LOG_TARGET, ?peer, ?err, "peer identified provided undecodable identify response");
345                    err
346                })?;
347
348            tracing::trace!(target: LOG_TARGET, ?peer, ?info, "peer identified");
349
350            let listen_addresses = info
351                .listen_addrs
352                .iter()
353                .filter_map(|address| {
354                    let address = Multiaddr::try_from(address.clone()).ok()?;
355
356                    // Ensure the address ends with the provided peer ID and is not empty.
357                    if address.is_empty() {
358                        tracing::debug!(target: LOG_TARGET, ?peer, ?address, "peer identified provided empty listen address");
359                        return None;
360                    }
361                    if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
362                        if peer_id != peer.into() {
363                            tracing::debug!(target: LOG_TARGET, ?peer, ?address, "peer identified provided listen address with incorrect peer ID; discarding the address");
364                            return None;
365                        }
366                    }
367
368                    Some(address)
369                })
370                .collect();
371
372            let observed_address =
373                info.observed_addr.and_then(|address| {
374                    let address = Multiaddr::try_from(address).ok()?;
375
376                    if address.is_empty() {
377                        tracing::debug!(target: LOG_TARGET, ?peer, ?address, "peer identified provided empty observed address");
378                        return None;
379                    }
380
381                    if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
382                        if peer_id != local_peer_id.into() {
383                            tracing::debug!(target: LOG_TARGET, ?peer, ?address, "peer identified provided observed address with peer ID not matching our peer ID; discarding address");
384                            return None;
385                        }
386                    }
387
388                    Some(address)
389                });
390
391            let protocol_version = info.protocol_version;
392            let user_agent = info.agent_version;
393
394            Ok(IdentifyResponse {
395                peer,
396                protocol_version,
397                user_agent,
398                supported_protocols: HashSet::from_iter(info.protocols),
399                observed_address,
400                listen_addresses,
401            })
402        }));
403    }
404
405    /// Start [`Identify`] event loop.
406    pub async fn run(mut self) {
407        tracing::debug!(target: LOG_TARGET, "starting identify event loop");
408
409        loop {
410            tokio::select! {
411                event = self.service.next() => match event {
412                    None => {
413                        tracing::warn!(target: LOG_TARGET, "transport service stream ended, terminating identify event loop");
414                        return
415                    },
416                    Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
417                        let _ = self.on_connection_established(peer, endpoint);
418                    }
419                    Some(TransportEvent::ConnectionClosed { peer }) => {
420                        self.on_connection_closed(peer);
421                    }
422                    Some(TransportEvent::SubstreamOpened {
423                        peer,
424                        protocol,
425                        direction,
426                        substream,
427                        ..
428                    }) => match direction {
429                        Direction::Inbound => self.on_inbound_substream(peer, protocol, substream),
430                        Direction::Outbound(substream_id) => self.on_outbound_substream(peer, protocol, substream_id, substream),
431                    },
432                    _ => {}
433                },
434                _ = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
435                event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => match event {
436                    Some(Ok(response)) => {
437                        let _ = self.tx
438                            .send(IdentifyEvent::PeerIdentified {
439                                peer: response.peer,
440                                protocol_version: response.protocol_version,
441                                user_agent: response.user_agent,
442                                supported_protocols: response.supported_protocols.into_iter().map(From::from).collect(),
443                                observed_address: response.observed_address.map_or(Multiaddr::empty(), |address| address),
444                                listen_addresses: response.listen_addresses,
445                            })
446                            .await;
447                    }
448                    Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"),
449                    None => {}
450                }
451            }
452        }
453    }
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::ConfigBuilder, transport::tcp::config::Config as TcpConfig, Litep2p};
    use multiaddr::{Multiaddr, Protocol};

    /// Build a litep2p instance with identify enabled that listens on TCP at `/ip6/::1/tcp/0`.
    ///
    /// Returns the instance, its identify event stream and its peer ID.
    fn create_litep2p() -> (
        Litep2p,
        Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
        PeerId,
    ) {
        let keypair = crate::crypto::ed25519::Keypair::generate();
        let peer = PeerId::from_public_key(&crate::crypto::PublicKey::Ed25519(keypair.public()));

        let (identify_config, identify) =
            Config::new("1.0.0".to_string(), Some("litep2p/1.0.0".to_string()));
        let tcp_config = TcpConfig {
            listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
            ..Default::default()
        };

        let config = ConfigBuilder::new()
            .with_keypair(keypair)
            .with_tcp(tcp_config)
            .with_libp2p_identify(identify_config)
            .build();

        (Litep2p::new(config).unwrap(), identify, peer)
    }

    /// A public address added to one node must show up, with that node's peer ID appended,
    /// among the listen addresses the other node learns through identify.
    #[tokio::test]
    async fn update_identify_addresses() {
        // Two independent litep2p instances; the second one dials the first.
        let (mut litep2p1, mut event_stream1, peer1) = create_litep2p();
        let (mut litep2p2, mut event_stream2, _peer2) = create_litep2p();
        let litep2p1_address = litep2p1.listen_addresses().next().unwrap();

        // Make `litep2p1` report an additional public address.
        let multiaddr: Multiaddr = "/ip6/::9/tcp/111".parse().unwrap();
        assert!(litep2p1.public_addresses().add_address(multiaddr.clone()).unwrap());

        // Dial `litep2p1`.
        litep2p2.dial_address(litep2p1_address.clone()).await.unwrap();

        let expected_multiaddr = multiaddr.with(Protocol::P2p(peer1.into()));

        // Keep `litep2p1` and its identify stream polled in the background.
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = litep2p1.next_event() => {}
                    _event = event_stream1.next() => {}
                }
            }
        });

        // Poll `litep2p2` until it reports the identify result for `litep2p1`.
        loop {
            tokio::select! {
                _ = litep2p2.next_event() => {}
                event = event_stream2.next() => {
                    if let Some(IdentifyEvent::PeerIdentified { listen_addresses, .. }) = event {
                        assert!(listen_addresses.contains(&expected_multiaddr));
                        break;
                    }
                }
            }
        }
    }
}