libp2p_swarm/behaviour/
peer_addresses.rs

1use crate::behaviour::FromSwarm;
2use crate::{DialError, DialFailure, NewExternalAddrOfPeer};
3
4use libp2p_core::Multiaddr;
5use libp2p_identity::PeerId;
6
7use lru::LruCache;
8
9use std::num::NonZeroUsize;
10
11/// Struct for tracking peers' external addresses of the [`Swarm`](crate::Swarm).
12#[derive(Debug)]
13pub struct PeerAddresses(LruCache<PeerId, LruCache<Multiaddr, ()>>);
14
15impl PeerAddresses {
16    /// Creates a [`PeerAddresses`] cache with capacity for the given number of peers.
17    ///
18    /// For each peer, we will at most store 10 addresses.
19    pub fn new(number_of_peers: NonZeroUsize) -> Self {
20        Self(LruCache::new(number_of_peers))
21    }
22
23    /// Feed a [`FromSwarm`] event to this struct.
24    ///
25    /// Returns whether the event changed peer's known external addresses.
26    pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool {
27        match event {
28            FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { peer_id, addr }) => {
29                self.add(*peer_id, (*addr).clone())
30            }
31            FromSwarm::DialFailure(DialFailure {
32                peer_id: Some(peer_id),
33                error: DialError::Transport(errors),
34                ..
35            }) => {
36                for (addr, _error) in errors {
37                    self.remove(peer_id, addr);
38                }
39                true
40            }
41            _ => false,
42        }
43    }
44
45    /// Adds address to cache.
46    /// Appends address to the existing set if peer addresses already exist.
47    /// Creates a new cache entry for peer_id if no addresses are present.
48    /// Returns true if the newly added address was not previously in the cache.
49    ///
50    pub fn add(&mut self, peer: PeerId, address: Multiaddr) -> bool {
51        match prepare_addr(&peer, &address) {
52            Ok(address) => {
53                if let Some(cached) = self.0.get_mut(&peer) {
54                    cached.put(address, ()).is_none()
55                } else {
56                    let mut set = LruCache::new(NonZeroUsize::new(10).expect("10 > 0"));
57                    set.put(address, ());
58                    self.0.put(peer, set);
59
60                    true
61                }
62            }
63            Err(_) => false,
64        }
65    }
66
67    /// Returns peer's external addresses.
68    pub fn get(&mut self, peer: &PeerId) -> impl Iterator<Item = Multiaddr> + '_ {
69        self.0
70            .get(peer)
71            .into_iter()
72            .flat_map(|c| c.iter().map(|(m, ())| m))
73            .cloned()
74    }
75
76    /// Removes address from peer addresses cache.
77    /// Returns true if the address was removed.
78    pub fn remove(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
79        match self.0.get_mut(peer) {
80            Some(addrs) => match prepare_addr(peer, address) {
81                Ok(address) => addrs.pop(&address).is_some(),
82                Err(_) => false,
83            },
84            None => false,
85        }
86    }
87}
88
89fn prepare_addr(peer: &PeerId, addr: &Multiaddr) -> Result<Multiaddr, Multiaddr> {
90    addr.clone().with_p2p(*peer)
91}
92
93impl Default for PeerAddresses {
94    fn default() -> Self {
95        Self(LruCache::new(NonZeroUsize::new(100).unwrap()))
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102    use std::io;
103
104    use crate::ConnectionId;
105    use libp2p_core::{
106        multiaddr::Protocol,
107        transport::{memory::MemoryTransportError, TransportError},
108    };
109
110    use once_cell::sync::Lazy;
111
112    #[test]
113    fn new_peer_addr_returns_correct_changed_value() {
114        let mut cache = PeerAddresses::default();
115        let peer_id = PeerId::random();
116
117        let event = new_external_addr_of_peer1(peer_id);
118
119        let changed = cache.on_swarm_event(&event);
120        assert!(changed);
121
122        let changed = cache.on_swarm_event(&event);
123        assert!(!changed);
124    }
125
126    #[test]
127    fn new_peer_addr_saves_peer_addrs() {
128        let mut cache = PeerAddresses::default();
129        let peer_id = PeerId::random();
130        let event = new_external_addr_of_peer1(peer_id);
131
132        let changed = cache.on_swarm_event(&event);
133        assert!(changed);
134
135        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
136        let expected = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
137        assert_eq!(expected, vec![addr1]);
138
139        let event = new_external_addr_of_peer2(peer_id);
140        let changed = cache.on_swarm_event(&event);
141
142        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
143        let addr2 = MEMORY_ADDR_2000.clone().with_p2p(peer_id).unwrap();
144
145        let expected_addrs = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
146        assert!(expected_addrs.contains(&addr1));
147        assert!(expected_addrs.contains(&addr2));
148
149        let expected = cache.get(&peer_id).collect::<Vec<Multiaddr>>().len();
150        assert_eq!(expected, 2);
151
152        assert!(changed);
153    }
154
155    #[test]
156    fn existing_addr_is_not_added_to_cache() {
157        let mut cache = PeerAddresses::default();
158        let peer_id = PeerId::random();
159
160        let event = new_external_addr_of_peer1(peer_id);
161
162        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
163        let changed = cache.on_swarm_event(&event);
164        let expected = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
165        assert!(changed);
166        assert_eq!(expected, vec![addr1]);
167
168        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
169        let changed = cache.on_swarm_event(&event);
170        let expected = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
171        assert!(!changed);
172        assert_eq!(expected, [addr1]);
173    }
174
175    #[test]
176    fn addresses_of_peer_are_removed_when_received_dial_failure() {
177        let mut cache = PeerAddresses::default();
178        let peer_id = PeerId::random();
179
180        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
181        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
182        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();
183
184        cache.add(peer_id, addr.clone());
185        cache.add(peer_id, addr2.clone());
186        cache.add(peer_id, addr3.clone());
187
188        let error = DialError::Transport(prepare_errors(vec![addr, addr3]));
189
190        let event = FromSwarm::DialFailure(DialFailure {
191            peer_id: Some(peer_id),
192            error: &error,
193            connection_id: ConnectionId::new_unchecked(8),
194        });
195
196        let changed = cache.on_swarm_event(&event);
197
198        assert!(changed);
199
200        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
201        let expected = prepare_expected_addrs(peer_id, [addr2].into_iter());
202
203        assert_eq!(cached, expected);
204    }
205
206    #[test]
207    fn remove_removes_address_if_present() {
208        let mut cache = PeerAddresses::default();
209        let peer_id = PeerId::random();
210        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
211
212        cache.add(peer_id, addr.clone());
213
214        assert!(cache.remove(&peer_id, &addr));
215    }
216
217    #[test]
218    fn remove_returns_false_if_address_not_present() {
219        let mut cache = PeerAddresses::default();
220        let peer_id = PeerId::random();
221        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
222
223        assert!(!cache.remove(&peer_id, &addr));
224    }
225
226    #[test]
227    fn remove_returns_false_if_peer_not_present() {
228        let mut cache = PeerAddresses::default();
229        let peer_id = PeerId::random();
230        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
231
232        assert!(!cache.remove(&peer_id, &addr));
233    }
234
235    #[test]
236    fn remove_removes_address_provided_in_param() {
237        let mut cache = PeerAddresses::default();
238        let peer_id = PeerId::random();
239        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
240        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
241        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();
242
243        cache.add(peer_id, addr.clone());
244        cache.add(peer_id, addr2.clone());
245        cache.add(peer_id, addr3.clone());
246
247        assert!(cache.remove(&peer_id, &addr2));
248
249        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
250        cached.sort();
251
252        let expected = prepare_expected_addrs(peer_id, [addr, addr3].into_iter());
253
254        assert_eq!(cached, expected);
255    }
256
257    #[test]
258    fn add_adds_new_address_to_cache() {
259        let mut cache = PeerAddresses::default();
260        let peer_id = PeerId::random();
261        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
262
263        assert!(cache.add(peer_id, addr.clone()));
264
265        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
266        cached.sort();
267        let expected = prepare_expected_addrs(peer_id, [addr].into_iter());
268
269        assert_eq!(cached, expected);
270    }
271
272    #[test]
273    fn add_adds_address_to_cache_to_existing_key() {
274        let mut cache = PeerAddresses::default();
275        let peer_id = PeerId::random();
276        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
277        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
278        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();
279
280        assert!(cache.add(peer_id, addr.clone()));
281
282        cache.add(peer_id, addr2.clone());
283        cache.add(peer_id, addr3.clone());
284
285        let expected = prepare_expected_addrs(peer_id, [addr, addr2, addr3].into_iter());
286
287        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
288        cached.sort();
289
290        assert_eq!(cached, expected);
291    }
292
293    fn prepare_expected_addrs(
294        peer_id: PeerId,
295        addrs: impl Iterator<Item = Multiaddr>,
296    ) -> Vec<Multiaddr> {
297        let mut addrs = addrs
298            .filter_map(|a| a.with_p2p(peer_id).ok())
299            .collect::<Vec<Multiaddr>>();
300        addrs.sort();
301        addrs
302    }
303
304    fn new_external_addr_of_peer1(peer_id: PeerId) -> FromSwarm<'static> {
305        FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
306            peer_id,
307            addr: &MEMORY_ADDR_1000,
308        })
309    }
310
311    fn new_external_addr_of_peer2(peer_id: PeerId) -> FromSwarm<'static> {
312        FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
313            peer_id,
314            addr: &MEMORY_ADDR_2000,
315        })
316    }
317
318    fn prepare_errors(addrs: Vec<Multiaddr>) -> Vec<(Multiaddr, TransportError<io::Error>)> {
319        let errors: Vec<(Multiaddr, TransportError<io::Error>)> = addrs
320            .iter()
321            .map(|addr| {
322                (
323                    addr.clone(),
324                    TransportError::Other(io::Error::new(
325                        io::ErrorKind::Other,
326                        MemoryTransportError::Unreachable,
327                    )),
328                )
329            })
330            .collect();
331        errors
332    }
333
334    static MEMORY_ADDR_1000: Lazy<Multiaddr> =
335        Lazy::new(|| Multiaddr::empty().with(Protocol::Memory(1000)));
336    static MEMORY_ADDR_2000: Lazy<Multiaddr> =
337        Lazy::new(|| Multiaddr::empty().with(Protocol::Memory(2000)));
338}