litep2p/protocol/libp2p/kademlia/
routing_table.rs1use crate::{
25 protocol::libp2p::kademlia::{
26 bucket::{KBucket, KBucketEntry},
27 types::{ConnectionType, Distance, KademliaPeer, Key, U256},
28 },
29 PeerId,
30};
31
32use multiaddr::{Multiaddr, Protocol};
33use multihash::Multihash;
34
35const NUM_BUCKETS: usize = 256;
37
38const LOG_TARGET: &str = "litep2p::ipfs::kademlia::routing_table";
40
41pub struct RoutingTable {
42 local_key: Key<PeerId>,
44
45 buckets: Vec<KBucket>,
47}
48
49#[derive(Debug, Copy, Clone, PartialEq, Eq)]
52struct BucketIndex(usize);
53
54impl BucketIndex {
55 fn new(d: &Distance) -> Option<BucketIndex> {
63 d.ilog2().map(|i| BucketIndex(i as usize))
64 }
65
66 fn get(&self) -> usize {
68 self.0
69 }
70
71 fn _range(&self) -> (Distance, Distance) {
74 let min = Distance(U256::pow(U256::from(2), U256::from(self.0)));
75 if self.0 == usize::from(u8::MAX) {
76 (min, Distance(U256::MAX))
77 } else {
78 let max = Distance(U256::pow(U256::from(2), U256::from(self.0 + 1)) - 1);
79 (min, max)
80 }
81 }
82
83 #[cfg(test)]
85 fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
86 let mut bytes = [0u8; 32];
87 let quot = self.0 / 8;
88 for i in 0..quot {
89 bytes[31 - i] = rng.gen();
90 }
91 let rem = (self.0 % 8) as u32;
92 let lower = usize::pow(2, rem);
93 let upper = usize::pow(2, rem + 1);
94 bytes[31 - quot] = rng.gen_range(lower..upper) as u8;
95 Distance(U256::from(bytes))
96 }
97}
98
99impl RoutingTable {
100 pub fn new(local_key: Key<PeerId>) -> Self {
102 RoutingTable {
103 local_key,
104 buckets: (0..NUM_BUCKETS).map(|_| KBucket::new()).collect(),
105 }
106 }
107
108 pub fn _local_key(&self) -> &Key<PeerId> {
110 &self.local_key
111 }
112
113 pub fn entry(&mut self, key: Key<PeerId>) -> KBucketEntry<'_> {
115 let Some(index) = BucketIndex::new(&self.local_key.distance(&key)) else {
116 return KBucketEntry::LocalNode;
117 };
118
119 self.buckets[index.get()].entry(key)
120 }
121
122 pub fn add_known_peer(
132 &mut self,
133 peer: PeerId,
134 addresses: Vec<Multiaddr>,
135 connection: ConnectionType,
136 ) {
137 tracing::trace!(
138 target: LOG_TARGET,
139 ?peer,
140 ?addresses,
141 ?connection,
142 "add known peer"
143 );
144
145 let addresses: Vec<Multiaddr> = addresses
147 .into_iter()
148 .filter_map(|address| {
149 let last = address.iter().last();
150 if std::matches!(last, Some(Protocol::P2p(_))) {
151 Some(address)
152 } else {
153 Some(address.with(Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).ok()?)))
154 }
155 })
156 .collect();
157
158 if addresses.is_empty() {
159 tracing::debug!(
160 target: LOG_TARGET,
161 ?peer,
162 "tried to add zero addresses to the routing table"
163 );
164 return;
165 }
166
167 match self.entry(Key::from(peer)) {
168 KBucketEntry::Occupied(entry) => {
169 entry.addresses = addresses;
170 entry.connection = connection;
171 }
172 mut entry @ KBucketEntry::Vacant(_) => {
173 entry.insert(KademliaPeer::new(peer, addresses, connection));
174 }
175 KBucketEntry::LocalNode => tracing::warn!(
176 target: LOG_TARGET,
177 ?peer,
178 "tried to add local node to routing table",
179 ),
180 KBucketEntry::NoSlot => tracing::trace!(
181 target: LOG_TARGET,
182 ?peer,
183 "routing table full, cannot add new entry",
184 ),
185 }
186 }
187
188 pub fn closest<K: Clone>(&mut self, target: Key<K>, limit: usize) -> Vec<KademliaPeer> {
190 ClosestBucketsIter::new(self.local_key.distance(&target))
191 .flat_map(|index| self.buckets[index.get()].closest_iter(&target))
192 .take(limit)
193 .collect()
194 }
195}
196
197struct ClosestBucketsIter {
205 distance: Distance,
207 state: ClosestBucketsIterState,
209}
210
211enum ClosestBucketsIterState {
213 Start(BucketIndex),
216 ZoomIn(BucketIndex),
222 ZoomOut(BucketIndex),
228 Done,
230}
231
232impl ClosestBucketsIter {
233 fn new(distance: Distance) -> Self {
234 let state = match BucketIndex::new(&distance) {
235 Some(i) => ClosestBucketsIterState::Start(i),
236 None => ClosestBucketsIterState::Start(BucketIndex(0)),
237 };
238 Self { distance, state }
239 }
240
241 fn next_in(&self, i: BucketIndex) -> Option<BucketIndex> {
242 (0..i.get())
243 .rev()
244 .find_map(|i| self.distance.0.bit(i).then_some(BucketIndex(i)))
245 }
246
247 fn next_out(&self, i: BucketIndex) -> Option<BucketIndex> {
248 (i.get() + 1..NUM_BUCKETS).find_map(|i| (!self.distance.0.bit(i)).then_some(BucketIndex(i)))
249 }
250}
251
252impl Iterator for ClosestBucketsIter {
253 type Item = BucketIndex;
254
255 fn next(&mut self) -> Option<Self::Item> {
256 match self.state {
257 ClosestBucketsIterState::Start(i) => {
258 self.state = ClosestBucketsIterState::ZoomIn(i);
259 Some(i)
260 }
261 ClosestBucketsIterState::ZoomIn(i) =>
262 if let Some(i) = self.next_in(i) {
263 self.state = ClosestBucketsIterState::ZoomIn(i);
264 Some(i)
265 } else {
266 let i = BucketIndex(0);
267 self.state = ClosestBucketsIterState::ZoomOut(i);
268 Some(i)
269 },
270 ClosestBucketsIterState::ZoomOut(i) =>
271 if let Some(i) = self.next_out(i) {
272 self.state = ClosestBucketsIterState::ZoomOut(i);
273 Some(i)
274 } else {
275 self.state = ClosestBucketsIterState::Done;
276 None
277 },
278 ClosestBucketsIterState::Done => None,
279 }
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::protocol::libp2p::kademlia::types::ConnectionType;
287
288 #[test]
289 fn closest_peers() {
290 let own_peer_id = PeerId::random();
291 let own_key = Key::from(own_peer_id);
292 let mut table = RoutingTable::new(own_key.clone());
293
294 for _ in 0..60 {
295 let peer = PeerId::random();
296 let key = Key::from(peer);
297 let mut entry = table.entry(key.clone());
298 entry.insert(KademliaPeer::new(peer, vec![], ConnectionType::Connected));
299 }
300
301 let target = Key::from(PeerId::random());
302 let closest = table.closest(target.clone(), 60usize);
303 let mut prev = None;
304
305 for peer in &closest {
306 if let Some(value) = prev {
307 assert!(value < target.distance(&peer.key));
308 }
309
310 prev = Some(target.distance(&peer.key));
311 }
312 }
313
314 fn random_peer(
318 rng: &mut impl rand::Rng,
319 own_key: Key<PeerId>,
320 bucket_index: usize,
321 ) -> (Key<PeerId>, PeerId) {
322 let peer = PeerId::random();
323 let distance = BucketIndex(bucket_index).rand_distance(rng);
324 let key_bytes = own_key.for_distance(distance);
325
326 (Key::from_bytes(key_bytes, peer), peer)
327 }
328
329 #[test]
330 fn add_peer_to_empty_table() {
331 let own_peer_id = PeerId::random();
332 let own_key = Key::from(own_peer_id);
333 let mut table = RoutingTable::new(own_key.clone());
334
335 assert_eq!(table.entry(own_key), KBucketEntry::LocalNode);
337
338 let peer = PeerId::random();
339 let key = Key::from(peer);
340 let mut test = table.entry(key.clone());
341 let addresses = vec![];
342
343 assert!(std::matches!(test, KBucketEntry::Vacant(_)));
344 test.insert(KademliaPeer::new(
345 peer,
346 addresses.clone(),
347 ConnectionType::Connected,
348 ));
349
350 assert_eq!(
351 table.entry(key.clone()),
352 KBucketEntry::Occupied(&mut KademliaPeer::new(
353 peer,
354 addresses.clone(),
355 ConnectionType::Connected,
356 ))
357 );
358
359 match table.entry(key.clone()) {
360 KBucketEntry::Occupied(entry) => {
361 entry.connection = ConnectionType::NotConnected;
362 }
363 state => panic!("invalid state for `KBucketEntry`: {state:?}"),
364 }
365
366 assert_eq!(
367 table.entry(key.clone()),
368 KBucketEntry::Occupied(&mut KademliaPeer::new(
369 peer,
370 addresses,
371 ConnectionType::NotConnected,
372 ))
373 );
374 }
375
376 #[test]
377 fn full_k_bucket() {
378 let mut rng = rand::thread_rng();
379 let own_peer_id = PeerId::random();
380 let own_key = Key::from(own_peer_id);
381 let mut table = RoutingTable::new(own_key.clone());
382
383 for _ in 0..20 {
385 let (key, peer) = random_peer(&mut rng, own_key.clone(), 254);
386 let mut entry = table.entry(key.clone());
387
388 assert!(std::matches!(entry, KBucketEntry::Vacant(_)));
389 entry.insert(KademliaPeer::new(peer, vec![], ConnectionType::Connected));
390 }
391
392 let peer = PeerId::random();
395 let distance = BucketIndex(254).rand_distance(&mut rng);
396 let key_bytes = own_key.for_distance(distance);
397 let key = Key::from_bytes(key_bytes, peer);
398
399 let entry = table.entry(key.clone());
400 assert!(std::matches!(entry, KBucketEntry::NoSlot));
401 }
402
403 #[test]
404 #[ignore]
405 fn peer_disconnects_and_is_evicted() {
406 let mut rng = rand::thread_rng();
407 let own_peer_id = PeerId::random();
408 let own_key = Key::from(own_peer_id);
409 let mut table = RoutingTable::new(own_key.clone());
410
411 let peers = (0..20)
413 .map(|_| {
414 let (key, peer) = random_peer(&mut rng, own_key.clone(), 253);
415 let mut entry = table.entry(key.clone());
416
417 assert!(std::matches!(entry, KBucketEntry::Vacant(_)));
418 entry.insert(KademliaPeer::new(peer, vec![], ConnectionType::Connected));
419
420 (peer, key)
421 })
422 .collect::<Vec<_>>();
423
424 let peer = PeerId::random();
427 let distance = BucketIndex(253).rand_distance(&mut rng);
428 let key_bytes = own_key.for_distance(distance);
429 let key = Key::from_bytes(key_bytes, peer);
430
431 let entry = table.entry(key.clone());
432 assert!(std::matches!(entry, KBucketEntry::NoSlot));
433
434 match table.entry(peers[3].1.clone()) {
436 KBucketEntry::Occupied(entry) => {
437 entry.connection = ConnectionType::NotConnected;
438 }
439 _ => panic!("invalid state for node"),
440 }
441
442 let mut entry = table.entry(key.clone());
444 assert!(std::matches!(entry, KBucketEntry::Vacant(_)));
445 entry.insert(KademliaPeer::new(
446 peer,
447 vec!["/ip6/::1/tcp/8888".parse().unwrap()],
448 ConnectionType::CanConnect,
449 ));
450
451 let entry = table.entry(key.clone());
453 let addresses = vec!["/ip6/::1/tcp/8888".parse().unwrap()];
454 assert_eq!(
455 entry,
456 KBucketEntry::Occupied(&mut KademliaPeer::new(
457 peer,
458 addresses,
459 ConnectionType::CanConnect,
460 ))
461 );
462 }
463
464 #[test]
465 fn disconnected_peers_are_not_evicted_if_there_is_capacity() {
466 let mut rng = rand::thread_rng();
467 let own_peer_id = PeerId::random();
468 let own_key = Key::from(own_peer_id);
469 let mut table = RoutingTable::new(own_key.clone());
470
471 let _peers = (0..19)
473 .map(|_| {
474 let (key, peer) = random_peer(&mut rng, own_key.clone(), 252);
475 let mut entry = table.entry(key.clone());
476
477 assert!(std::matches!(entry, KBucketEntry::Vacant(_)));
478 entry.insert(KademliaPeer::new(
479 peer,
480 vec![],
481 ConnectionType::NotConnected,
482 ));
483
484 (peer, key)
485 })
486 .collect::<Vec<_>>();
487
488 let peer = PeerId::random();
491 let distance = BucketIndex(252).rand_distance(&mut rng);
492 let key_bytes = own_key.for_distance(distance);
493 let key = Key::from_bytes(key_bytes, peer);
494
495 let mut entry = table.entry(key.clone());
496 assert!(std::matches!(entry, KBucketEntry::Vacant(_)));
497 entry.insert(KademliaPeer::new(
498 peer,
499 vec!["/ip6/::1/tcp/8888".parse().unwrap()],
500 ConnectionType::CanConnect,
501 ));
502 }
503
504 #[test]
505 fn closest_buckets_iterator_set_lsb() {
506 let d = Distance(U256::from(0b10011011));
508 let mut iter = ClosestBucketsIter::new(d);
509 let expected_buckets = vec![7, 4, 3, 1, 0, 0, 2, 5, 6]
514 .into_iter()
515 .chain(8..=255)
516 .map(|i| BucketIndex(i));
517 for expected in expected_buckets {
518 let got = iter.next().unwrap();
519 assert_eq!(got, expected);
520 }
521 assert!(iter.next().is_none());
522 }
523
524 #[test]
525 fn closest_buckets_iterator_unset_lsb() {
526 let d = Distance(U256::from(0b01011010));
528 let mut iter = ClosestBucketsIter::new(d);
529 let expected_buckets =
530 vec![6, 4, 3, 1, 0, 2, 5, 7].into_iter().chain(8..=255).map(|i| BucketIndex(i));
531 for expected in expected_buckets {
532 let got = iter.next().unwrap();
533 assert_eq!(got, expected);
534 }
535 assert!(iter.next().is_none());
536 }
537}