1use super::*;
22use crate::kbucket::{Key, KeyBytes};
23use instant::Instant;
24use libp2p_identity::PeerId;
25use std::{
26 collections::HashMap,
27 iter::{Cycle, Map, Peekable},
28 ops::{Index, IndexMut, Range},
29};
30
/// Composite iterator that drives several [`ClosestPeersIter`] instances in
/// round-robin order and makes sure every peer is handed out for contacting by
/// at most one of them.
pub(crate) struct ClosestDisjointPeersIter {
    /// Key the lookup is aimed at; `into_result` uses it to order the merged
    /// per-iterator results.
    target: KeyBytes,

    /// The wrapped iterators; `with_config` creates `config.parallelism` of them.
    iters: Vec<ClosestPeersIter>,
    /// Endless cycle over the indices of `iters`. Its position persists between
    /// calls to `next`, so polling resumes after the last iterator visited.
    iter_order: Cycle<Map<Range<usize>, fn(usize) -> IteratorIndex>>,

    /// Every peer returned by `next` so far, together with the index of the
    /// iterator that first yielded it and the outcome recorded for it.
    contacted_peers: HashMap<PeerId, PeerState>,
}
50
51impl ClosestDisjointPeersIter {
52 #[cfg(test)]
54 pub(crate) fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
55 where
56 I: IntoIterator<Item = Key<PeerId>>,
57 {
58 Self::with_config(
59 ClosestPeersIterConfig::default(),
60 target,
61 known_closest_peers,
62 )
63 }
64
65 pub(crate) fn with_config<I, T>(
67 config: ClosestPeersIterConfig,
68 target: T,
69 known_closest_peers: I,
70 ) -> Self
71 where
72 I: IntoIterator<Item = Key<PeerId>>,
73 T: Into<KeyBytes> + Clone,
74 {
75 let peers = known_closest_peers
76 .into_iter()
77 .take(K_VALUE.get())
78 .collect::<Vec<_>>();
79 let iters = (0..config.parallelism.get())
80 .map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone()))
86 .collect::<Vec<_>>();
87
88 let iters_len = iters.len();
89
90 ClosestDisjointPeersIter {
91 target: target.into(),
92 iters,
93 iter_order: (0..iters_len)
94 .map(IteratorIndex as fn(usize) -> IteratorIndex)
95 .cycle(),
96 contacted_peers: HashMap::new(),
97 }
98 }
99
100 pub(crate) fn on_failure(&mut self, peer: &PeerId) -> bool {
111 let mut updated = false;
112
113 if let Some(PeerState {
114 initiated_by,
115 response,
116 }) = self.contacted_peers.get_mut(peer)
117 {
118 updated = self.iters[*initiated_by].on_failure(peer);
119
120 if updated {
121 *response = ResponseState::Failed;
122 }
123
124 for (i, iter) in &mut self.iters.iter_mut().enumerate() {
125 if IteratorIndex(i) != *initiated_by {
126 iter.on_failure(peer);
129 }
130 }
131 }
132
133 updated
134 }
135
136 pub(crate) fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
154 where
155 I: IntoIterator<Item = PeerId>,
156 {
157 let mut updated = false;
158
159 if let Some(PeerState {
160 initiated_by,
161 response,
162 }) = self.contacted_peers.get_mut(peer)
163 {
164 updated = self.iters[*initiated_by].on_success(peer, closer_peers);
167
168 if updated {
169 *response = ResponseState::Succeeded;
174 }
175
176 for (i, iter) in &mut self.iters.iter_mut().enumerate() {
177 if IteratorIndex(i) != *initiated_by {
178 iter.on_success(peer, std::iter::empty());
185 }
186 }
187 }
188
189 updated
190 }
191
    /// Advances the composite iterator and tells the caller what to do next.
    ///
    /// Each wrapped iterator is visited at most once per call, continuing the
    /// round-robin order where the previous call stopped. As soon as one of
    /// them yields a peer that has not been handed out before, that peer is
    /// recorded in `contacted_peers` and returned immediately as
    /// `Waiting(Some(_))`.
    ///
    /// If no new peer is found the result is:
    /// * `Waiting(None)` if at least one wrapped iterator reported it,
    /// * otherwise `WaitingAtCapacity` if at least one reported that,
    /// * otherwise `Finished`.
    pub(crate) fn next(&mut self, now: Instant) -> PeersIterState<'_> {
        // Combined state of the iterators visited so far; `None` until one of
        // them reports `Waiting(None)` or `WaitingAtCapacity`.
        let mut state = None;

        // Visit every wrapped iterator at most once.
        for _ in 0..self.iters.len() {
            let i = self.iter_order.next().expect("Cycle never ends.");
            let iter = &mut self.iters[i];

            // Keep asking the same iterator while it yields peers that were
            // already handed out by another iterator.
            loop {
                match iter.next(now) {
                    PeersIterState::Waiting(None) => {
                        match state {
                            Some(PeersIterState::Waiting(Some(_))) => {
                                // A yielded peer is returned right away below
                                // and never stored in `state`.
                                unreachable!();
                            }
                            Some(PeersIterState::Waiting(None)) => {}
                            Some(PeersIterState::WaitingAtCapacity) => {
                                // `Waiting(None)` takes precedence over
                                // `WaitingAtCapacity` in the combined state.
                                state = Some(PeersIterState::Waiting(None))
                            }
                            Some(PeersIterState::Finished) => {
                                // `state` is never set to `Finished`.
                                unreachable!();
                            }
                            None => state = Some(PeersIterState::Waiting(None)),
                        };

                        break;
                    }
                    PeersIterState::Waiting(Some(peer)) => {
                        match self.contacted_peers.get_mut(&*peer) {
                            Some(PeerState { response, .. }) => {
                                // The peer was already handed out earlier.
                                let peer = peer.into_owned();

                                match response {
                                    // No outcome recorded yet; `on_success` /
                                    // `on_failure` notify every iterator once
                                    // it arrives.
                                    ResponseState::Waiting => {}
                                    ResponseState::Succeeded => {
                                        // Replay the success, but without any
                                        // closer peers.
                                        iter.on_success(&peer, std::iter::empty());
                                    }
                                    ResponseState::Failed => {
                                        iter.on_failure(&peer);
                                    }
                                }
                            }
                            None => {
                                // First time this peer is yielded: remember
                                // which iterator produced it and hand it out.
                                self.contacted_peers
                                    .insert(peer.clone().into_owned(), PeerState::new(i));
                                return PeersIterState::Waiting(Some(Cow::Owned(
                                    peer.into_owned(),
                                )));
                            }
                        }
                    }
                    PeersIterState::WaitingAtCapacity => {
                        match state {
                            Some(PeersIterState::Waiting(Some(_))) => {
                                // A yielded peer is returned right away above
                                // and never stored in `state`.
                                unreachable!();
                            }
                            Some(PeersIterState::Waiting(None)) => {}
                            Some(PeersIterState::WaitingAtCapacity) => {}
                            Some(PeersIterState::Finished) => {
                                // `state` is never set to `Finished`.
                                unreachable!();
                            }
                            None => state = Some(PeersIterState::WaitingAtCapacity),
                        };

                        break;
                    }
                    // A finished iterator contributes nothing to `state`.
                    PeersIterState::Finished => break,
                }
            }
        }

        // `state` is still `None` only if every wrapped iterator was finished.
        state.unwrap_or(PeersIterState::Finished)
    }
284
285 pub(crate) fn finish_paths<'a, I>(&mut self, peers: I) -> bool
289 where
290 I: IntoIterator<Item = &'a PeerId>,
291 {
292 for peer in peers {
293 if let Some(PeerState { initiated_by, .. }) = self.contacted_peers.get_mut(peer) {
294 self.iters[*initiated_by].finish();
295 }
296 }
297
298 self.is_finished()
299 }
300
301 pub(crate) fn finish(&mut self) {
303 for iter in &mut self.iters {
304 iter.finish();
305 }
306 }
307
308 pub(crate) fn is_finished(&self) -> bool {
310 self.iters.iter().all(|i| i.is_finished())
311 }
312
313 pub(crate) fn into_result(self) -> impl Iterator<Item = PeerId> {
322 let result_per_path = self
323 .iters
324 .into_iter()
325 .map(|iter| iter.into_result().map(Key::from));
326
327 ResultIter::new(self.target, result_per_path).map(Key::into_preimage)
328 }
329}
330
/// Position of a [`ClosestPeersIter`] within `ClosestDisjointPeersIter::iters`.
///
/// A dedicated newtype so that such positions are not mixed up with other
/// `usize` values; `Vec<ClosestPeersIter>` can be indexed with it directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IteratorIndex(usize);
334
335impl Index<IteratorIndex> for Vec<ClosestPeersIter> {
336 type Output = ClosestPeersIter;
337
338 fn index(&self, index: IteratorIndex) -> &Self::Output {
339 &self[index.0]
340 }
341}
342
343impl IndexMut<IteratorIndex> for Vec<ClosestPeersIter> {
344 fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output {
345 &mut self[index.0]
346 }
347}
348
/// Bookkeeping for a peer that `ClosestDisjointPeersIter::next` has handed out.
#[derive(Debug, PartialEq, Eq)]
struct PeerState {
    /// Index of the wrapped iterator that first yielded the peer. Only this
    /// iterator is given the closer peers of a successful response.
    initiated_by: IteratorIndex,
    /// Outcome recorded for the peer so far.
    response: ResponseState,
}
360
361impl PeerState {
362 fn new(initiated_by: IteratorIndex) -> Self {
363 PeerState {
364 initiated_by,
365 response: ResponseState::Waiting,
366 }
367 }
368}
369
/// Outcome recorded for a contacted peer.
#[derive(Debug, PartialEq, Eq)]
enum ResponseState {
    /// No outcome has been recorded yet (initial state).
    Waiting,
    /// Set by `on_success` when the initiating iterator accepted the success.
    Succeeded,
    /// Set by `on_failure` when the initiating iterator accepted the failure.
    Failed,
}
376
/// Merges several streams of keys into a single stream that yields, at each
/// step, the head closest to `target` and skips keys found at the head of more
/// than one stream.
///
/// The tests in this file feed it streams that are each sorted by distance to
/// `target`; the implementation itself only ever compares stream heads.
#[derive(Clone, Debug)]
struct ResultIter<I>
where
    I: Iterator<Item = Key<PeerId>>,
{
    /// Key that distances are measured against.
    target: KeyBytes,
    /// The input streams, peekable so that their heads can be compared.
    iters: Vec<Peekable<I>>,
}
389
390impl<I: Iterator<Item = Key<PeerId>>> ResultIter<I> {
391 fn new(target: KeyBytes, iters: impl Iterator<Item = I>) -> Self {
392 ResultIter {
393 target,
394 iters: iters.map(Iterator::peekable).collect(),
395 }
396 }
397}
398
399impl<I: Iterator<Item = Key<PeerId>>> Iterator for ResultIter<I> {
400 type Item = I::Item;
401
402 fn next(&mut self) -> Option<Self::Item> {
403 let target = &self.target;
404
405 self.iters
406 .iter_mut()
407 .fold(Option::<&mut Peekable<_>>::None, |iter_a, iter_b| {
409 let iter_a = match iter_a {
410 Some(iter_a) => iter_a,
411 None => return Some(iter_b),
412 };
413
414 match (iter_a.peek(), iter_b.peek()) {
415 (Some(next_a), Some(next_b)) => {
416 if next_a == next_b {
417 iter_b.next();
419 return Some(iter_a);
420 }
421
422 if target.distance(next_a) < target.distance(next_b) {
423 Some(iter_a)
424 } else {
425 Some(iter_b)
426 }
427 }
428 (Some(_), None) => Some(iter_a),
429 (None, Some(_)) => Some(iter_b),
430 (None, None) => None,
431 }
432 })
433 .and_then(Iterator::next)
435 }
436}
437
438#[cfg(test)]
439mod tests {
440 use super::*;
441
442 use crate::{K_VALUE, SHA_256_MH};
443 use libp2p_core::multihash::Multihash;
444 use quickcheck::*;
445 use std::collections::HashSet;
446 use std::iter;
447
448 impl Arbitrary for ResultIter<std::vec::IntoIter<Key<PeerId>>> {
449 fn arbitrary(g: &mut Gen) -> Self {
450 let target = Target::arbitrary(g).0;
451 let num_closest_iters = g.gen_range(0..20 + 1);
452 let peers = random_peers(g.gen_range(0..20 * num_closest_iters + 1), g);
453
454 let iters = (0..num_closest_iters).map(|_| {
455 let num_peers = g.gen_range(0..20 + 1);
456 let mut peers = g
457 .choose_multiple(&peers, num_peers)
458 .cloned()
459 .map(Key::from)
460 .collect::<Vec<_>>();
461
462 peers.sort_unstable_by_key(|a| target.distance(a));
463
464 peers.into_iter()
465 });
466
467 ResultIter::new(target.clone(), iters)
468 }
469
470 fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
471 let peers = self
472 .iters
473 .clone()
474 .into_iter()
475 .flatten()
476 .collect::<HashSet<_>>()
477 .into_iter()
478 .collect::<Vec<_>>();
479
480 let iters = self
481 .iters
482 .clone()
483 .into_iter()
484 .map(|iter| iter.collect::<Vec<_>>())
485 .collect();
486
487 Box::new(ResultIterShrinker {
488 target: self.target.clone(),
489 peers,
490 iters,
491 })
492 }
493 }
494
    /// Shrinking iterator for [`ResultIter`] test inputs.
    struct ResultIterShrinker {
        /// Target handed on unchanged to every shrunk value.
        target: KeyBytes,
        /// Distinct keys still to be removed; one is popped per shrink step.
        peers: Vec<Key<PeerId>>,
        /// The streams of the value being shrunk.
        iters: Vec<Vec<Key<PeerId>>>,
    }
500
501 impl Iterator for ResultIterShrinker {
502 type Item = ResultIter<std::vec::IntoIter<Key<PeerId>>>;
503
504 fn next(&mut self) -> Option<Self::Item> {
507 let peer = self.peers.pop()?;
509
510 let iters = self.iters.clone().into_iter().filter_map(|mut iter| {
511 iter.retain(|p| p != &peer);
512 if iter.is_empty() {
513 return None;
514 }
515 Some(iter.into_iter())
516 });
517
518 Some(ResultIter::new(self.target.clone(), iters))
519 }
520 }
521
    /// Newtype around [`PeerId`] so that an `Arbitrary` implementation can be
    /// provided for it in this test module.
    #[derive(Clone, Debug)]
    struct ArbitraryPeerId(PeerId);
524
525 impl Arbitrary for ArbitraryPeerId {
526 fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
527 let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
528 let peer_id =
529 PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
530 ArbitraryPeerId(peer_id)
531 }
532 }
533
    /// Randomly generated lookup target for property tests.
    #[derive(Clone, Debug)]
    struct Target(KeyBytes);
536
537 impl Arbitrary for Target {
538 fn arbitrary(g: &mut Gen) -> Self {
539 let peer_id = ArbitraryPeerId::arbitrary(g).0;
540 Target(Key::from(peer_id).into())
541 }
542 }
543
544 fn random_peers(n: usize, g: &mut Gen) -> Vec<PeerId> {
545 (0..n).map(|_| ArbitraryPeerId::arbitrary(g).0).collect()
546 }
547
548 #[test]
549 fn result_iter_returns_deduplicated_ordered_peer_id_stream() {
550 fn prop(result_iter: ResultIter<std::vec::IntoIter<Key<PeerId>>>) {
551 let expected = {
552 let mut deduplicated = result_iter
553 .clone()
554 .iters
555 .into_iter()
556 .flatten()
557 .collect::<HashSet<_>>()
558 .into_iter()
559 .map(Key::from)
560 .collect::<Vec<_>>();
561
562 deduplicated.sort_unstable_by(|a, b| {
563 result_iter
564 .target
565 .distance(a)
566 .cmp(&result_iter.target.distance(b))
567 });
568
569 deduplicated
570 };
571
572 assert_eq!(expected, result_iter.collect::<Vec<_>>());
573 }
574
575 QuickCheck::new().quickcheck(prop as fn(_))
576 }
577
    /// Randomly generated value for `ClosestPeersIterConfig::parallelism`.
    #[derive(Debug, Clone)]
    struct Parallelism(NonZeroUsize);
580
581 impl Arbitrary for Parallelism {
582 fn arbitrary(g: &mut Gen) -> Self {
583 Parallelism(NonZeroUsize::new(g.gen_range(1..10)).unwrap())
584 }
585 }
586
    /// Randomly generated value for `ClosestPeersIterConfig::num_results`.
    #[derive(Debug, Clone)]
    struct NumResults(NonZeroUsize);
589
590 impl Arbitrary for NumResults {
591 fn arbitrary(g: &mut Gen) -> Self {
592 NumResults(NonZeroUsize::new(g.gen_range(1..K_VALUE.get())).unwrap())
593 }
594 }
595
596 impl Arbitrary for ClosestPeersIterConfig {
597 fn arbitrary(g: &mut Gen) -> Self {
598 ClosestPeersIterConfig {
599 parallelism: Parallelism::arbitrary(g).0,
600 num_results: NumResults::arbitrary(g).0,
601 peer_timeout: Duration::from_secs(1),
602 }
603 }
604 }
605
    /// Randomly generated, non-empty list of peer keys.
    #[derive(Debug, Clone)]
    struct PeerVec(Vec<Key<PeerId>>);
608
609 impl Arbitrary for PeerVec {
610 fn arbitrary(g: &mut Gen) -> Self {
611 PeerVec(
612 (0..g.gen_range(1..60u8))
613 .map(|_| ArbitraryPeerId::arbitrary(g).0)
614 .map(Key::from)
615 .collect(),
616 )
617 }
618 }
619
    /// Scenario test with three paths: one of the three initially known peers
    /// answers with keys closer to the target than the other two responses,
    /// and the final result must still contain the closest peer of each of
    /// the three responses.
    #[test]
    fn s_kademlia_disjoint_paths() {
        let now = Instant::now();
        let target: KeyBytes = Key::from(PeerId::random()).into();

        let mut pool = [0; 12]
            .iter()
            .map(|_| Key::from(PeerId::random()))
            .collect::<Vec<_>>();

        // Ascending by distance: the tail of `pool` holds the peers farthest
        // from the target.
        pool.sort_unstable_by_key(|a| target.distance(a));

        // The three farthest peers are the ones known at the start.
        let known_closest_peers = pool.split_off(pool.len() - 3);

        let config = ClosestPeersIterConfig {
            parallelism: NonZeroUsize::new(3).unwrap(),
            num_results: NonZeroUsize::new(3).unwrap(),
            ..ClosestPeersIterConfig::default()
        };

        let mut peers_iter =
            ClosestDisjointPeersIter::with_config(config, target, known_closest_peers.clone());

        ////////////////////////////////////////////////////////////////////////
        // First round: every path hands out one of the initially known peers.

        for _ in 0..3 {
            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
                assert!(known_closest_peers.contains(&Key::from(peer)));
            } else {
                panic!("Expected iterator to return peer to query.");
            }
        }

        // All three known peers are in flight, nothing more can be handed out.
        assert_eq!(PeersIterState::WaitingAtCapacity, peers_iter.next(now),);

        let response_2 = pool.split_off(pool.len() - 3);
        let response_3 = pool.split_off(pool.len() - 3);
        // Split off last, so these keys are closer to the target than those in
        // `response_2` and `response_3`.
        let malicious_response_1 = pool.split_off(pool.len() - 3);

        // Response of the first known peer (the "malicious" one).
        peers_iter.on_success(
            known_closest_peers[0].preimage(),
            malicious_response_1
                .clone()
                .into_iter()
                .map(|k| *k.preimage()),
        );

        // Response of the second known peer.
        peers_iter.on_success(
            known_closest_peers[1].preimage(),
            response_2.clone().into_iter().map(|k| *k.preimage()),
        );

        // Response of the third known peer.
        peers_iter.on_success(
            known_closest_peers[2].preimage(),
            response_3.clone().into_iter().map(|k| *k.preimage()),
        );

        ////////////////////////////////////////////////////////////////////////
        // Second round: each path continues with the closest peer of the
        // response it received.

        let mut next_to_query = vec![];
        for _ in 0..3 {
            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
                next_to_query.push(peer)
            } else {
                panic!("Expected iterator to return peer to query.");
            }
        }

        // The closest peer of every response is queried, not just those of the
        // response with the closest keys.
        assert!(next_to_query.contains(malicious_response_1[0].preimage()));
        assert!(next_to_query.contains(response_2[0].preimage()));
        assert!(next_to_query.contains(response_3[0].preimage()));

        for peer in next_to_query {
            peers_iter.on_success(&peer, vec![]);
        }

        // Six peers from the responses are still unqueried; each answers
        // without providing closer peers.
        for _ in 0..6 {
            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
                peers_iter.on_success(&peer, vec![]);
            } else {
                panic!("Expected iterator to return peer to query.");
            }
        }

        assert_eq!(PeersIterState::Finished, peers_iter.next(now),);

        let final_peers: Vec<_> = peers_iter.into_result().collect();

        // The result contains the closest peer of every path.
        assert!(final_peers.contains(malicious_response_1[0].preimage()));
        assert!(final_peers.contains(response_2[0].preimage()));
        assert!(final_peers.contains(response_3[0].preimage()));
    }
723
    /// Simulated network for property tests: maps every peer id to the peers
    /// that peer knows about.
    #[derive(Clone)]
    struct Graph(HashMap<PeerId, Peer>);
726
727 impl std::fmt::Debug for Graph {
728 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729 fmt.debug_list().entries(self.0.keys()).finish()
730 }
731 }
732
    impl Arbitrary for Graph {
        /// Generates a graph of at least `K_VALUE` and fewer than 200 peers in
        /// which every peer knows its nearest neighbours plus a random
        /// selection of other peers.
        fn arbitrary(g: &mut Gen) -> Self {
            let mut peer_ids = random_peers(g.gen_range(K_VALUE.get()..200), g)
                .into_iter()
                .map(|peer_id| (peer_id, Key::from(peer_id)))
                .collect::<Vec<_>>();

            // Make every peer aware of its direct neighbourhood.
            let mut peers = peer_ids
                .clone()
                .into_iter()
                .map(|(peer_id, key)| {
                    // Re-sort the shared list by distance to the current peer.
                    peer_ids
                        .sort_unstable_by(|(_, a), (_, b)| key.distance(a).cmp(&key.distance(b)));

                    // After sorting, the first entry is the peer itself.
                    assert_eq!(peer_id, peer_ids[0].0);

                    let known_peers = peer_ids
                        .iter()
                        // Skip the peer itself.
                        .skip(1)
                        .take(K_VALUE.get())
                        .cloned()
                        .collect::<Vec<_>>();

                    (peer_id, Peer { known_peers })
                })
                .collect::<HashMap<_, _>>();

            // Additionally make every peer aware of a random set of peers.
            for (peer_id, peer) in peers.iter_mut() {
                g.shuffle(&mut peer_ids);

                let num_peers = g.gen_range(K_VALUE.get()..peer_ids.len() + 1);
                let mut random_peer_ids = g
                    .choose_multiple(&peer_ids, num_peers)
                    // Never include the peer itself.
                    .filter(|(id, _)| peer_id != id)
                    .cloned()
                    .collect::<Vec<_>>();

                peer.known_peers.append(&mut random_peer_ids);
                // Round-trip through a set to remove duplicates.
                peer.known_peers = std::mem::take(&mut peer.known_peers)
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .into_iter()
                    .collect();
            }

            Graph(peers)
        }
    }
786
787 impl Graph {
788 fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
789 *self
790 .0
791 .keys()
792 .map(|peer_id| (target.distance(&Key::from(*peer_id)), peer_id))
793 .fold(None, |acc, (distance_b, peer_id_b)| match acc {
794 None => Some((distance_b, peer_id_b)),
795 Some((distance_a, peer_id_a)) => {
796 if distance_a < distance_b {
797 Some((distance_a, peer_id_a))
798 } else {
799 Some((distance_b, peer_id_b))
800 }
801 }
802 })
803 .expect("Graph to have at least one peer.")
804 .1
805 }
806 }
807
    /// A node of the simulated [`Graph`].
    #[derive(Debug, Clone)]
    struct Peer {
        /// The peers this node knows about, each with its precomputed key.
        known_peers: Vec<(PeerId, Key<PeerId>)>,
    }
812
813 impl Peer {
814 fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
815 self.known_peers
816 .sort_unstable_by(|(_, a), (_, b)| target.distance(a).cmp(&target.distance(b)));
817
818 self.known_peers
819 .iter()
820 .take(K_VALUE.get())
821 .map(|(id, _)| id)
822 .cloned()
823 .collect()
824 }
825 }
826
    /// Lets the property test drive either kind of iterator through the same
    /// code path.
    enum PeerIterator {
        /// The composite iterator defined in this file.
        Disjoint(ClosestDisjointPeersIter),
        /// A single plain [`ClosestPeersIter`].
        Closest(ClosestPeersIter),
    }
831
832 impl PeerIterator {
833 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
834 match self {
835 PeerIterator::Disjoint(iter) => iter.next(now),
836 PeerIterator::Closest(iter) => iter.next(now),
837 }
838 }
839
840 fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
841 match self {
842 PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
843 PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
844 };
845 }
846
847 fn into_result(self) -> Vec<PeerId> {
848 match self {
849 PeerIterator::Disjoint(iter) => iter.into_result().collect(),
850 PeerIterator::Closest(iter) => iter.into_result().collect(),
851 }
852 }
853 }
854
    /// Property test: on the same random graph, both [`ClosestPeersIter`] and
    /// [`ClosestDisjointPeersIter`] find the peer closest to the target, and
    /// the disjoint variant returns at least `num_results` peers.
    #[test]
    fn closest_and_disjoint_closest_yield_same_result() {
        fn prop(
            target: Target,
            graph: Graph,
            parallelism: Parallelism,
            num_results: NumResults,
        ) -> TestResult {
            // Only configurations with `parallelism <= num_results` are tested.
            if parallelism.0 > num_results.0 {
                return TestResult::discard();
            }

            let target: KeyBytes = target.0;
            let closest_peer = graph.get_closest_peer(&target);

            // Start from `K_VALUE` peers taken in the map's iteration order,
            // then sort them by distance to the target.
            let mut known_closest_peers = graph
                .0
                .iter()
                .take(K_VALUE.get())
                .map(|(key, _peers)| Key::from(*key))
                .collect::<Vec<_>>();
            known_closest_peers.sort_unstable_by_key(|a| target.distance(a));

            let cfg = ClosestPeersIterConfig {
                parallelism: parallelism.0,
                num_results: num_results.0,
                ..ClosestPeersIterConfig::default()
            };

            let closest = drive_to_finish(
                PeerIterator::Closest(ClosestPeersIter::with_config(
                    cfg.clone(),
                    target.clone(),
                    known_closest_peers.clone(),
                )),
                graph.clone(),
                &target,
            );

            let disjoint = drive_to_finish(
                PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
                    cfg,
                    target.clone(),
                    known_closest_peers.clone(),
                )),
                graph,
                &target,
            );

            assert!(
                closest.contains(&closest_peer),
                "Expected `ClosestPeersIter` to find closest peer.",
            );
            assert!(
                disjoint.contains(&closest_peer),
                "Expected `ClosestDisjointPeersIter` to find closest peer.",
            );

            assert!(
                closest.len() == num_results.0.get(),
                "Expected `ClosestPeersIter` to find `num_results` closest \
                 peers."
            );
            assert!(
                disjoint.len() >= num_results.0.get(),
                "Expected `ClosestDisjointPeersIter` to find at least \
                 `num_results` closest peers."
            );

            // NOTE(review): the two assertions above already imply
            // `closest.len() <= disjoint.len()`, so this branch cannot be
            // taken. If the intent is the one stated in the message, a subset
            // check would be needed instead - confirm before changing.
            if closest.len() > disjoint.len() {
                let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();

                panic!(
                    "Expected `ClosestDisjointPeersIter` to find all peers \
                     found by `ClosestPeersIter`, but it did not find {closest_only:?}.",
                );
            };

            TestResult::passed()
        }

        /// Runs `iter` to completion, answering every query immediately from
        /// `graph`, and returns the set of peers in its result.
        fn drive_to_finish(
            mut iter: PeerIterator,
            mut graph: Graph,
            target: &KeyBytes,
        ) -> HashSet<PeerId> {
            let now = Instant::now();
            loop {
                match iter.next(now) {
                    PeersIterState::Waiting(Some(peer_id)) => {
                        let peer_id = peer_id.clone().into_owned();
                        let closest_peers =
                            graph.0.get_mut(&peer_id).unwrap().get_closest_peers(target);
                        iter.on_success(&peer_id, closest_peers);
                    }
                    // Every query is answered before the next call to `next`.
                    PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) => {
                        panic!("There is never more than one request in flight.")
                    }
                    PeersIterState::Finished => break,
                }
            }

            let mut result = iter
                .into_result()
                .into_iter()
                .map(Key::from)
                .collect::<Vec<_>>();
            // NOTE(review): the order established here is lost again when the
            // peers are collected into a `HashSet` on the next line.
            result.sort_unstable_by_key(|a| target.distance(a));
            result.into_iter().map(|k| k.into_preimage()).collect()
        }

        QuickCheck::new()
            .tests(10)
            .quickcheck(prop as fn(_, _, _, _) -> _)
    }
971
972 #[test]
973 fn failure_can_not_overwrite_previous_success() {
974 let now = Instant::now();
975 let peer = PeerId::random();
976 let mut iter = ClosestDisjointPeersIter::new(
977 Key::from(PeerId::random()).into(),
978 iter::once(Key::from(peer)),
979 );
980
981 assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
982
983 assert!(iter.on_success(&peer, iter::empty()));
985 assert_eq!(
986 iter.contacted_peers.get(&peer),
987 Some(&PeerState {
988 initiated_by: IteratorIndex(0),
989 response: ResponseState::Succeeded,
990 })
991 );
992
993 assert!(!iter.on_failure(&peer));
995 assert_eq!(
996 iter.contacted_peers.get(&peer),
997 Some(&PeerState {
998 initiated_by: IteratorIndex(0),
999 response: ResponseState::Succeeded,
1000 })
1001 );
1002 }
1003}