litep2p/protocol/libp2p/
identify.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`/ipfs/identify/1.0.0`](https://github.com/libp2p/specs/blob/master/identify/README.md) implementation.
22
23use crate::{
24    codec::ProtocolCodec,
25    crypto::PublicKey,
26    error::{Error, SubstreamError},
27    protocol::{Direction, TransportEvent, TransportService},
28    substream::Substream,
29    transport::Endpoint,
30    types::{protocol::ProtocolName, SubstreamId},
31    PeerId, DEFAULT_CHANNEL_SIZE,
32};
33
34use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
35use multiaddr::Multiaddr;
36use prost::Message;
37use tokio::sync::mpsc::{channel, Sender};
38use tokio_stream::wrappers::ReceiverStream;
39
40use std::{
41    collections::{HashMap, HashSet},
42    time::Duration,
43};
44
45/// Log target for the file.
46const LOG_TARGET: &str = "litep2p::ipfs::identify";
47
48/// IPFS Identify protocol name
49const PROTOCOL_NAME: &str = "/ipfs/id/1.0.0";
50
51/// IPFS Identify push protocol name.
52const _PUSH_PROTOCOL_NAME: &str = "/ipfs/id/push/1.0.0";
53
54/// Default agent version.
55const DEFAULT_AGENT: &str = "litep2p/1.0.0";
56
57/// Size for `/ipfs/ping/1.0.0` payloads.
58// TODO: what is the max size?
59const IDENTIFY_PAYLOAD_SIZE: usize = 4096;
60
61mod identify_schema {
62    include!(concat!(env!("OUT_DIR"), "/identify.rs"));
63}
64
65/// Identify configuration.
66pub struct Config {
67    /// Protocol name.
68    pub(crate) protocol: ProtocolName,
69
70    /// Codec used by the protocol.
71    pub(crate) codec: ProtocolCodec,
72
73    /// TX channel for sending events to the user protocol.
74    tx_event: Sender<IdentifyEvent>,
75
76    // Public key of the local node, filled by `Litep2p`.
77    pub(crate) public: Option<PublicKey>,
78
79    /// Protocols supported by the local node, filled by `Litep2p`.
80    pub(crate) protocols: Vec<ProtocolName>,
81
82    /// Protocol version.
83    pub(crate) protocol_version: String,
84
85    /// User agent.
86    pub(crate) user_agent: Option<String>,
87}
88
89impl Config {
90    /// Create new [`Config`].
91    ///
92    /// Returns a config that is given to `Litep2pConfig` and an event stream for
93    /// [`IdentifyEvent`]s.
94    pub fn new(
95        protocol_version: String,
96        user_agent: Option<String>,
97    ) -> (Self, Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>) {
98        let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE);
99
100        (
101            Self {
102                tx_event,
103                public: None,
104                protocol_version,
105                user_agent,
106                codec: ProtocolCodec::UnsignedVarint(Some(IDENTIFY_PAYLOAD_SIZE)),
107                protocols: Vec::new(),
108                protocol: ProtocolName::from(PROTOCOL_NAME),
109            },
110            Box::new(ReceiverStream::new(rx_event)),
111        )
112    }
113}
114
115/// Events emitted by Identify protocol.
116#[derive(Debug)]
117pub enum IdentifyEvent {
118    /// Peer identified.
119    PeerIdentified {
120        /// Peer ID.
121        peer: PeerId,
122
123        /// Protocol version.
124        protocol_version: Option<String>,
125
126        /// User agent.
127        user_agent: Option<String>,
128
129        /// Supported protocols.
130        supported_protocols: HashSet<ProtocolName>,
131
132        /// Observed address.
133        observed_address: Multiaddr,
134
135        /// Listen addresses.
136        listen_addresses: Vec<Multiaddr>,
137    },
138}
139
140/// Identify response received from remote.
141struct IdentifyResponse {
142    /// Remote peer ID.
143    peer: PeerId,
144
145    /// Protocol version.
146    protocol_version: Option<String>,
147
148    /// User agent.
149    user_agent: Option<String>,
150
151    /// Protocols supported by remote.
152    supported_protocols: HashSet<String>,
153
154    /// Remote's listen addresses.
155    listen_addresses: Vec<Multiaddr>,
156
157    /// Observed address.
158    observed_address: Option<Multiaddr>,
159}
160
161pub(crate) struct Identify {
162    // Connection service.
163    service: TransportService,
164
165    /// TX channel for sending events to the user protocol.
166    tx: Sender<IdentifyEvent>,
167
168    /// Connected peers and their observed addresses.
169    peers: HashMap<PeerId, Endpoint>,
170
171    // Public key of the local node, filled by `Litep2p`.
172    public: PublicKey,
173
174    /// Protocol version.
175    protocol_version: String,
176
177    /// User agent.
178    user_agent: String,
179
180    /// Protocols supported by the local node, filled by `Litep2p`.
181    protocols: Vec<String>,
182
183    /// Pending outbound substreams.
184    pending_opens: HashMap<SubstreamId, PeerId>,
185
186    /// Pending outbound substreams.
187    pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<IdentifyResponse>>>,
188
189    /// Pending inbound substreams.
190    pending_inbound: FuturesUnordered<BoxFuture<'static, ()>>,
191}
192
193impl Identify {
194    /// Create new [`Identify`] protocol.
195    pub(crate) fn new(service: TransportService, config: Config) -> Self {
196        Self {
197            service,
198            tx: config.tx_event,
199            peers: HashMap::new(),
200            public: config.public.expect("public key to be supplied"),
201            protocol_version: config.protocol_version,
202            user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()),
203            pending_opens: HashMap::new(),
204            pending_inbound: FuturesUnordered::new(),
205            pending_outbound: FuturesUnordered::new(),
206            protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(),
207        }
208    }
209
210    /// Connection established to remote peer.
211    fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> {
212        tracing::trace!(target: LOG_TARGET, ?peer, ?endpoint, "connection established");
213
214        let substream_id = self.service.open_substream(peer)?;
215        self.pending_opens.insert(substream_id, peer);
216        self.peers.insert(peer, endpoint);
217
218        Ok(())
219    }
220
221    /// Connection closed to remote peer.
222    fn on_connection_closed(&mut self, peer: PeerId) {
223        tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
224
225        self.peers.remove(&peer);
226    }
227
228    /// Inbound substream opened.
229    fn on_inbound_substream(
230        &mut self,
231        peer: PeerId,
232        protocol: ProtocolName,
233        mut substream: Substream,
234    ) {
235        tracing::trace!(
236            target: LOG_TARGET,
237            ?peer,
238            ?protocol,
239            "inbound substream opened"
240        );
241
242        let observed_addr = match self.peers.get(&peer) {
243            Some(endpoint) => Some(endpoint.address().to_vec()),
244            None => {
245                tracing::warn!(
246                    target: LOG_TARGET,
247                    ?peer,
248                    %protocol,
249                    "inbound identify substream opened for peer who doesn't exist",
250                );
251                None
252            }
253        };
254
255        let mut listen_addr: HashSet<_> =
256            self.service.listen_addresses().into_iter().map(|addr| addr.to_vec()).collect();
257        listen_addr
258            .extend(self.service.public_addresses().inner.read().iter().map(|addr| addr.to_vec()));
259
260        let identify = identify_schema::Identify {
261            protocol_version: Some(self.protocol_version.clone()),
262            agent_version: Some(self.user_agent.clone()),
263            public_key: Some(self.public.to_protobuf_encoding()),
264            listen_addrs: listen_addr.into_iter().collect(),
265            observed_addr,
266            protocols: self.protocols.clone(),
267        };
268
269        tracing::trace!(
270            target: LOG_TARGET,
271            ?peer,
272            ?identify,
273            "sending identify response",
274        );
275
276        let mut msg = Vec::with_capacity(identify.encoded_len());
277        identify.encode(&mut msg).expect("`msg` to have enough capacity");
278
279        self.pending_inbound.push(Box::pin(async move {
280            match tokio::time::timeout(Duration::from_secs(10), substream.send_framed(msg.into()))
281                .await
282            {
283                Err(error) => {
284                    tracing::debug!(
285                        target: LOG_TARGET,
286                        ?peer,
287                        ?error,
288                        "timed out while sending ipfs identify response",
289                    );
290                }
291                Ok(Err(error)) => {
292                    tracing::debug!(
293                        target: LOG_TARGET,
294                        ?peer,
295                        ?error,
296                        "failed to send ipfs identify response",
297                    );
298                }
299                Ok(_) => {}
300            }
301        }))
302    }
303
304    /// Outbound substream opened.
305    fn on_outbound_substream(
306        &mut self,
307        peer: PeerId,
308        protocol: ProtocolName,
309        substream_id: SubstreamId,
310        mut substream: Substream,
311    ) {
312        tracing::trace!(
313            target: LOG_TARGET,
314            ?peer,
315            ?protocol,
316            ?substream_id,
317            "outbound substream opened"
318        );
319
320        self.pending_outbound.push(Box::pin(async move {
321            let payload =
322                match tokio::time::timeout(Duration::from_secs(10), substream.next()).await {
323                    Err(_) => return Err(Error::Timeout),
324                    Ok(None) =>
325                        return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some(
326                            substream_id,
327                        )))),
328                    Ok(Some(Err(error))) => return Err(error.into()),
329                    Ok(Some(Ok(payload))) => payload,
330                };
331
332            let info = identify_schema::Identify::decode(payload.to_vec().as_slice())?;
333
334            tracing::trace!(target: LOG_TARGET, ?peer, ?info, "peer identified");
335
336            let listen_addresses = info
337                .listen_addrs
338                .iter()
339                .filter_map(|address| Multiaddr::try_from(address.clone()).ok())
340                .collect();
341            let observed_address =
342                info.observed_addr.and_then(|address| Multiaddr::try_from(address).ok());
343            let protocol_version = info.protocol_version;
344            let user_agent = info.agent_version;
345
346            Ok(IdentifyResponse {
347                peer,
348                protocol_version,
349                user_agent,
350                supported_protocols: HashSet::from_iter(info.protocols),
351                observed_address,
352                listen_addresses,
353            })
354        }));
355    }
356
357    /// Start [`Identify`] event loop.
358    pub async fn run(mut self) {
359        tracing::debug!(target: LOG_TARGET, "starting identify event loop");
360
361        loop {
362            tokio::select! {
363                event = self.service.next() => match event {
364                    None => return,
365                    Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
366                        let _ = self.on_connection_established(peer, endpoint);
367                    }
368                    Some(TransportEvent::ConnectionClosed { peer }) => {
369                        self.on_connection_closed(peer);
370                    }
371                    Some(TransportEvent::SubstreamOpened {
372                        peer,
373                        protocol,
374                        direction,
375                        substream,
376                        ..
377                    }) => match direction {
378                        Direction::Inbound => self.on_inbound_substream(peer, protocol, substream),
379                        Direction::Outbound(substream_id) => self.on_outbound_substream(peer, protocol, substream_id, substream),
380                    },
381                    _ => {}
382                },
383                _ = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
384                event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => match event {
385                    Some(Ok(response)) => {
386                        let _ = self.tx
387                            .send(IdentifyEvent::PeerIdentified {
388                                peer: response.peer,
389                                protocol_version: response.protocol_version,
390                                user_agent: response.user_agent,
391                                supported_protocols: response.supported_protocols.into_iter().map(From::from).collect(),
392                                observed_address: response.observed_address.map_or(Multiaddr::empty(), |address| address),
393                                listen_addresses: response.listen_addresses,
394                            })
395                            .await;
396                    }
397                    Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"),
398                    None => return,
399                }
400            }
401        }
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use crate::{config::ConfigBuilder, transport::tcp::config::Config as TcpConfig, Litep2p};
409    use multiaddr::{Multiaddr, Protocol};
410
411    fn create_litep2p() -> (
412        Litep2p,
413        Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
414        PeerId,
415    ) {
416        let (identify_config, identify) =
417            Config::new("1.0.0".to_string(), Some("litep2p/1.0.0".to_string()));
418
419        let keypair = crate::crypto::ed25519::Keypair::generate();
420        let peer = PeerId::from_public_key(&crate::crypto::PublicKey::Ed25519(keypair.public()));
421        let config = ConfigBuilder::new()
422            .with_keypair(keypair)
423            .with_tcp(TcpConfig {
424                listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
425                ..Default::default()
426            })
427            .with_libp2p_identify(identify_config)
428            .build();
429
430        (Litep2p::new(config).unwrap(), identify, peer)
431    }
432
433    #[tokio::test]
434    async fn update_identify_addresses() {
435        // Create two instances of litep2p
436        let (mut litep2p1, mut event_stream1, peer1) = create_litep2p();
437        let (mut litep2p2, mut event_stream2, _peer2) = create_litep2p();
438        let litep2p1_address = litep2p1.listen_addresses().into_iter().next().unwrap();
439
440        let multiaddr: Multiaddr = "/ip6/::9/tcp/111".parse().unwrap();
441        // Litep2p1 is now reporting the new address.
442        assert!(litep2p1.public_addresses().add_address(multiaddr.clone()).unwrap());
443
444        // Dial `litep2p1`
445        litep2p2.dial_address(litep2p1_address.clone()).await.unwrap();
446
447        let expected_multiaddr = multiaddr.with(Protocol::P2p(peer1.into()));
448
449        tokio::spawn(async move {
450            loop {
451                tokio::select! {
452                    _ = litep2p1.next_event() => {}
453                    _event = event_stream1.next() => {}
454                }
455            }
456        });
457
458        loop {
459            tokio::select! {
460                _ = litep2p2.next_event() => {}
461                event = event_stream2.next() => match event {
462                    Some(IdentifyEvent::PeerIdentified {
463                        listen_addresses,
464                        ..
465                    }) => {
466                        assert!(listen_addresses.iter().any(|address| address == &expected_multiaddr));
467                        break;
468                    }
469                    _ => {}
470                }
471            }
472        }
473    }
474}