libp2p_kad/query/peers/closest/
disjoint.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::*;
22use crate::kbucket::{Key, KeyBytes};
23use instant::Instant;
24use libp2p_identity::PeerId;
25use std::{
26    collections::HashMap,
27    iter::{Cycle, Map, Peekable},
28    ops::{Index, IndexMut, Range},
29};
30
31/// Wraps around a set of [`ClosestPeersIter`], enforcing a disjoint discovery
32/// path per configured parallelism according to the S/Kademlia paper.
33pub(crate) struct ClosestDisjointPeersIter {
34    target: KeyBytes,
35
36    /// The set of wrapped [`ClosestPeersIter`].
37    iters: Vec<ClosestPeersIter>,
38    /// Order in which to query the iterators ensuring fairness across
39    /// [`ClosestPeersIter::next`] calls.
40    iter_order: Cycle<Map<Range<usize>, fn(usize) -> IteratorIndex>>,
41
42    /// Mapping of contacted peers by their [`PeerId`] to [`PeerState`]
43    /// containing the corresponding iterator indices as well as the response
44    /// state.
45    ///
46    /// Used to track which iterator contacted which peer. See [`PeerState`]
47    /// for details.
48    contacted_peers: HashMap<PeerId, PeerState>,
49}
50
51impl ClosestDisjointPeersIter {
52    /// Creates a new iterator with a default configuration.
53    #[cfg(test)]
54    pub(crate) fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
55    where
56        I: IntoIterator<Item = Key<PeerId>>,
57    {
58        Self::with_config(
59            ClosestPeersIterConfig::default(),
60            target,
61            known_closest_peers,
62        )
63    }
64
65    /// Creates a new iterator with the given configuration.
66    pub(crate) fn with_config<I, T>(
67        config: ClosestPeersIterConfig,
68        target: T,
69        known_closest_peers: I,
70    ) -> Self
71    where
72        I: IntoIterator<Item = Key<PeerId>>,
73        T: Into<KeyBytes> + Clone,
74    {
75        let peers = known_closest_peers
76            .into_iter()
77            .take(K_VALUE.get())
78            .collect::<Vec<_>>();
79        let iters = (0..config.parallelism.get())
80            // NOTE: All [`ClosestPeersIter`] share the same set of peers at
81            // initialization. The [`ClosestDisjointPeersIter.contacted_peers`]
82            // mapping ensures that a successful response from a peer is only
83            // ever passed to a single [`ClosestPeersIter`]. See
84            // [`ClosestDisjointPeersIter::on_success`] for details.
85            .map(|_| ClosestPeersIter::with_config(config.clone(), target.clone(), peers.clone()))
86            .collect::<Vec<_>>();
87
88        let iters_len = iters.len();
89
90        ClosestDisjointPeersIter {
91            target: target.into(),
92            iters,
93            iter_order: (0..iters_len)
94                .map(IteratorIndex as fn(usize) -> IteratorIndex)
95                .cycle(),
96            contacted_peers: HashMap::new(),
97        }
98    }
99
100    /// Callback for informing the iterator about a failed request to a peer.
101    ///
102    /// If the iterator is currently waiting for a result from `peer`,
103    /// the iterator state is updated and `true` is returned. In that
104    /// case, after calling this function, `next` should eventually be
105    /// called again to obtain the new state of the iterator.
106    ///
107    /// If the iterator is finished, it is not currently waiting for a
108    /// result from `peer`, or a result for `peer` has already been reported,
109    /// calling this function has no effect and `false` is returned.
110    pub(crate) fn on_failure(&mut self, peer: &PeerId) -> bool {
111        let mut updated = false;
112
113        if let Some(PeerState {
114            initiated_by,
115            response,
116        }) = self.contacted_peers.get_mut(peer)
117        {
118            updated = self.iters[*initiated_by].on_failure(peer);
119
120            if updated {
121                *response = ResponseState::Failed;
122            }
123
124            for (i, iter) in &mut self.iters.iter_mut().enumerate() {
125                if IteratorIndex(i) != *initiated_by {
126                    // This iterator never triggered an actual request to the
127                    // given peer - thus ignore the returned boolean.
128                    iter.on_failure(peer);
129                }
130            }
131        }
132
133        updated
134    }
135
136    /// Callback for delivering the result of a successful request to a peer.
137    ///
138    /// Delivering results of requests back to the iterator allows the iterator
139    /// to make progress. The iterator is said to make progress either when the
140    /// given `closer_peers` contain a peer closer to the target than any peer
141    /// seen so far, or when the iterator did not yet accumulate `num_results`
142    /// closest peers and `closer_peers` contains a new peer, regardless of its
143    /// distance to the target.
144    ///
145    /// If the iterator is currently waiting for a result from `peer`,
146    /// the iterator state is updated and `true` is returned. In that
147    /// case, after calling this function, `next` should eventually be
148    /// called again to obtain the new state of the iterator.
149    ///
150    /// If the iterator is finished, it is not currently waiting for a
151    /// result from `peer`, or a result for `peer` has already been reported,
152    /// calling this function has no effect and `false` is returned.
153    pub(crate) fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
154    where
155        I: IntoIterator<Item = PeerId>,
156    {
157        let mut updated = false;
158
159        if let Some(PeerState {
160            initiated_by,
161            response,
162        }) = self.contacted_peers.get_mut(peer)
163        {
164            // Pass the new `closer_peers` to the iterator that first yielded
165            // the peer.
166            updated = self.iters[*initiated_by].on_success(peer, closer_peers);
167
168            if updated {
169                // Mark the response as succeeded for future iterators yielding
170                // this peer. There is no need to keep the `closer_peers`
171                // around, given that they are only passed to the first
172                // iterator.
173                *response = ResponseState::Succeeded;
174            }
175
176            for (i, iter) in &mut self.iters.iter_mut().enumerate() {
177                if IteratorIndex(i) != *initiated_by {
178                    // Only report the success to all remaining not-first
179                    // iterators. Do not pass the `closer_peers` in order to
180                    // uphold the S/Kademlia disjoint paths guarantee.
181                    //
182                    // This iterator never triggered an actual request to the
183                    // given peer - thus ignore the returned boolean.
184                    iter.on_success(peer, std::iter::empty());
185                }
186            }
187        }
188
189        updated
190    }
191
192    pub(crate) fn next(&mut self, now: Instant) -> PeersIterState<'_> {
193        let mut state = None;
194
195        // Ensure querying each iterator at most once.
196        for _ in 0..self.iters.len() {
197            let i = self.iter_order.next().expect("Cycle never ends.");
198            let iter = &mut self.iters[i];
199
200            loop {
201                match iter.next(now) {
202                    PeersIterState::Waiting(None) => {
203                        match state {
204                            Some(PeersIterState::Waiting(Some(_))) => {
205                                // [`ClosestDisjointPeersIter::next`] returns immediately once a
206                                // [`ClosestPeersIter`] yielded a peer. Thus this state is
207                                // unreachable.
208                                unreachable!();
209                            }
210                            Some(PeersIterState::Waiting(None)) => {}
211                            Some(PeersIterState::WaitingAtCapacity) => {
212                                // At least one ClosestPeersIter is no longer at capacity, thus the
213                                // composite ClosestDisjointPeersIter is no longer at capacity.
214                                state = Some(PeersIterState::Waiting(None))
215                            }
216                            Some(PeersIterState::Finished) => {
217                                // `state` is never set to `Finished`.
218                                unreachable!();
219                            }
220                            None => state = Some(PeersIterState::Waiting(None)),
221                        };
222
223                        break;
224                    }
225                    PeersIterState::Waiting(Some(peer)) => {
226                        match self.contacted_peers.get_mut(&*peer) {
227                            Some(PeerState { response, .. }) => {
228                                // Another iterator already contacted this peer.
229                                let peer = peer.into_owned();
230
231                                match response {
232                                    // The iterator will be notified later whether the given node
233                                    // was successfully contacted or not. See
234                                    // [`ClosestDisjointPeersIter::on_success`] for details.
235                                    ResponseState::Waiting => {}
236                                    ResponseState::Succeeded => {
237                                        // Given that iterator was not the first to contact the peer
238                                        // it will not be made aware of the closer peers discovered
239                                        // to uphold the S/Kademlia disjoint paths guarantee. See
240                                        // [`ClosestDisjointPeersIter::on_success`] for details.
241                                        iter.on_success(&peer, std::iter::empty());
242                                    }
243                                    ResponseState::Failed => {
244                                        iter.on_failure(&peer);
245                                    }
246                                }
247                            }
248                            None => {
249                                // The iterator is the first to contact this peer.
250                                self.contacted_peers
251                                    .insert(peer.clone().into_owned(), PeerState::new(i));
252                                return PeersIterState::Waiting(Some(Cow::Owned(
253                                    peer.into_owned(),
254                                )));
255                            }
256                        }
257                    }
258                    PeersIterState::WaitingAtCapacity => {
259                        match state {
260                            Some(PeersIterState::Waiting(Some(_))) => {
261                                // [`ClosestDisjointPeersIter::next`] returns immediately once a
262                                // [`ClosestPeersIter`] yielded a peer. Thus this state is
263                                // unreachable.
264                                unreachable!();
265                            }
266                            Some(PeersIterState::Waiting(None)) => {}
267                            Some(PeersIterState::WaitingAtCapacity) => {}
268                            Some(PeersIterState::Finished) => {
269                                // `state` is never set to `Finished`.
270                                unreachable!();
271                            }
272                            None => state = Some(PeersIterState::WaitingAtCapacity),
273                        };
274
275                        break;
276                    }
277                    PeersIterState::Finished => break,
278                }
279            }
280        }
281
282        state.unwrap_or(PeersIterState::Finished)
283    }
284
285    /// Finishes all paths containing one of the given peers.
286    ///
287    /// See [`crate::query::Query::try_finish`] for details.
288    pub(crate) fn finish_paths<'a, I>(&mut self, peers: I) -> bool
289    where
290        I: IntoIterator<Item = &'a PeerId>,
291    {
292        for peer in peers {
293            if let Some(PeerState { initiated_by, .. }) = self.contacted_peers.get_mut(peer) {
294                self.iters[*initiated_by].finish();
295            }
296        }
297
298        self.is_finished()
299    }
300
301    /// Immediately transitions the iterator to [`PeersIterState::Finished`].
302    pub(crate) fn finish(&mut self) {
303        for iter in &mut self.iters {
304            iter.finish();
305        }
306    }
307
308    /// Checks whether the iterator has finished.
309    pub(crate) fn is_finished(&self) -> bool {
310        self.iters.iter().all(|i| i.is_finished())
311    }
312
313    /// Note: In the case of no adversarial peers or connectivity issues along
314    ///       any path, all paths return the same result, deduplicated through
315    ///       the `ResultIter`, thus overall `into_result` returns
316    ///       `num_results`. In the case of adversarial peers or connectivity
317    ///       issues `ClosestDisjointPeersIter` tries to return the
318    ///       `num_results` closest benign peers, but as it can not
319    ///       differentiate benign from faulty paths it as well returns faulty
320    ///       peers and thus overall returns more than `num_results` peers.
321    pub(crate) fn into_result(self) -> impl Iterator<Item = PeerId> {
322        let result_per_path = self
323            .iters
324            .into_iter()
325            .map(|iter| iter.into_result().map(Key::from));
326
327        ResultIter::new(self.target, result_per_path).map(Key::into_preimage)
328    }
329}
330
331/// Index into the [`ClosestDisjointPeersIter`] `iters` vector.
332#[derive(Debug, Clone, Copy, PartialEq, Eq)]
333struct IteratorIndex(usize);
334
335impl Index<IteratorIndex> for Vec<ClosestPeersIter> {
336    type Output = ClosestPeersIter;
337
338    fn index(&self, index: IteratorIndex) -> &Self::Output {
339        &self[index.0]
340    }
341}
342
343impl IndexMut<IteratorIndex> for Vec<ClosestPeersIter> {
344    fn index_mut(&mut self, index: IteratorIndex) -> &mut Self::Output {
345        &mut self[index.0]
346    }
347}
348
349/// State tracking the iterator that yielded (i.e. tried to contact) a peer. See
350/// [`ClosestDisjointPeersIter::on_success`] for details.
351#[derive(Debug, PartialEq, Eq)]
352struct PeerState {
353    /// First iterator to yield the peer. Will be notified both of the outcome
354    /// (success/failure) as well as the closer peers.
355    initiated_by: IteratorIndex,
356    /// Keeping track of the response state. In case other iterators later on
357    /// yield the same peer, they can be notified of the response outcome.
358    response: ResponseState,
359}
360
361impl PeerState {
362    fn new(initiated_by: IteratorIndex) -> Self {
363        PeerState {
364            initiated_by,
365            response: ResponseState::Waiting,
366        }
367    }
368}
369
370#[derive(Debug, PartialEq, Eq)]
371enum ResponseState {
372    Waiting,
373    Succeeded,
374    Failed,
375}
376
377/// Iterator combining the result of multiple [`ClosestPeersIter`] into a single
378/// deduplicated ordered iterator.
379//
380// Note: This operates under the assumption that `I` is ordered.
381#[derive(Clone, Debug)]
382struct ResultIter<I>
383where
384    I: Iterator<Item = Key<PeerId>>,
385{
386    target: KeyBytes,
387    iters: Vec<Peekable<I>>,
388}
389
390impl<I: Iterator<Item = Key<PeerId>>> ResultIter<I> {
391    fn new(target: KeyBytes, iters: impl Iterator<Item = I>) -> Self {
392        ResultIter {
393            target,
394            iters: iters.map(Iterator::peekable).collect(),
395        }
396    }
397}
398
399impl<I: Iterator<Item = Key<PeerId>>> Iterator for ResultIter<I> {
400    type Item = I::Item;
401
402    fn next(&mut self) -> Option<Self::Item> {
403        let target = &self.target;
404
405        self.iters
406            .iter_mut()
407            // Find the iterator with the next closest peer.
408            .fold(Option::<&mut Peekable<_>>::None, |iter_a, iter_b| {
409                let iter_a = match iter_a {
410                    Some(iter_a) => iter_a,
411                    None => return Some(iter_b),
412                };
413
414                match (iter_a.peek(), iter_b.peek()) {
415                    (Some(next_a), Some(next_b)) => {
416                        if next_a == next_b {
417                            // Remove from one for deduplication.
418                            iter_b.next();
419                            return Some(iter_a);
420                        }
421
422                        if target.distance(next_a) < target.distance(next_b) {
423                            Some(iter_a)
424                        } else {
425                            Some(iter_b)
426                        }
427                    }
428                    (Some(_), None) => Some(iter_a),
429                    (None, Some(_)) => Some(iter_b),
430                    (None, None) => None,
431                }
432            })
433            // Pop off the next closest peer from that iterator.
434            .and_then(Iterator::next)
435    }
436}
437
438#[cfg(test)]
439mod tests {
440    use super::*;
441
442    use crate::{K_VALUE, SHA_256_MH};
443    use libp2p_core::multihash::Multihash;
444    use quickcheck::*;
445    use std::collections::HashSet;
446    use std::iter;
447
448    impl Arbitrary for ResultIter<std::vec::IntoIter<Key<PeerId>>> {
449        fn arbitrary(g: &mut Gen) -> Self {
450            let target = Target::arbitrary(g).0;
451            let num_closest_iters = g.gen_range(0..20 + 1);
452            let peers = random_peers(g.gen_range(0..20 * num_closest_iters + 1), g);
453
454            let iters = (0..num_closest_iters).map(|_| {
455                let num_peers = g.gen_range(0..20 + 1);
456                let mut peers = g
457                    .choose_multiple(&peers, num_peers)
458                    .cloned()
459                    .map(Key::from)
460                    .collect::<Vec<_>>();
461
462                peers.sort_unstable_by_key(|a| target.distance(a));
463
464                peers.into_iter()
465            });
466
467            ResultIter::new(target.clone(), iters)
468        }
469
470        fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
471            let peers = self
472                .iters
473                .clone()
474                .into_iter()
475                .flatten()
476                .collect::<HashSet<_>>()
477                .into_iter()
478                .collect::<Vec<_>>();
479
480            let iters = self
481                .iters
482                .clone()
483                .into_iter()
484                .map(|iter| iter.collect::<Vec<_>>())
485                .collect();
486
487            Box::new(ResultIterShrinker {
488                target: self.target.clone(),
489                peers,
490                iters,
491            })
492        }
493    }
494
495    struct ResultIterShrinker {
496        target: KeyBytes,
497        peers: Vec<Key<PeerId>>,
498        iters: Vec<Vec<Key<PeerId>>>,
499    }
500
501    impl Iterator for ResultIterShrinker {
502        type Item = ResultIter<std::vec::IntoIter<Key<PeerId>>>;
503
504        /// Return an iterator of [`ResultIter`]s with each of them missing a
505        /// different peer from the original set.
506        fn next(&mut self) -> Option<Self::Item> {
507            // The peer that should not be included.
508            let peer = self.peers.pop()?;
509
510            let iters = self.iters.clone().into_iter().filter_map(|mut iter| {
511                iter.retain(|p| p != &peer);
512                if iter.is_empty() {
513                    return None;
514                }
515                Some(iter.into_iter())
516            });
517
518            Some(ResultIter::new(self.target.clone(), iters))
519        }
520    }
521
522    #[derive(Clone, Debug)]
523    struct ArbitraryPeerId(PeerId);
524
525    impl Arbitrary for ArbitraryPeerId {
526        fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
527            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
528            let peer_id =
529                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
530            ArbitraryPeerId(peer_id)
531        }
532    }
533
534    #[derive(Clone, Debug)]
535    struct Target(KeyBytes);
536
537    impl Arbitrary for Target {
538        fn arbitrary(g: &mut Gen) -> Self {
539            let peer_id = ArbitraryPeerId::arbitrary(g).0;
540            Target(Key::from(peer_id).into())
541        }
542    }
543
544    fn random_peers(n: usize, g: &mut Gen) -> Vec<PeerId> {
545        (0..n).map(|_| ArbitraryPeerId::arbitrary(g).0).collect()
546    }
547
548    #[test]
549    fn result_iter_returns_deduplicated_ordered_peer_id_stream() {
550        fn prop(result_iter: ResultIter<std::vec::IntoIter<Key<PeerId>>>) {
551            let expected = {
552                let mut deduplicated = result_iter
553                    .clone()
554                    .iters
555                    .into_iter()
556                    .flatten()
557                    .collect::<HashSet<_>>()
558                    .into_iter()
559                    .map(Key::from)
560                    .collect::<Vec<_>>();
561
562                deduplicated.sort_unstable_by(|a, b| {
563                    result_iter
564                        .target
565                        .distance(a)
566                        .cmp(&result_iter.target.distance(b))
567                });
568
569                deduplicated
570            };
571
572            assert_eq!(expected, result_iter.collect::<Vec<_>>());
573        }
574
575        QuickCheck::new().quickcheck(prop as fn(_))
576    }
577
578    #[derive(Debug, Clone)]
579    struct Parallelism(NonZeroUsize);
580
581    impl Arbitrary for Parallelism {
582        fn arbitrary(g: &mut Gen) -> Self {
583            Parallelism(NonZeroUsize::new(g.gen_range(1..10)).unwrap())
584        }
585    }
586
587    #[derive(Debug, Clone)]
588    struct NumResults(NonZeroUsize);
589
590    impl Arbitrary for NumResults {
591        fn arbitrary(g: &mut Gen) -> Self {
592            NumResults(NonZeroUsize::new(g.gen_range(1..K_VALUE.get())).unwrap())
593        }
594    }
595
596    impl Arbitrary for ClosestPeersIterConfig {
597        fn arbitrary(g: &mut Gen) -> Self {
598            ClosestPeersIterConfig {
599                parallelism: Parallelism::arbitrary(g).0,
600                num_results: NumResults::arbitrary(g).0,
601                peer_timeout: Duration::from_secs(1),
602            }
603        }
604    }
605
606    #[derive(Debug, Clone)]
607    struct PeerVec(Vec<Key<PeerId>>);
608
609    impl Arbitrary for PeerVec {
610        fn arbitrary(g: &mut Gen) -> Self {
611            PeerVec(
612                (0..g.gen_range(1..60u8))
613                    .map(|_| ArbitraryPeerId::arbitrary(g).0)
614                    .map(Key::from)
615                    .collect(),
616            )
617        }
618    }
619
620    #[test]
621    fn s_kademlia_disjoint_paths() {
622        let now = Instant::now();
623        let target: KeyBytes = Key::from(PeerId::random()).into();
624
625        let mut pool = [0; 12]
626            .iter()
627            .map(|_| Key::from(PeerId::random()))
628            .collect::<Vec<_>>();
629
630        pool.sort_unstable_by_key(|a| target.distance(a));
631
632        let known_closest_peers = pool.split_off(pool.len() - 3);
633
634        let config = ClosestPeersIterConfig {
635            parallelism: NonZeroUsize::new(3).unwrap(),
636            num_results: NonZeroUsize::new(3).unwrap(),
637            ..ClosestPeersIterConfig::default()
638        };
639
640        let mut peers_iter =
641            ClosestDisjointPeersIter::with_config(config, target, known_closest_peers.clone());
642
643        ////////////////////////////////////////////////////////////////////////
644        // First round.
645
646        for _ in 0..3 {
647            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
648                assert!(known_closest_peers.contains(&Key::from(peer)));
649            } else {
650                panic!("Expected iterator to return peer to query.");
651            }
652        }
653
654        assert_eq!(PeersIterState::WaitingAtCapacity, peers_iter.next(now),);
655
656        let response_2 = pool.split_off(pool.len() - 3);
657        let response_3 = pool.split_off(pool.len() - 3);
658        // Keys are closer than any of the previous two responses from honest
659        // node 1 and 2.
660        let malicious_response_1 = pool.split_off(pool.len() - 3);
661
662        // Response from malicious peer 1.
663        peers_iter.on_success(
664            known_closest_peers[0].preimage(),
665            malicious_response_1
666                .clone()
667                .into_iter()
668                .map(|k| *k.preimage()),
669        );
670
671        // Response from peer 2.
672        peers_iter.on_success(
673            known_closest_peers[1].preimage(),
674            response_2.clone().into_iter().map(|k| *k.preimage()),
675        );
676
677        // Response from peer 3.
678        peers_iter.on_success(
679            known_closest_peers[2].preimage(),
680            response_3.clone().into_iter().map(|k| *k.preimage()),
681        );
682
683        ////////////////////////////////////////////////////////////////////////
684        // Second round.
685
686        let mut next_to_query = vec![];
687        for _ in 0..3 {
688            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
689                next_to_query.push(peer)
690            } else {
691                panic!("Expected iterator to return peer to query.");
692            }
693        }
694
695        // Expect a peer from each disjoint path.
696        assert!(next_to_query.contains(malicious_response_1[0].preimage()));
697        assert!(next_to_query.contains(response_2[0].preimage()));
698        assert!(next_to_query.contains(response_3[0].preimage()));
699
700        for peer in next_to_query {
701            peers_iter.on_success(&peer, vec![]);
702        }
703
704        // Mark all remaining peers as succeeded.
705        for _ in 0..6 {
706            if let PeersIterState::Waiting(Some(Cow::Owned(peer))) = peers_iter.next(now) {
707                peers_iter.on_success(&peer, vec![]);
708            } else {
709                panic!("Expected iterator to return peer to query.");
710            }
711        }
712
713        assert_eq!(PeersIterState::Finished, peers_iter.next(now),);
714
715        let final_peers: Vec<_> = peers_iter.into_result().collect();
716
717        // Expect final result to contain peer from each disjoint path, even
718        // though not all are among the best ones.
719        assert!(final_peers.contains(malicious_response_1[0].preimage()));
720        assert!(final_peers.contains(response_2[0].preimage()));
721        assert!(final_peers.contains(response_3[0].preimage()));
722    }
723
724    #[derive(Clone)]
725    struct Graph(HashMap<PeerId, Peer>);
726
727    impl std::fmt::Debug for Graph {
728        fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729            fmt.debug_list().entries(self.0.keys()).finish()
730        }
731    }
732
733    impl Arbitrary for Graph {
734        fn arbitrary(g: &mut Gen) -> Self {
735            let mut peer_ids = random_peers(g.gen_range(K_VALUE.get()..200), g)
736                .into_iter()
737                .map(|peer_id| (peer_id, Key::from(peer_id)))
738                .collect::<Vec<_>>();
739
740            // Make each peer aware of its direct neighborhood.
741            let mut peers = peer_ids
742                .clone()
743                .into_iter()
744                .map(|(peer_id, key)| {
745                    peer_ids
746                        .sort_unstable_by(|(_, a), (_, b)| key.distance(a).cmp(&key.distance(b)));
747
748                    assert_eq!(peer_id, peer_ids[0].0);
749
750                    let known_peers = peer_ids
751                        .iter()
752                        // Skip itself.
753                        .skip(1)
754                        .take(K_VALUE.get())
755                        .cloned()
756                        .collect::<Vec<_>>();
757
758                    (peer_id, Peer { known_peers })
759                })
760                .collect::<HashMap<_, _>>();
761
762            // Make each peer aware of a random set of other peers within the graph.
763            for (peer_id, peer) in peers.iter_mut() {
764                g.shuffle(&mut peer_ids);
765
766                let num_peers = g.gen_range(K_VALUE.get()..peer_ids.len() + 1);
767                let mut random_peer_ids = g
768                    .choose_multiple(&peer_ids, num_peers)
769                    // Make sure not to include itself.
770                    .filter(|(id, _)| peer_id != id)
771                    .cloned()
772                    .collect::<Vec<_>>();
773
774                peer.known_peers.append(&mut random_peer_ids);
775                peer.known_peers = std::mem::take(&mut peer.known_peers)
776                    // Deduplicate peer ids.
777                    .into_iter()
778                    .collect::<HashSet<_>>()
779                    .into_iter()
780                    .collect();
781            }
782
783            Graph(peers)
784        }
785    }
786
787    impl Graph {
788        fn get_closest_peer(&self, target: &KeyBytes) -> PeerId {
789            *self
790                .0
791                .keys()
792                .map(|peer_id| (target.distance(&Key::from(*peer_id)), peer_id))
793                .fold(None, |acc, (distance_b, peer_id_b)| match acc {
794                    None => Some((distance_b, peer_id_b)),
795                    Some((distance_a, peer_id_a)) => {
796                        if distance_a < distance_b {
797                            Some((distance_a, peer_id_a))
798                        } else {
799                            Some((distance_b, peer_id_b))
800                        }
801                    }
802                })
803                .expect("Graph to have at least one peer.")
804                .1
805        }
806    }
807
808    #[derive(Debug, Clone)]
809    struct Peer {
810        known_peers: Vec<(PeerId, Key<PeerId>)>,
811    }
812
813    impl Peer {
814        fn get_closest_peers(&mut self, target: &KeyBytes) -> Vec<PeerId> {
815            self.known_peers
816                .sort_unstable_by(|(_, a), (_, b)| target.distance(a).cmp(&target.distance(b)));
817
818            self.known_peers
819                .iter()
820                .take(K_VALUE.get())
821                .map(|(id, _)| id)
822                .cloned()
823                .collect()
824        }
825    }
826
827    enum PeerIterator {
828        Disjoint(ClosestDisjointPeersIter),
829        Closest(ClosestPeersIter),
830    }
831
832    impl PeerIterator {
833        fn next(&mut self, now: Instant) -> PeersIterState<'_> {
834            match self {
835                PeerIterator::Disjoint(iter) => iter.next(now),
836                PeerIterator::Closest(iter) => iter.next(now),
837            }
838        }
839
840        fn on_success(&mut self, peer: &PeerId, closer_peers: Vec<PeerId>) {
841            match self {
842                PeerIterator::Disjoint(iter) => iter.on_success(peer, closer_peers),
843                PeerIterator::Closest(iter) => iter.on_success(peer, closer_peers),
844            };
845        }
846
847        fn into_result(self) -> Vec<PeerId> {
848            match self {
849                PeerIterator::Disjoint(iter) => iter.into_result().collect(),
850                PeerIterator::Closest(iter) => iter.into_result().collect(),
851            }
852        }
853    }
854
855    /// Ensure [`ClosestPeersIter`] and [`ClosestDisjointPeersIter`] yield same closest peers.
856    #[test]
857    fn closest_and_disjoint_closest_yield_same_result() {
858        fn prop(
859            target: Target,
860            graph: Graph,
861            parallelism: Parallelism,
862            num_results: NumResults,
863        ) -> TestResult {
864            if parallelism.0 > num_results.0 {
865                return TestResult::discard();
866            }
867
868            let target: KeyBytes = target.0;
869            let closest_peer = graph.get_closest_peer(&target);
870
871            let mut known_closest_peers = graph
872                .0
873                .iter()
874                .take(K_VALUE.get())
875                .map(|(key, _peers)| Key::from(*key))
876                .collect::<Vec<_>>();
877            known_closest_peers.sort_unstable_by_key(|a| target.distance(a));
878
879            let cfg = ClosestPeersIterConfig {
880                parallelism: parallelism.0,
881                num_results: num_results.0,
882                ..ClosestPeersIterConfig::default()
883            };
884
885            let closest = drive_to_finish(
886                PeerIterator::Closest(ClosestPeersIter::with_config(
887                    cfg.clone(),
888                    target.clone(),
889                    known_closest_peers.clone(),
890                )),
891                graph.clone(),
892                &target,
893            );
894
895            let disjoint = drive_to_finish(
896                PeerIterator::Disjoint(ClosestDisjointPeersIter::with_config(
897                    cfg,
898                    target.clone(),
899                    known_closest_peers.clone(),
900                )),
901                graph,
902                &target,
903            );
904
905            assert!(
906                closest.contains(&closest_peer),
907                "Expected `ClosestPeersIter` to find closest peer.",
908            );
909            assert!(
910                disjoint.contains(&closest_peer),
911                "Expected `ClosestDisjointPeersIter` to find closest peer.",
912            );
913
914            assert!(
915                closest.len() == num_results.0.get(),
916                "Expected `ClosestPeersIter` to find `num_results` closest \
917                 peers."
918            );
919            assert!(
920                disjoint.len() >= num_results.0.get(),
921                "Expected `ClosestDisjointPeersIter` to find at least \
922                 `num_results` closest peers."
923            );
924
925            if closest.len() > disjoint.len() {
926                let closest_only = closest.difference(&disjoint).collect::<Vec<_>>();
927
928                panic!(
929                    "Expected `ClosestDisjointPeersIter` to find all peers \
930                     found by `ClosestPeersIter`, but it did not find {closest_only:?}.",
931                );
932            };
933
934            TestResult::passed()
935        }
936
937        fn drive_to_finish(
938            mut iter: PeerIterator,
939            mut graph: Graph,
940            target: &KeyBytes,
941        ) -> HashSet<PeerId> {
942            let now = Instant::now();
943            loop {
944                match iter.next(now) {
945                    PeersIterState::Waiting(Some(peer_id)) => {
946                        let peer_id = peer_id.clone().into_owned();
947                        let closest_peers =
948                            graph.0.get_mut(&peer_id).unwrap().get_closest_peers(target);
949                        iter.on_success(&peer_id, closest_peers);
950                    }
951                    PeersIterState::WaitingAtCapacity | PeersIterState::Waiting(None) => {
952                        panic!("There is never more than one request in flight.")
953                    }
954                    PeersIterState::Finished => break,
955                }
956            }
957
958            let mut result = iter
959                .into_result()
960                .into_iter()
961                .map(Key::from)
962                .collect::<Vec<_>>();
963            result.sort_unstable_by_key(|a| target.distance(a));
964            result.into_iter().map(|k| k.into_preimage()).collect()
965        }
966
967        QuickCheck::new()
968            .tests(10)
969            .quickcheck(prop as fn(_, _, _, _) -> _)
970    }
971
972    #[test]
973    fn failure_can_not_overwrite_previous_success() {
974        let now = Instant::now();
975        let peer = PeerId::random();
976        let mut iter = ClosestDisjointPeersIter::new(
977            Key::from(PeerId::random()).into(),
978            iter::once(Key::from(peer)),
979        );
980
981        assert!(matches!(iter.next(now), PeersIterState::Waiting(Some(_))));
982
983        // Expect peer to be marked as succeeded.
984        assert!(iter.on_success(&peer, iter::empty()));
985        assert_eq!(
986            iter.contacted_peers.get(&peer),
987            Some(&PeerState {
988                initiated_by: IteratorIndex(0),
989                response: ResponseState::Succeeded,
990            })
991        );
992
993        // Expect peer to stay marked as succeeded.
994        assert!(!iter.on_failure(&peer));
995        assert_eq!(
996            iter.contacted_peers.get(&peer),
997            Some(&PeerState {
998                initiated_by: IteratorIndex(0),
999                response: ResponseState::Succeeded,
1000            })
1001        );
1002    }
1003}