libp2p_identify/
behaviour.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::handler::{self, Handler, InEvent};
22use crate::protocol::{Info, UpgradeError};
23use libp2p_core::multiaddr::Protocol;
24use libp2p_core::transport::PortUse;
25use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr};
26use libp2p_identity::PeerId;
27use libp2p_identity::PublicKey;
28use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
29use libp2p_swarm::{
30    ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
31    NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm,
32    _address_translation,
33};
34use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
35
36use std::collections::hash_map::Entry;
37use std::num::NonZeroUsize;
38use std::{
39    collections::{HashMap, HashSet, VecDeque},
40    task::Context,
41    task::Poll,
42    time::Duration,
43};
44
45/// Whether an [`Multiaddr`] is a valid for the QUIC transport.
46fn is_quic_addr(addr: &Multiaddr, v1: bool) -> bool {
47    use Protocol::*;
48    let mut iter = addr.iter();
49    let Some(first) = iter.next() else {
50        return false;
51    };
52    let Some(second) = iter.next() else {
53        return false;
54    };
55    let Some(third) = iter.next() else {
56        return false;
57    };
58    let fourth = iter.next();
59    let fifth = iter.next();
60
61    matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_))
62        && matches!(second, Udp(_))
63        && if v1 {
64            matches!(third, QuicV1)
65        } else {
66            matches!(third, Quic)
67        }
68        && matches!(fourth, Some(P2p(_)) | None)
69        && fifth.is_none()
70}
71
72fn is_tcp_addr(addr: &Multiaddr) -> bool {
73    use Protocol::*;
74
75    let mut iter = addr.iter();
76
77    let first = match iter.next() {
78        None => return false,
79        Some(p) => p,
80    };
81    let second = match iter.next() {
82        None => return false,
83        Some(p) => p,
84    };
85
86    matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
87}
88
89/// Network behaviour that automatically identifies nodes periodically, returns information
90/// about them, and answers identify queries from other nodes.
91///
92/// All external addresses of the local node supposedly observed by remotes
93/// are reported via [`ToSwarm::NewExternalAddrCandidate`].
94pub struct Behaviour {
95    config: Config,
96    /// For each peer we're connected to, the observed address to send back to it.
97    connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
98
99    /// The address a remote observed for us.
100    our_observed_addresses: HashMap<ConnectionId, Multiaddr>,
101
102    /// The outbound connections established without port reuse (require translation)
103    outbound_connections_with_ephemeral_port: HashSet<ConnectionId>,
104
105    /// Pending events to be emitted when polled.
106    events: VecDeque<ToSwarm<Event, InEvent>>,
107    /// The addresses of all peers that we have discovered.
108    discovered_peers: PeerCache,
109
110    listen_addresses: ListenAddresses,
111    external_addresses: ExternalAddresses,
112}
113
114/// Configuration for the [`identify::Behaviour`](Behaviour).
115#[non_exhaustive]
116#[derive(Debug, Clone)]
117pub struct Config {
118    /// Application-specific version of the protocol family used by the peer,
119    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
120    pub protocol_version: String,
121    /// The public key of the local node. To report on the wire.
122    pub local_public_key: PublicKey,
123    /// Name and version of the local peer implementation, similar to the
124    /// `User-Agent` header in the HTTP protocol.
125    ///
126    /// Defaults to `rust-libp2p/<libp2p-identify-version>`.
127    pub agent_version: String,
128    /// The interval at which identification requests are sent to
129    /// the remote on established connections after the first request,
130    /// i.e. the delay between identification requests.
131    ///
132    /// Defaults to 5 minutes.
133    pub interval: Duration,
134
135    /// Whether new or expired listen addresses of the local node should
136    /// trigger an active push of an identify message to all connected peers.
137    ///
138    /// Enabling this option can result in connected peers being informed
139    /// earlier about new or expired listen addresses of the local node,
140    /// i.e. before the next periodic identify request with each peer.
141    ///
142    /// Disabled by default.
143    pub push_listen_addr_updates: bool,
144
145    /// How many entries of discovered peers to keep before we discard
146    /// the least-recently used one.
147    ///
148    /// Disabled by default.
149    pub cache_size: usize,
150}
151
152impl Config {
153    /// Creates a new configuration for the identify [`Behaviour`] that
154    /// advertises the given protocol version and public key.
155    pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self {
156        Self {
157            protocol_version,
158            agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
159            local_public_key,
160            interval: Duration::from_secs(5 * 60),
161            push_listen_addr_updates: false,
162            cache_size: 100,
163        }
164    }
165
166    /// Configures the agent version sent to peers.
167    pub fn with_agent_version(mut self, v: String) -> Self {
168        self.agent_version = v;
169        self
170    }
171
172    /// Configures the interval at which identification requests are
173    /// sent to peers after the initial request.
174    pub fn with_interval(mut self, d: Duration) -> Self {
175        self.interval = d;
176        self
177    }
178
179    /// Configures whether new or expired listen addresses of the local
180    /// node should trigger an active push of an identify message to all
181    /// connected peers.
182    pub fn with_push_listen_addr_updates(mut self, b: bool) -> Self {
183        self.push_listen_addr_updates = b;
184        self
185    }
186
187    /// Configures the size of the LRU cache, caching addresses of discovered peers.
188    pub fn with_cache_size(mut self, cache_size: usize) -> Self {
189        self.cache_size = cache_size;
190        self
191    }
192}
193
194impl Behaviour {
195    /// Creates a new identify [`Behaviour`].
196    pub fn new(config: Config) -> Self {
197        let discovered_peers = match NonZeroUsize::new(config.cache_size) {
198            None => PeerCache::disabled(),
199            Some(size) => PeerCache::enabled(size),
200        };
201
202        Self {
203            config,
204            connected: HashMap::new(),
205            our_observed_addresses: Default::default(),
206            outbound_connections_with_ephemeral_port: Default::default(),
207            events: VecDeque::new(),
208            discovered_peers,
209            listen_addresses: Default::default(),
210            external_addresses: Default::default(),
211        }
212    }
213
214    /// Initiates an active push of the local peer information to the given peers.
215    pub fn push<I>(&mut self, peers: I)
216    where
217        I: IntoIterator<Item = PeerId>,
218    {
219        for p in peers {
220            if !self.connected.contains_key(&p) {
221                tracing::debug!(peer=%p, "Not pushing to peer because we are not connected");
222                continue;
223            }
224
225            self.events.push_back(ToSwarm::NotifyHandler {
226                peer_id: p,
227                handler: NotifyHandler::Any,
228                event: InEvent::Push,
229            });
230        }
231    }
232
233    fn on_connection_established(
234        &mut self,
235        ConnectionEstablished {
236            peer_id,
237            connection_id: conn,
238            endpoint,
239            failed_addresses,
240            ..
241        }: ConnectionEstablished,
242    ) {
243        let addr = match endpoint {
244            ConnectedPoint::Dialer { address, .. } => address.clone(),
245            ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
246        };
247
248        self.connected
249            .entry(peer_id)
250            .or_default()
251            .insert(conn, addr);
252
253        if let Some(cache) = self.discovered_peers.0.as_mut() {
254            for addr in failed_addresses {
255                cache.remove(&peer_id, addr);
256            }
257        }
258    }
259
260    fn all_addresses(&self) -> HashSet<Multiaddr> {
261        self.listen_addresses
262            .iter()
263            .chain(self.external_addresses.iter())
264            .cloned()
265            .collect()
266    }
267
268    fn emit_new_external_addr_candidate_event(
269        &mut self,
270        connection_id: ConnectionId,
271        observed: &Multiaddr,
272    ) {
273        if self
274            .outbound_connections_with_ephemeral_port
275            .contains(&connection_id)
276        {
277            // Apply address translation to the candidate address.
278            // For TCP without port-reuse, the observed address contains an ephemeral port which needs to be replaced by the port of a listen address.
279            let translated_addresses = {
280                let mut addrs: Vec<_> = self
281                    .listen_addresses
282                    .iter()
283                    .filter_map(|server| {
284                        if (is_tcp_addr(server) && is_tcp_addr(observed))
285                            || (is_quic_addr(server, true) && is_quic_addr(observed, true))
286                            || (is_quic_addr(server, false) && is_quic_addr(observed, false))
287                        {
288                            _address_translation(server, observed)
289                        } else {
290                            None
291                        }
292                    })
293                    .collect();
294
295                // remove duplicates
296                addrs.sort_unstable();
297                addrs.dedup();
298                addrs
299            };
300
301            // If address translation yielded nothing, broadcast the original candidate address.
302            if translated_addresses.is_empty() {
303                self.events
304                    .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
305            } else {
306                for addr in translated_addresses {
307                    self.events
308                        .push_back(ToSwarm::NewExternalAddrCandidate(addr));
309                }
310            }
311            return;
312        }
313
314        // outgoing connection dialed with port reuse
315        // incomming connection
316        self.events
317            .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
318    }
319}
320
321impl NetworkBehaviour for Behaviour {
322    type ConnectionHandler = Handler;
323    type ToSwarm = Event;
324
325    fn handle_established_inbound_connection(
326        &mut self,
327        _: ConnectionId,
328        peer: PeerId,
329        _: &Multiaddr,
330        remote_addr: &Multiaddr,
331    ) -> Result<THandler<Self>, ConnectionDenied> {
332        Ok(Handler::new(
333            self.config.interval,
334            peer,
335            self.config.local_public_key.clone(),
336            self.config.protocol_version.clone(),
337            self.config.agent_version.clone(),
338            remote_addr.clone(),
339            self.all_addresses(),
340        ))
341    }
342
343    fn handle_established_outbound_connection(
344        &mut self,
345        connection_id: ConnectionId,
346        peer: PeerId,
347        addr: &Multiaddr,
348        _: Endpoint,
349        port_use: PortUse,
350    ) -> Result<THandler<Self>, ConnectionDenied> {
351        // Contrary to inbound events, outbound events are full-p2p qualified
352        // so we remove /p2p/ in order to be homogeneous
353        // this will avoid Autonatv2 to probe twice the same address (fully-p2p-qualified + not fully-p2p-qualified)
354        let mut addr = addr.clone();
355        if matches!(addr.iter().last(), Some(multiaddr::Protocol::P2p(_))) {
356            addr.pop();
357        }
358
359        if port_use == PortUse::New {
360            self.outbound_connections_with_ephemeral_port
361                .insert(connection_id);
362        }
363
364        Ok(Handler::new(
365            self.config.interval,
366            peer,
367            self.config.local_public_key.clone(),
368            self.config.protocol_version.clone(),
369            self.config.agent_version.clone(),
370            addr.clone(), // TODO: This is weird? That is the public address we dialed, shouldn't need to tell the other party?
371            self.all_addresses(),
372        ))
373    }
374
375    fn on_connection_handler_event(
376        &mut self,
377        peer_id: PeerId,
378        connection_id: ConnectionId,
379        event: THandlerOutEvent<Self>,
380    ) {
381        match event {
382            handler::Event::Identified(mut info) => {
383                // Remove invalid multiaddrs.
384                info.listen_addrs
385                    .retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));
386
387                let observed = info.observed_addr.clone();
388                self.events
389                    .push_back(ToSwarm::GenerateEvent(Event::Received {
390                        connection_id,
391                        peer_id,
392                        info: info.clone(),
393                    }));
394
395                if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
396                    for address in &info.listen_addrs {
397                        if discovered_peers.add(peer_id, address.clone()) {
398                            self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
399                                peer_id,
400                                address: address.clone(),
401                            });
402                        }
403                    }
404                }
405
406                match self.our_observed_addresses.entry(connection_id) {
407                    Entry::Vacant(not_yet_observed) => {
408                        not_yet_observed.insert(observed.clone());
409                        self.emit_new_external_addr_candidate_event(connection_id, &observed);
410                    }
411                    Entry::Occupied(already_observed) if already_observed.get() == &observed => {
412                        // No-op, we already observed this address.
413                    }
414                    Entry::Occupied(mut already_observed) => {
415                        tracing::info!(
416                            old_address=%already_observed.get(),
417                            new_address=%observed,
418                            "Our observed address on connection {connection_id} changed",
419                        );
420
421                        *already_observed.get_mut() = observed.clone();
422                        self.emit_new_external_addr_candidate_event(connection_id, &observed);
423                    }
424                }
425            }
426            handler::Event::Identification => {
427                self.events.push_back(ToSwarm::GenerateEvent(Event::Sent {
428                    connection_id,
429                    peer_id,
430                }));
431            }
432            handler::Event::IdentificationPushed(info) => {
433                self.events.push_back(ToSwarm::GenerateEvent(Event::Pushed {
434                    connection_id,
435                    peer_id,
436                    info,
437                }));
438            }
439            handler::Event::IdentificationError(error) => {
440                self.events.push_back(ToSwarm::GenerateEvent(Event::Error {
441                    connection_id,
442                    peer_id,
443                    error,
444                }));
445            }
446        }
447    }
448
449    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
450    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
451        if let Some(event) = self.events.pop_front() {
452            return Poll::Ready(event);
453        }
454
455        Poll::Pending
456    }
457
458    fn handle_pending_outbound_connection(
459        &mut self,
460        _connection_id: ConnectionId,
461        maybe_peer: Option<PeerId>,
462        _addresses: &[Multiaddr],
463        _effective_role: Endpoint,
464    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
465        let peer = match maybe_peer {
466            None => return Ok(vec![]),
467            Some(peer) => peer,
468        };
469
470        Ok(self.discovered_peers.get(&peer))
471    }
472
473    fn on_swarm_event(&mut self, event: FromSwarm) {
474        let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
475        let external_addr_changed = self.external_addresses.on_swarm_event(&event);
476
477        if listen_addr_changed || external_addr_changed {
478            // notify all connected handlers about our changed addresses
479            let change_events = self
480                .connected
481                .iter()
482                .flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
483                .map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
484                    peer_id,
485                    handler: NotifyHandler::One(*connection_id),
486                    event: InEvent::AddressesChanged(self.all_addresses()),
487                })
488                .collect::<Vec<_>>();
489
490            self.events.extend(change_events)
491        }
492
493        if listen_addr_changed && self.config.push_listen_addr_updates {
494            // trigger an identify push for all connected peers
495            let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
496                peer_id: *peer,
497                handler: NotifyHandler::Any,
498                event: InEvent::Push,
499            });
500
501            self.events.extend(push_events);
502        }
503
504        match event {
505            FromSwarm::ConnectionEstablished(connection_established) => {
506                self.on_connection_established(connection_established)
507            }
508            FromSwarm::ConnectionClosed(ConnectionClosed {
509                peer_id,
510                connection_id,
511                remaining_established,
512                ..
513            }) => {
514                if remaining_established == 0 {
515                    self.connected.remove(&peer_id);
516                } else if let Some(addrs) = self.connected.get_mut(&peer_id) {
517                    addrs.remove(&connection_id);
518                }
519
520                self.our_observed_addresses.remove(&connection_id);
521                self.outbound_connections_with_ephemeral_port
522                    .remove(&connection_id);
523            }
524            FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
525                if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
526                    (peer_id, self.discovered_peers.0.as_mut(), error)
527                {
528                    for (addr, _error) in errors {
529                        cache.remove(&peer_id, addr);
530                    }
531                }
532            }
533            _ => {}
534        }
535    }
536}
537
538/// Event emitted  by the `Identify` behaviour.
539#[allow(clippy::large_enum_variant)]
540#[derive(Debug)]
541pub enum Event {
542    /// Identification information has been received from a peer.
543    Received {
544        /// Identifier of the connection.
545        connection_id: ConnectionId,
546        /// The peer that has been identified.
547        peer_id: PeerId,
548        /// The information provided by the peer.
549        info: Info,
550    },
551    /// Identification information of the local node has been sent to a peer in
552    /// response to an identification request.
553    Sent {
554        /// Identifier of the connection.
555        connection_id: ConnectionId,
556        /// The peer that the information has been sent to.
557        peer_id: PeerId,
558    },
559    /// Identification information of the local node has been actively pushed to
560    /// a peer.
561    Pushed {
562        /// Identifier of the connection.
563        connection_id: ConnectionId,
564        /// The peer that the information has been sent to.
565        peer_id: PeerId,
566        /// The full Info struct we pushed to the remote peer. Clients must
567        /// do some diff'ing to know what has changed since the last push.
568        info: Info,
569    },
570    /// Error while attempting to identify the remote.
571    Error {
572        /// Identifier of the connection.
573        connection_id: ConnectionId,
574        /// The peer with whom the error originated.
575        peer_id: PeerId,
576        /// The error that occurred.
577        error: StreamUpgradeError<UpgradeError>,
578    },
579}
580
581impl Event {
582    pub fn connection_id(&self) -> ConnectionId {
583        match self {
584            Event::Received { connection_id, .. }
585            | Event::Sent { connection_id, .. }
586            | Event::Pushed { connection_id, .. }
587            | Event::Error { connection_id, .. } => *connection_id,
588        }
589    }
590}
591
592/// If there is a given peer_id in the multiaddr, make sure it is the same as
593/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
594fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
595    let last_component = addr.iter().last();
596    if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
597        return multi_addr_peer_id == *peer_id;
598    }
599    true
600}
601
602struct PeerCache(Option<PeerAddresses>);
603
604impl PeerCache {
605    fn disabled() -> Self {
606        Self(None)
607    }
608
609    fn enabled(size: NonZeroUsize) -> Self {
610        Self(Some(PeerAddresses::new(size)))
611    }
612
613    fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
614        if let Some(cache) = self.0.as_mut() {
615            cache.get(peer).collect()
616        } else {
617            Vec::new()
618        }
619    }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn check_multiaddr_matches_peer_id() {
        let peer_id = PeerId::random();
        let other_peer_id = PeerId::random();
        let bare: Multiaddr = "/ip4/147.75.69.143/tcp/4001"
            .parse()
            .expect("failed to parse multiaddr");

        let with_own_id = bare.clone().with(multiaddr::Protocol::P2p(peer_id));
        let with_other_id = bare.clone().with(multiaddr::Protocol::P2p(other_peer_id));

        // Matching trailing peer id is accepted.
        assert!(multiaddr_matches_peer_id(&with_own_id, &peer_id));
        // A trailing peer id belonging to someone else is rejected.
        assert!(!multiaddr_matches_peer_id(&with_other_id, &peer_id));
        // No trailing peer id at all is accepted.
        assert!(multiaddr_matches_peer_id(&bare, &peer_id));
    }
}