1mod bucket;
70mod entry;
71#[allow(clippy::ptr_offset_with_cast)]
72#[allow(clippy::assign_op_pattern)]
73mod key;
74
75pub use bucket::NodeStatus;
76pub use entry::*;
77
78use arrayvec::{self, ArrayVec};
79use bucket::KBucket;
80use std::collections::VecDeque;
81use std::time::{Duration, Instant};
82
/// Number of buckets in a `KBucketsTable`.
///
/// There is one bucket per value a `u8` can take (256), which is why the
/// highest valid `BucketIndex` equals `u8::MAX`.
const NUM_BUCKETS: usize = u8::MAX as usize + 1;
85
86#[derive(Debug, Clone)]
88pub(crate) struct KBucketsTable<TKey, TVal> {
89 local_key: TKey,
91 buckets: Vec<KBucket<TKey, TVal>>,
93 applied_pending: VecDeque<AppliedPending<TKey, TVal>>,
96}
97
/// Position of a bucket inside a `KBucketsTable`.
///
/// The wrapped value is used directly to index the table's bucket vector.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct BucketIndex(usize);
102
103impl BucketIndex {
104 fn new(d: &Distance) -> Option<BucketIndex> {
112 d.ilog2().map(|i| BucketIndex(i as usize))
113 }
114
115 fn get(&self) -> usize {
117 self.0
118 }
119
120 fn range(&self) -> (Distance, Distance) {
123 let min = Distance(U256::pow(U256::from(2), U256::from(self.0)));
124 if self.0 == usize::from(u8::MAX) {
125 (min, Distance(U256::MAX))
126 } else {
127 let max = Distance(U256::pow(U256::from(2), U256::from(self.0 + 1)) - 1);
128 (min, max)
129 }
130 }
131
132 fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
134 let mut bytes = [0u8; 32];
135 let quot = self.0 / 8;
136 for i in 0..quot {
137 bytes[31 - i] = rng.gen();
138 }
139 let rem = (self.0 % 8) as u32;
140 let lower = usize::pow(2, rem);
141 let upper = usize::pow(2, rem + 1);
142 bytes[31 - quot] = rng.gen_range(lower..upper) as u8;
143 Distance(U256::from(bytes))
144 }
145}
146
147impl<TKey, TVal> KBucketsTable<TKey, TVal>
148where
149 TKey: Clone + AsRef<KeyBytes>,
150 TVal: Clone,
151{
152 pub(crate) fn new(local_key: TKey, pending_timeout: Duration) -> Self {
159 KBucketsTable {
160 local_key,
161 buckets: (0..NUM_BUCKETS)
162 .map(|_| KBucket::new(pending_timeout))
163 .collect(),
164 applied_pending: VecDeque::new(),
165 }
166 }
167
168 pub(crate) fn local_key(&self) -> &TKey {
170 &self.local_key
171 }
172
173 pub(crate) fn entry<'a>(&'a mut self, key: &'a TKey) -> Entry<'a, TKey, TVal> {
176 let index = BucketIndex::new(&self.local_key.as_ref().distance(key));
177 if let Some(i) = index {
178 let bucket = &mut self.buckets[i.get()];
179 if let Some(applied) = bucket.apply_pending() {
180 self.applied_pending.push_back(applied)
181 }
182 Entry::new(bucket, key)
183 } else {
184 Entry::SelfEntry
185 }
186 }
187
188 pub(crate) fn iter(&mut self) -> impl Iterator<Item = KBucketRef<'_, TKey, TVal>> + '_ {
193 let applied_pending = &mut self.applied_pending;
194 self.buckets.iter_mut().enumerate().map(move |(i, b)| {
195 if let Some(applied) = b.apply_pending() {
196 applied_pending.push_back(applied)
197 }
198 KBucketRef {
199 index: BucketIndex(i),
200 bucket: b,
201 }
202 })
203 }
204
205 pub(crate) fn bucket<K>(&mut self, key: &K) -> Option<KBucketRef<'_, TKey, TVal>>
209 where
210 K: AsRef<KeyBytes>,
211 {
212 let d = self.local_key.as_ref().distance(key);
213 if let Some(index) = BucketIndex::new(&d) {
214 let bucket = &mut self.buckets[index.0];
215 if let Some(applied) = bucket.apply_pending() {
216 self.applied_pending.push_back(applied)
217 }
218 Some(KBucketRef { bucket, index })
219 } else {
220 None
221 }
222 }
223
224 pub(crate) fn take_applied_pending(&mut self) -> Option<AppliedPending<TKey, TVal>> {
237 self.applied_pending.pop_front()
238 }
239
240 pub(crate) fn closest_keys<'a, T>(
243 &'a mut self,
244 target: &'a T,
245 ) -> impl Iterator<Item = TKey> + 'a
246 where
247 T: AsRef<KeyBytes>,
248 {
249 let distance = self.local_key.as_ref().distance(target);
250 ClosestIter {
251 target,
252 iter: None,
253 table: self,
254 buckets_iter: ClosestBucketsIter::new(distance),
255 fmap: |b: &KBucket<TKey, _>| -> ArrayVec<_, { K_VALUE.get() }> {
256 b.iter().map(|(n, _)| n.key.clone()).collect()
257 },
258 }
259 }
260
261 pub(crate) fn closest<'a, T>(
264 &'a mut self,
265 target: &'a T,
266 ) -> impl Iterator<Item = EntryView<TKey, TVal>> + 'a
267 where
268 T: Clone + AsRef<KeyBytes>,
269 TVal: Clone,
270 {
271 let distance = self.local_key.as_ref().distance(target);
272 ClosestIter {
273 target,
274 iter: None,
275 table: self,
276 buckets_iter: ClosestBucketsIter::new(distance),
277 fmap: |b: &KBucket<_, TVal>| -> ArrayVec<_, { K_VALUE.get() }> {
278 b.iter()
279 .map(|(n, status)| EntryView {
280 node: n.clone(),
281 status,
282 })
283 .collect()
284 },
285 }
286 }
287
288 pub(crate) fn count_nodes_between<T>(&mut self, target: &T) -> usize
294 where
295 T: AsRef<KeyBytes>,
296 {
297 let local_key = self.local_key.clone();
298 let distance = target.as_ref().distance(&local_key);
299 let mut iter = ClosestBucketsIter::new(distance).take_while(|i| i.get() != 0);
300 if let Some(i) = iter.next() {
301 let num_first = self.buckets[i.get()]
302 .iter()
303 .filter(|(n, _)| n.key.as_ref().distance(&local_key) <= distance)
304 .count();
305 let num_rest: usize = iter.map(|i| self.buckets[i.get()].num_entries()).sum();
306 num_first + num_rest
307 } else {
308 0
309 }
310 }
311}
312
313struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
316 target: &'a TTarget,
321 table: &'a mut KBucketsTable<TKey, TVal>,
323 buckets_iter: ClosestBucketsIter,
326 iter: Option<arrayvec::IntoIter<TOut, { K_VALUE.get() }>>,
328 fmap: TMap,
331}
332
333struct ClosestBucketsIter {
337 distance: Distance,
339 state: ClosestBucketsIterState,
341}
342
343enum ClosestBucketsIterState {
345 Start(BucketIndex),
348 ZoomIn(BucketIndex),
354 ZoomOut(BucketIndex),
360 Done,
362}
363
364impl ClosestBucketsIter {
365 fn new(distance: Distance) -> Self {
366 let state = match BucketIndex::new(&distance) {
367 Some(i) => ClosestBucketsIterState::Start(i),
368 None => ClosestBucketsIterState::Start(BucketIndex(0)),
369 };
370 Self { distance, state }
371 }
372
373 fn next_in(&self, i: BucketIndex) -> Option<BucketIndex> {
374 (0..i.get()).rev().find_map(|i| {
375 if self.distance.0.bit(i) {
376 Some(BucketIndex(i))
377 } else {
378 None
379 }
380 })
381 }
382
383 fn next_out(&self, i: BucketIndex) -> Option<BucketIndex> {
384 (i.get() + 1..NUM_BUCKETS).find_map(|i| {
385 if !self.distance.0.bit(i) {
386 Some(BucketIndex(i))
387 } else {
388 None
389 }
390 })
391 }
392}
393
394impl Iterator for ClosestBucketsIter {
395 type Item = BucketIndex;
396
397 fn next(&mut self) -> Option<Self::Item> {
398 match self.state {
399 ClosestBucketsIterState::Start(i) => {
400 self.state = ClosestBucketsIterState::ZoomIn(i);
401 Some(i)
402 }
403 ClosestBucketsIterState::ZoomIn(i) => {
404 if let Some(i) = self.next_in(i) {
405 self.state = ClosestBucketsIterState::ZoomIn(i);
406 Some(i)
407 } else {
408 let i = BucketIndex(0);
409 self.state = ClosestBucketsIterState::ZoomOut(i);
410 Some(i)
411 }
412 }
413 ClosestBucketsIterState::ZoomOut(i) => {
414 if let Some(i) = self.next_out(i) {
415 self.state = ClosestBucketsIterState::ZoomOut(i);
416 Some(i)
417 } else {
418 self.state = ClosestBucketsIterState::Done;
419 None
420 }
421 }
422 ClosestBucketsIterState::Done => None,
423 }
424 }
425}
426
427impl<TTarget, TKey, TVal, TMap, TOut> Iterator for ClosestIter<'_, TTarget, TKey, TVal, TMap, TOut>
428where
429 TTarget: AsRef<KeyBytes>,
430 TKey: Clone + AsRef<KeyBytes>,
431 TVal: Clone,
432 TMap: Fn(&KBucket<TKey, TVal>) -> ArrayVec<TOut, { K_VALUE.get() }>,
433 TOut: AsRef<KeyBytes>,
434{
435 type Item = TOut;
436
437 fn next(&mut self) -> Option<Self::Item> {
438 loop {
439 match &mut self.iter {
440 Some(iter) => match iter.next() {
441 Some(k) => return Some(k),
442 None => self.iter = None,
443 },
444 None => {
445 if let Some(i) = self.buckets_iter.next() {
446 let bucket = &mut self.table.buckets[i.get()];
447 if let Some(applied) = bucket.apply_pending() {
448 self.table.applied_pending.push_back(applied)
449 }
450 let mut v = (self.fmap)(bucket);
451 v.sort_by(|a, b| {
452 self.target
453 .as_ref()
454 .distance(a.as_ref())
455 .cmp(&self.target.as_ref().distance(b.as_ref()))
456 });
457 self.iter = Some(v.into_iter());
458 } else {
459 return None;
460 }
461 }
462 }
463 }
464 }
465}
466
467pub struct KBucketRef<'a, TKey, TVal> {
469 index: BucketIndex,
470 bucket: &'a mut KBucket<TKey, TVal>,
471}
472
473impl<'a, TKey, TVal> KBucketRef<'a, TKey, TVal>
474where
475 TKey: Clone + AsRef<KeyBytes>,
476 TVal: Clone,
477{
478 pub fn range(&self) -> (Distance, Distance) {
481 self.index.range()
482 }
483
484 pub fn is_empty(&self) -> bool {
486 self.num_entries() == 0
487 }
488
489 pub fn num_entries(&self) -> usize {
491 self.bucket.num_entries()
492 }
493
494 pub fn has_pending(&self) -> bool {
496 self.bucket.pending().map_or(false, |n| !n.is_ready())
497 }
498
499 pub fn contains(&self, d: &Distance) -> bool {
501 BucketIndex::new(d).map_or(false, |i| i == self.index)
502 }
503
504 pub fn rand_distance(&self, rng: &mut impl rand::Rng) -> Distance {
511 self.index.rand_distance(rng)
512 }
513
514 pub fn iter(&'a self) -> impl Iterator<Item = EntryRefView<'a, TKey, TVal>> {
516 self.bucket.iter().map(move |(n, status)| EntryRefView {
517 node: NodeRefView {
518 key: &n.key,
519 value: &n.value,
520 },
521 status,
522 })
523 }
524}
525
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p_identity::PeerId;
    use quickcheck::*;

    type TestTable = KBucketsTable<KeyBytes, ()>;

    impl Arbitrary for TestTable {
        /// Builds a table holding fewer than 100 nodes in total, filled from
        /// the highest bucket downwards with at most `K_VALUE` nodes each.
        fn arbitrary(g: &mut Gen) -> TestTable {
            let local_key = Key::from(PeerId::random());
            let timeout = Duration::from_secs(g.gen_range(1..360));
            let mut table = TestTable::new(local_key.clone().into(), timeout);
            let mut num_total = g.gen_range(0..100);
            for (i, b) in table.buckets.iter_mut().enumerate().rev() {
                let ix = BucketIndex(i);
                let num = g.gen_range(0..usize::min(K_VALUE.get(), num_total) + 1);
                num_total -= num;
                for _ in 0..num {
                    // Pick a key whose distance to the local key lands in
                    // bucket `ix`.
                    let distance = ix.rand_distance(&mut rand::thread_rng());
                    let key = local_key.for_distance(distance);
                    let node = Node {
                        key: key.clone(),
                        value: (),
                    };
                    let status = NodeStatus::arbitrary(g);
                    match b.insert(node, status) {
                        InsertResult::Inserted => {}
                        _ => panic!(),
                    }
                }
            }
            table
        }
    }

    /// Consecutive bucket ranges must touch without gaps or overlap and
    /// together cover `1..=U256::MAX`.
    #[test]
    fn buckets_are_non_overlapping_and_exhaustive() {
        let local_key = Key::from(PeerId::random());
        let timeout = Duration::from_secs(0);
        let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), timeout);

        let mut prev_max = U256::from(0);

        for bucket in table.iter() {
            let (min, max) = bucket.range();
            assert_eq!(Distance(prev_max + U256::from(1)), min);
            prev_max = max.0;
        }

        assert_eq!(U256::MAX, prev_max);
    }

    /// A bucket contains both ends of its range and nothing just outside it.
    #[test]
    fn bucket_contains_range() {
        fn prop(ix: u8) {
            let index = BucketIndex(ix as usize);
            let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(0));
            let bucket_ref = KBucketRef {
                index,
                bucket: &mut bucket,
            };

            let (min, max) = bucket_ref.range();

            assert!(min <= max);

            assert!(bucket_ref.contains(&min));
            assert!(bucket_ref.contains(&max));

            if min != Distance(0.into()) {
                assert!(!bucket_ref.contains(&Distance(min.0 - 1)));
            }

            if max != Distance(U256::MAX) {
                assert!(!bucket_ref.contains(&Distance(max.0 + 1)));
            }
        }

        quickcheck(prop as fn(_));
    }

    /// A random distance drawn for bucket `ix` lies in `[2^ix, 2^(ix+1) - 1]`.
    #[test]
    fn rand_distance() {
        fn prop(ix: u8) -> bool {
            let d = BucketIndex(ix as usize).rand_distance(&mut rand::thread_rng());
            let n = U256::from(<[u8; 32]>::from(d.0));
            let b = U256::from(2);
            let e = U256::from(ix);
            let lower = b.pow(e);
            // For the last bucket `2^256` overflows; its upper bound is
            // `U256::MAX` itself (matching `BucketIndex::range`), so the
            // `- 1` must only be applied when the power did not overflow.
            let upper = b
                .checked_pow(e + U256::from(1))
                .map_or(U256::MAX, |p| p - U256::from(1));
            lower <= n && n <= upper
        }
        quickcheck(prop as fn(_) -> _);
    }

    /// Inserting through an absent entry makes the key show up in
    /// `closest_keys`.
    #[test]
    fn entry_inserted() {
        let local_key = Key::from(PeerId::random());
        let other_id = Key::from(PeerId::random());

        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
        if let Entry::Absent(entry) = table.entry(&other_id) {
            match entry.insert((), NodeStatus::Connected) {
                InsertResult::Inserted => (),
                _ => panic!(),
            }
        } else {
            panic!()
        }

        let res = table.closest_keys(&other_id).collect::<Vec<_>>();
        assert_eq!(res.len(), 1);
        assert_eq!(res[0], other_id);
    }

    /// Looking up the local key yields `Entry::SelfEntry`.
    #[test]
    fn entry_self() {
        let local_key = Key::from(PeerId::random());
        let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_secs(5));
        match table.entry(&local_key) {
            Entry::SelfEntry => (),
            _ => panic!(),
        }
    }

    /// `closest_keys` returns every stored key, sorted by distance to the
    /// target.
    #[test]
    fn closest() {
        let local_key = Key::from(PeerId::random());
        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
        let mut count = 0;
        // Keep drawing random keys until 100 of them were actually inserted;
        // keys that hit a full bucket are skipped.
        while count < 100 {
            let key = Key::from(PeerId::random());
            if let Entry::Absent(e) = table.entry(&key) {
                match e.insert((), NodeStatus::Connected) {
                    InsertResult::Inserted => count += 1,
                    _ => continue,
                }
            } else {
                panic!("entry exists")
            }
        }

        let mut expected_keys: Vec<_> = table
            .buckets
            .iter()
            .flat_map(|t| t.iter().map(|(n, _)| n.key.clone()))
            .collect();

        for _ in 0..10 {
            let target_key = Key::from(PeerId::random());
            let keys = table.closest_keys(&target_key).collect::<Vec<_>>();
            expected_keys.sort_by_key(|k| k.distance(&target_key));
            assert_eq!(keys, expected_keys);
        }
    }

    /// A pending entry whose timeout has elapsed replaces the disconnected
    /// node on the next table access and is reported exactly once through
    /// `take_applied_pending`.
    #[test]
    fn applied_pending() {
        let local_key = Key::from(PeerId::random());
        let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_millis(1));
        let expected_applied;
        let full_bucket_index;
        // Insert disconnected nodes until some bucket is full, then insert
        // the same key as connected so that it becomes that bucket's
        // pending entry.
        loop {
            let key = Key::from(PeerId::random());
            if let Entry::Absent(e) = table.entry(&key) {
                match e.insert((), NodeStatus::Disconnected) {
                    InsertResult::Full => {
                        if let Entry::Absent(e) = table.entry(&key) {
                            match e.insert((), NodeStatus::Connected) {
                                InsertResult::Pending { disconnected } => {
                                    expected_applied = AppliedPending {
                                        inserted: Node {
                                            key: key.clone(),
                                            value: (),
                                        },
                                        evicted: Some(Node {
                                            key: disconnected,
                                            value: (),
                                        }),
                                    };
                                    full_bucket_index = BucketIndex::new(&key.distance(&local_key));
                                    break;
                                }
                                _ => panic!(),
                            }
                        } else {
                            panic!()
                        }
                    }
                    _ => continue,
                }
            } else {
                panic!("entry exists")
            }
        }

        // Expire the pending entry by moving its ready time into the past.
        let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()];
        let elapsed = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        full_bucket.pending_mut().unwrap().set_ready_at(elapsed);

        match table.entry(&expected_applied.inserted.key) {
            Entry::Present(_, NodeStatus::Connected) => {}
            x => panic!("Unexpected entry: {x:?}"),
        }

        match table.entry(&expected_applied.evicted.as_ref().unwrap().key) {
            Entry::Absent(_) => {}
            x => panic!("Unexpected entry: {x:?}"),
        }

        assert_eq!(Some(expected_applied), table.take_applied_pending());
        assert_eq!(None, table.take_applied_pending());
    }

    /// Flipping a single bit of the target's distance moves the target
    /// closer to (bit was set) or further from (bit was unset) the local
    /// key, and `count_nodes_between` must move in the same direction.
    #[test]
    fn count_nodes_between() {
        fn prop(mut table: TestTable, target: Key<PeerId>) -> bool {
            let num_to_target = table.count_nodes_between(&target);
            let distance = table.local_key.distance(&target);
            let base2 = U256::from(2);
            let mut iter = ClosestBucketsIter::new(distance);
            iter.all(|i| {
                // Flip bit `i` of the distance.
                let d = Distance(distance.0 ^ (base2.pow(U256::from(i.get()))));
                let k = table.local_key.for_distance(d);
                if distance.0.bit(i.get()) {
                    d < distance && table.count_nodes_between(&k) <= num_to_target
                } else {
                    d > distance && table.count_nodes_between(&k) >= num_to_target
                }
            })
        }

        QuickCheck::new()
            .tests(10)
            .quickcheck(prop as fn(_, _) -> _)
    }
}