libp2p_identify/
behaviour.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::handler::{self, Handler, InEvent};
22use crate::protocol::{Info, UpgradeError};
23use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr};
24use libp2p_identity::PeerId;
25use libp2p_identity::PublicKey;
26use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
27use libp2p_swarm::{
28    ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
29    NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent, ToSwarm,
30};
31use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
32use lru::LruCache;
33use std::num::NonZeroUsize;
34use std::{
35    collections::{HashMap, HashSet, VecDeque},
36    iter::FromIterator,
37    task::Context,
38    task::Poll,
39    time::Duration,
40};
41
42/// Network behaviour that automatically identifies nodes periodically, returns information
43/// about them, and answers identify queries from other nodes.
44///
45/// All external addresses of the local node supposedly observed by remotes
46/// are reported via [`ToSwarm::NewExternalAddrCandidate`].
47pub struct Behaviour {
48    config: Config,
49    /// For each peer we're connected to, the observed address to send back to it.
50    connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
51    /// Pending events to be emitted when polled.
52    events: VecDeque<ToSwarm<Event, InEvent>>,
53    /// The addresses of all peers that we have discovered.
54    discovered_peers: PeerCache,
55
56    listen_addresses: ListenAddresses,
57    external_addresses: ExternalAddresses,
58}
59
60/// Configuration for the [`identify::Behaviour`](Behaviour).
61#[non_exhaustive]
62#[derive(Debug, Clone)]
63pub struct Config {
64    /// Application-specific version of the protocol family used by the peer,
65    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
66    pub protocol_version: String,
67    /// The public key of the local node. To report on the wire.
68    pub local_public_key: PublicKey,
69    /// Name and version of the local peer implementation, similar to the
70    /// `User-Agent` header in the HTTP protocol.
71    ///
72    /// Defaults to `rust-libp2p/<libp2p-identify-version>`.
73    pub agent_version: String,
74    /// The initial delay before the first identification request
75    /// is sent to a remote on a newly established connection.
76    ///
77    /// Defaults to 0ms.
78    #[deprecated(note = "The `initial_delay` is no longer necessary and will be
79                completely removed since a remote should be able to instantly
80                answer to an identify request")]
81    pub initial_delay: Duration,
82    /// The interval at which identification requests are sent to
83    /// the remote on established connections after the first request,
84    /// i.e. the delay between identification requests.
85    ///
86    /// Defaults to 5 minutes.
87    pub interval: Duration,
88
89    /// Whether new or expired listen addresses of the local node should
90    /// trigger an active push of an identify message to all connected peers.
91    ///
92    /// Enabling this option can result in connected peers being informed
93    /// earlier about new or expired listen addresses of the local node,
94    /// i.e. before the next periodic identify request with each peer.
95    ///
96    /// Disabled by default.
97    pub push_listen_addr_updates: bool,
98
99    /// How many entries of discovered peers to keep before we discard
100    /// the least-recently used one.
101    ///
102    /// Disabled by default.
103    pub cache_size: usize,
104}
105
106impl Config {
107    /// Creates a new configuration for the identify [`Behaviour`] that
108    /// advertises the given protocol version and public key.
109    #[allow(deprecated)]
110    pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self {
111        Self {
112            protocol_version,
113            agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
114            local_public_key,
115            initial_delay: Duration::from_millis(0),
116            interval: Duration::from_secs(5 * 60),
117            push_listen_addr_updates: false,
118            cache_size: 100,
119        }
120    }
121
122    /// Configures the agent version sent to peers.
123    pub fn with_agent_version(mut self, v: String) -> Self {
124        self.agent_version = v;
125        self
126    }
127
128    /// Configures the initial delay before the first identification
129    /// request is sent on a newly established connection to a peer.
130    #[deprecated(note = "The `initial_delay` is no longer necessary and will be
131                completely removed since a remote should be able to instantly
132                answer to an identify request thus also this setter will be removed")]
133    #[allow(deprecated)]
134    pub fn with_initial_delay(mut self, d: Duration) -> Self {
135        self.initial_delay = d;
136        self
137    }
138
139    /// Configures the interval at which identification requests are
140    /// sent to peers after the initial request.
141    pub fn with_interval(mut self, d: Duration) -> Self {
142        self.interval = d;
143        self
144    }
145
146    /// Configures whether new or expired listen addresses of the local
147    /// node should trigger an active push of an identify message to all
148    /// connected peers.
149    pub fn with_push_listen_addr_updates(mut self, b: bool) -> Self {
150        self.push_listen_addr_updates = b;
151        self
152    }
153
154    /// Configures the size of the LRU cache, caching addresses of discovered peers.
155    pub fn with_cache_size(mut self, cache_size: usize) -> Self {
156        self.cache_size = cache_size;
157        self
158    }
159}
160
161impl Behaviour {
162    /// Creates a new identify [`Behaviour`].
163    pub fn new(config: Config) -> Self {
164        let discovered_peers = match NonZeroUsize::new(config.cache_size) {
165            None => PeerCache::disabled(),
166            Some(size) => PeerCache::enabled(size),
167        };
168
169        Self {
170            config,
171            connected: HashMap::new(),
172            events: VecDeque::new(),
173            discovered_peers,
174            listen_addresses: Default::default(),
175            external_addresses: Default::default(),
176        }
177    }
178
179    /// Initiates an active push of the local peer information to the given peers.
180    pub fn push<I>(&mut self, peers: I)
181    where
182        I: IntoIterator<Item = PeerId>,
183    {
184        for p in peers {
185            if !self.connected.contains_key(&p) {
186                log::debug!("Not pushing to {p} because we are not connected");
187                continue;
188            }
189
190            self.events.push_back(ToSwarm::NotifyHandler {
191                peer_id: p,
192                handler: NotifyHandler::Any,
193                event: InEvent::Push,
194            });
195        }
196    }
197
198    fn on_connection_established(
199        &mut self,
200        ConnectionEstablished {
201            peer_id,
202            connection_id: conn,
203            endpoint,
204            failed_addresses,
205            ..
206        }: ConnectionEstablished,
207    ) {
208        let addr = match endpoint {
209            ConnectedPoint::Dialer { address, .. } => address.clone(),
210            ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
211        };
212
213        self.connected
214            .entry(peer_id)
215            .or_default()
216            .insert(conn, addr);
217
218        if let Some(entry) = self.discovered_peers.get_mut(&peer_id) {
219            for addr in failed_addresses {
220                entry.remove(addr);
221            }
222        }
223    }
224
225    fn all_addresses(&self) -> HashSet<Multiaddr> {
226        self.listen_addresses
227            .iter()
228            .chain(self.external_addresses.iter())
229            .cloned()
230            .collect()
231    }
232}
233
234impl NetworkBehaviour for Behaviour {
235    type ConnectionHandler = Handler;
236    type ToSwarm = Event;
237
238    #[allow(deprecated)]
239    fn handle_established_inbound_connection(
240        &mut self,
241        _: ConnectionId,
242        peer: PeerId,
243        _: &Multiaddr,
244        remote_addr: &Multiaddr,
245    ) -> Result<THandler<Self>, ConnectionDenied> {
246        Ok(Handler::new(
247            self.config.initial_delay,
248            self.config.interval,
249            peer,
250            self.config.local_public_key.clone(),
251            self.config.protocol_version.clone(),
252            self.config.agent_version.clone(),
253            remote_addr.clone(),
254            self.all_addresses(),
255        ))
256    }
257
258    #[allow(deprecated)]
259    fn handle_established_outbound_connection(
260        &mut self,
261        _: ConnectionId,
262        peer: PeerId,
263        addr: &Multiaddr,
264        _: Endpoint,
265    ) -> Result<THandler<Self>, ConnectionDenied> {
266        Ok(Handler::new(
267            self.config.initial_delay,
268            self.config.interval,
269            peer,
270            self.config.local_public_key.clone(),
271            self.config.protocol_version.clone(),
272            self.config.agent_version.clone(),
273            addr.clone(), // TODO: This is weird? That is the public address we dialed, shouldn't need to tell the other party?
274            self.all_addresses(),
275        ))
276    }
277
278    fn on_connection_handler_event(
279        &mut self,
280        peer_id: PeerId,
281        _: ConnectionId,
282        event: THandlerOutEvent<Self>,
283    ) {
284        match event {
285            handler::Event::Identified(mut info) => {
286                // Remove invalid multiaddrs.
287                info.listen_addrs
288                    .retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));
289
290                // Replace existing addresses to prevent other peer from filling up our memory.
291                self.discovered_peers
292                    .put(peer_id, info.listen_addrs.iter().cloned());
293
294                let observed = info.observed_addr.clone();
295                self.events
296                    .push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info }));
297                self.events
298                    .push_back(ToSwarm::NewExternalAddrCandidate(observed));
299            }
300            handler::Event::Identification => {
301                self.events
302                    .push_back(ToSwarm::GenerateEvent(Event::Sent { peer_id }));
303            }
304            handler::Event::IdentificationPushed => {
305                self.events
306                    .push_back(ToSwarm::GenerateEvent(Event::Pushed { peer_id }));
307            }
308            handler::Event::IdentificationError(error) => {
309                self.events
310                    .push_back(ToSwarm::GenerateEvent(Event::Error { peer_id, error }));
311            }
312        }
313    }
314
315    fn poll(
316        &mut self,
317        _cx: &mut Context<'_>,
318        _: &mut impl PollParameters,
319    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
320        if let Some(event) = self.events.pop_front() {
321            return Poll::Ready(event);
322        }
323
324        Poll::Pending
325    }
326
327    fn handle_pending_outbound_connection(
328        &mut self,
329        _connection_id: ConnectionId,
330        maybe_peer: Option<PeerId>,
331        _addresses: &[Multiaddr],
332        _effective_role: Endpoint,
333    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
334        let peer = match maybe_peer {
335            None => return Ok(vec![]),
336            Some(peer) => peer,
337        };
338
339        Ok(self.discovered_peers.get(&peer))
340    }
341
342    fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
343        let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
344        let external_addr_changed = self.external_addresses.on_swarm_event(&event);
345
346        if listen_addr_changed || external_addr_changed {
347            // notify all connected handlers about our changed addresses
348            let change_events = self
349                .connected
350                .iter()
351                .flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
352                .map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
353                    peer_id,
354                    handler: NotifyHandler::One(*connection_id),
355                    event: InEvent::AddressesChanged(self.all_addresses()),
356                })
357                .collect::<Vec<_>>();
358
359            self.events.extend(change_events)
360        }
361
362        if listen_addr_changed && self.config.push_listen_addr_updates {
363            // trigger an identify push for all connected peers
364            let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
365                peer_id: *peer,
366                handler: NotifyHandler::Any,
367                event: InEvent::Push,
368            });
369
370            self.events.extend(push_events);
371        }
372
373        match event {
374            FromSwarm::ConnectionEstablished(connection_established) => {
375                self.on_connection_established(connection_established)
376            }
377            FromSwarm::ConnectionClosed(ConnectionClosed {
378                peer_id,
379                connection_id,
380                remaining_established,
381                ..
382            }) => {
383                if remaining_established == 0 {
384                    self.connected.remove(&peer_id);
385                } else if let Some(addrs) = self.connected.get_mut(&peer_id) {
386                    addrs.remove(&connection_id);
387                }
388            }
389            FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
390                if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
391                    if let DialError::Transport(errors) = error {
392                        for (addr, _error) in errors {
393                            entry.remove(addr);
394                        }
395                    }
396                }
397            }
398            FromSwarm::NewListenAddr(_)
399            | FromSwarm::ExpiredListenAddr(_)
400            | FromSwarm::AddressChange(_)
401            | FromSwarm::ListenFailure(_)
402            | FromSwarm::NewListener(_)
403            | FromSwarm::ListenerError(_)
404            | FromSwarm::ListenerClosed(_)
405            | FromSwarm::NewExternalAddrCandidate(_)
406            | FromSwarm::ExternalAddrExpired(_) => {}
407            FromSwarm::ExternalAddrConfirmed(_) => {}
408        }
409    }
410}
411
412/// Event emitted  by the `Identify` behaviour.
413#[allow(clippy::large_enum_variant)]
414#[derive(Debug)]
415pub enum Event {
416    /// Identification information has been received from a peer.
417    Received {
418        /// The peer that has been identified.
419        peer_id: PeerId,
420        /// The information provided by the peer.
421        info: Info,
422    },
423    /// Identification information of the local node has been sent to a peer in
424    /// response to an identification request.
425    Sent {
426        /// The peer that the information has been sent to.
427        peer_id: PeerId,
428    },
429    /// Identification information of the local node has been actively pushed to
430    /// a peer.
431    Pushed {
432        /// The peer that the information has been sent to.
433        peer_id: PeerId,
434    },
435    /// Error while attempting to identify the remote.
436    Error {
437        /// The peer with whom the error originated.
438        peer_id: PeerId,
439        /// The error that occurred.
440        error: StreamUpgradeError<UpgradeError>,
441    },
442}
443
444/// If there is a given peer_id in the multiaddr, make sure it is the same as
445/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
446fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
447    let last_component = addr.iter().last();
448    if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
449        return multi_addr_peer_id == *peer_id;
450    }
451    true
452}
453
454struct PeerCache(Option<LruCache<PeerId, HashSet<Multiaddr>>>);
455
456impl PeerCache {
457    fn disabled() -> Self {
458        Self(None)
459    }
460
461    fn enabled(size: NonZeroUsize) -> Self {
462        Self(Some(LruCache::new(size)))
463    }
464
465    fn get_mut(&mut self, peer: &PeerId) -> Option<&mut HashSet<Multiaddr>> {
466        self.0.as_mut()?.get_mut(peer)
467    }
468
469    fn put(&mut self, peer: PeerId, addresses: impl Iterator<Item = Multiaddr>) {
470        let cache = match self.0.as_mut() {
471            None => return,
472            Some(cache) => cache,
473        };
474
475        cache.put(peer, HashSet::from_iter(addresses));
476    }
477
478    fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
479        let cache = match self.0.as_mut() {
480            None => return Vec::new(),
481            Some(cache) => cache,
482        };
483
484        cache
485            .get(peer)
486            .cloned()
487            .map(Vec::from_iter)
488            .unwrap_or_default()
489    }
490}
491
492#[cfg(test)]
493mod tests {
494    use super::*;
495
496    #[test]
497    fn check_multiaddr_matches_peer_id() {
498        let peer_id = PeerId::random();
499        let other_peer_id = PeerId::random();
500        let mut addr: Multiaddr = "/ip4/147.75.69.143/tcp/4001"
501            .parse()
502            .expect("failed to parse multiaddr");
503
504        let addr_without_peer_id: Multiaddr = addr.clone();
505        let mut addr_with_other_peer_id = addr.clone();
506
507        addr.push(multiaddr::Protocol::P2p(peer_id));
508        addr_with_other_peer_id.push(multiaddr::Protocol::P2p(other_peer_id));
509
510        assert!(multiaddr_matches_peer_id(&addr, &peer_id));
511        assert!(!multiaddr_matches_peer_id(
512            &addr_with_other_peer_id,
513            &peer_id
514        ));
515        assert!(multiaddr_matches_peer_id(&addr_without_peer_id, &peer_id));
516    }
517}