libp2p_kad/query/peers/
closest.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::*;
22
23use crate::kbucket::{Distance, Key, KeyBytes};
24use crate::{ALPHA_VALUE, K_VALUE};
25use instant::Instant;
26use libp2p_identity::PeerId;
27use std::collections::btree_map::{BTreeMap, Entry};
28use std::{iter::FromIterator, num::NonZeroUsize, time::Duration};
29
30pub(crate) mod disjoint;
31/// A peer iterator for a dynamically changing list of peers, sorted by increasing
32/// distance to a chosen target.
33#[derive(Debug, Clone)]
34pub struct ClosestPeersIter {
35    config: ClosestPeersIterConfig,
36
37    /// The target whose distance to any peer determines the position of
38    /// the peer in the iterator.
39    target: KeyBytes,
40
41    /// The internal iterator state.
42    state: State,
43
44    /// The closest peers to the target, ordered by increasing distance.
45    closest_peers: BTreeMap<Distance, Peer>,
46
47    /// The number of peers for which the iterator is currently waiting for results.
48    num_waiting: usize,
49}
50
51/// Configuration for a `ClosestPeersIter`.
52#[derive(Debug, Clone)]
53pub struct ClosestPeersIterConfig {
54    /// Allowed level of parallelism.
55    ///
56    /// The `α` parameter in the Kademlia paper. The maximum number of peers that
57    /// the iterator is allowed to wait for in parallel while iterating towards the closest
58    /// nodes to a target. Defaults to `ALPHA_VALUE`.
59    pub parallelism: NonZeroUsize,
60
61    /// Number of results (closest peers) to search for.
62    ///
63    /// The number of closest peers for which the iterator must obtain successful results
64    /// in order to finish successfully. Defaults to `K_VALUE`.
65    pub num_results: NonZeroUsize,
66
67    /// The timeout for a single peer.
68    ///
69    /// If a successful result is not reported for a peer within this timeout
70    /// window, the iterator considers the peer unresponsive and will not wait for
71    /// the peer when evaluating the termination conditions, until and unless a
72    /// result is delivered. Defaults to `10` seconds.
73    pub peer_timeout: Duration,
74}
75
76impl Default for ClosestPeersIterConfig {
77    fn default() -> Self {
78        ClosestPeersIterConfig {
79            parallelism: ALPHA_VALUE,
80            num_results: K_VALUE,
81            peer_timeout: Duration::from_secs(10),
82        }
83    }
84}
85
86impl ClosestPeersIter {
87    /// Creates a new iterator with a default configuration.
88    pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
89    where
90        I: IntoIterator<Item = Key<PeerId>>,
91    {
92        Self::with_config(
93            ClosestPeersIterConfig::default(),
94            target,
95            known_closest_peers,
96        )
97    }
98
99    /// Creates a new iterator with the given configuration.
100    pub fn with_config<I, T>(
101        config: ClosestPeersIterConfig,
102        target: T,
103        known_closest_peers: I,
104    ) -> Self
105    where
106        I: IntoIterator<Item = Key<PeerId>>,
107        T: Into<KeyBytes>,
108    {
109        let target = target.into();
110
111        // Initialise the closest peers to start the iterator with.
112        let closest_peers = BTreeMap::from_iter(
113            known_closest_peers
114                .into_iter()
115                .map(|key| {
116                    let distance = key.distance(&target);
117                    let state = PeerState::NotContacted;
118                    (distance, Peer { key, state })
119                })
120                .take(K_VALUE.into()),
121        );
122
123        // The iterator initially makes progress by iterating towards the target.
124        let state = State::Iterating { no_progress: 0 };
125
126        ClosestPeersIter {
127            config,
128            target,
129            state,
130            closest_peers,
131            num_waiting: 0,
132        }
133    }
134
135    /// Callback for delivering the result of a successful request to a peer.
136    ///
137    /// Delivering results of requests back to the iterator allows the iterator to make
138    /// progress. The iterator is said to make progress either when the given
139    /// `closer_peers` contain a peer closer to the target than any peer seen so far,
140    /// or when the iterator did not yet accumulate `num_results` closest peers and
141    /// `closer_peers` contains a new peer, regardless of its distance to the target.
142    ///
143    /// If the iterator is currently waiting for a result from `peer`,
144    /// the iterator state is updated and `true` is returned. In that
145    /// case, after calling this function, `next` should eventually be
146    /// called again to obtain the new state of the iterator.
147    ///
148    /// If the iterator is finished, it is not currently waiting for a
149    /// result from `peer`, or a result for `peer` has already been reported,
150    /// calling this function has no effect and `false` is returned.
151    pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
152    where
153        I: IntoIterator<Item = PeerId>,
154    {
155        if let State::Finished = self.state {
156            return false;
157        }
158
159        let key = Key::from(*peer);
160        let distance = key.distance(&self.target);
161
162        // Mark the peer as succeeded.
163        match self.closest_peers.entry(distance) {
164            Entry::Vacant(..) => return false,
165            Entry::Occupied(mut e) => match e.get().state {
166                PeerState::Waiting(..) => {
167                    debug_assert!(self.num_waiting > 0);
168                    self.num_waiting -= 1;
169                    e.get_mut().state = PeerState::Succeeded;
170                }
171                PeerState::Unresponsive => {
172                    e.get_mut().state = PeerState::Succeeded;
173                }
174                PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
175            },
176        }
177
178        let num_closest = self.closest_peers.len();
179        let mut progress = false;
180
181        // Incorporate the reported closer peers into the iterator.
182        for peer in closer_peers {
183            let key = peer.into();
184            let distance = self.target.distance(&key);
185            let peer = Peer {
186                key,
187                state: PeerState::NotContacted,
188            };
189            self.closest_peers.entry(distance).or_insert(peer);
190            // The iterator makes progress if the new peer is either closer to the target
191            // than any peer seen so far (i.e. is the first entry), or the iterator did
192            // not yet accumulate enough closest peers.
193            progress = self.closest_peers.keys().next() == Some(&distance)
194                || num_closest < self.config.num_results.get();
195        }
196
197        // Update the iterator state.
198        self.state = match self.state {
199            State::Iterating { no_progress } => {
200                let no_progress = if progress { 0 } else { no_progress + 1 };
201                if no_progress >= self.config.parallelism.get() {
202                    State::Stalled
203                } else {
204                    State::Iterating { no_progress }
205                }
206            }
207            State::Stalled => {
208                if progress {
209                    State::Iterating { no_progress: 0 }
210                } else {
211                    State::Stalled
212                }
213            }
214            State::Finished => State::Finished,
215        };
216
217        true
218    }
219
220    /// Callback for informing the iterator about a failed request to a peer.
221    ///
222    /// If the iterator is currently waiting for a result from `peer`,
223    /// the iterator state is updated and `true` is returned. In that
224    /// case, after calling this function, `next` should eventually be
225    /// called again to obtain the new state of the iterator.
226    ///
227    /// If the iterator is finished, it is not currently waiting for a
228    /// result from `peer`, or a result for `peer` has already been reported,
229    /// calling this function has no effect and `false` is returned.
230    pub fn on_failure(&mut self, peer: &PeerId) -> bool {
231        if let State::Finished = self.state {
232            return false;
233        }
234
235        let key = Key::from(*peer);
236        let distance = key.distance(&self.target);
237
238        match self.closest_peers.entry(distance) {
239            Entry::Vacant(_) => return false,
240            Entry::Occupied(mut e) => match e.get().state {
241                PeerState::Waiting(_) => {
242                    debug_assert!(self.num_waiting > 0);
243                    self.num_waiting -= 1;
244                    e.get_mut().state = PeerState::Failed
245                }
246                PeerState::Unresponsive => e.get_mut().state = PeerState::Failed,
247                PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
248            },
249        }
250
251        true
252    }
253
254    /// Returns the list of peers for which the iterator is currently waiting
255    /// for results.
256    pub fn waiting(&self) -> impl Iterator<Item = &PeerId> {
257        self.closest_peers
258            .values()
259            .filter_map(|peer| match peer.state {
260                PeerState::Waiting(..) => Some(peer.key.preimage()),
261                _ => None,
262            })
263    }
264
265    /// Returns the number of peers for which the iterator is currently
266    /// waiting for results.
267    pub fn num_waiting(&self) -> usize {
268        self.num_waiting
269    }
270
271    /// Returns true if the iterator is waiting for a response from the given peer.
272    pub fn is_waiting(&self, peer: &PeerId) -> bool {
273        self.waiting().any(|p| peer == p)
274    }
275
276    /// Advances the state of the iterator, potentially getting a new peer to contact.
277    pub fn next(&mut self, now: Instant) -> PeersIterState<'_> {
278        if let State::Finished = self.state {
279            return PeersIterState::Finished;
280        }
281
282        // Count the number of peers that returned a result. If there is a
283        // request in progress to one of the `num_results` closest peers, the
284        // counter is set to `None` as the iterator can only finish once
285        // `num_results` closest peers have responded (or there are no more
286        // peers to contact, see `num_waiting`).
287        let mut result_counter = Some(0);
288
289        // Check if the iterator is at capacity w.r.t. the allowed parallelism.
290        let at_capacity = self.at_capacity();
291
292        for peer in self.closest_peers.values_mut() {
293            match peer.state {
294                PeerState::Waiting(timeout) => {
295                    if now >= timeout {
296                        // Unresponsive peers no longer count towards the limit for the
297                        // bounded parallelism, though they might still be ongoing and
298                        // their results can still be delivered to the iterator.
299                        debug_assert!(self.num_waiting > 0);
300                        self.num_waiting -= 1;
301                        peer.state = PeerState::Unresponsive
302                    } else if at_capacity {
303                        // The iterator is still waiting for a result from a peer and is
304                        // at capacity w.r.t. the maximum number of peers being waited on.
305                        return PeersIterState::WaitingAtCapacity;
306                    } else {
307                        // The iterator is still waiting for a result from a peer and the
308                        // `result_counter` did not yet reach `num_results`. Therefore
309                        // the iterator is not yet done, regardless of already successful
310                        // queries to peers farther from the target.
311                        result_counter = None;
312                    }
313                }
314
315                PeerState::Succeeded => {
316                    if let Some(ref mut cnt) = result_counter {
317                        *cnt += 1;
318                        // If `num_results` successful results have been delivered for the
319                        // closest peers, the iterator is done.
320                        if *cnt >= self.config.num_results.get() {
321                            self.state = State::Finished;
322                            return PeersIterState::Finished;
323                        }
324                    }
325                }
326
327                PeerState::NotContacted => {
328                    if !at_capacity {
329                        let timeout = now + self.config.peer_timeout;
330                        peer.state = PeerState::Waiting(timeout);
331                        self.num_waiting += 1;
332                        return PeersIterState::Waiting(Some(Cow::Borrowed(peer.key.preimage())));
333                    } else {
334                        return PeersIterState::WaitingAtCapacity;
335                    }
336                }
337
338                PeerState::Unresponsive | PeerState::Failed => {
339                    // Skip over unresponsive or failed peers.
340                }
341            }
342        }
343
344        if self.num_waiting > 0 {
345            // The iterator is still waiting for results and not at capacity w.r.t.
346            // the allowed parallelism, but there are no new peers to contact
347            // at the moment.
348            PeersIterState::Waiting(None)
349        } else {
350            // The iterator is finished because all available peers have been contacted
351            // and the iterator is not waiting for any more results.
352            self.state = State::Finished;
353            PeersIterState::Finished
354        }
355    }
356
357    /// Immediately transitions the iterator to [`PeersIterState::Finished`].
358    pub fn finish(&mut self) {
359        self.state = State::Finished
360    }
361
362    /// Checks whether the iterator has finished.
363    pub fn is_finished(&self) -> bool {
364        self.state == State::Finished
365    }
366
367    /// Consumes the iterator, returning the closest peers.
368    pub fn into_result(self) -> impl Iterator<Item = PeerId> {
369        self.closest_peers
370            .into_iter()
371            .filter_map(|(_, peer)| {
372                if let PeerState::Succeeded = peer.state {
373                    Some(peer.key.into_preimage())
374                } else {
375                    None
376                }
377            })
378            .take(self.config.num_results.get())
379    }
380
381    /// Checks if the iterator is at capacity w.r.t. the permitted parallelism.
382    ///
383    /// While the iterator is stalled, up to `num_results` parallel requests
384    /// are allowed. This is a slightly more permissive variant of the
385    /// requirement that the initiator "resends the FIND_NODE to all of the
386    /// k closest nodes it has not already queried".
387    fn at_capacity(&self) -> bool {
388        match self.state {
389            State::Stalled => {
390                self.num_waiting
391                    >= usize::max(self.config.num_results.get(), self.config.parallelism.get())
392            }
393            State::Iterating { .. } => self.num_waiting >= self.config.parallelism.get(),
394            State::Finished => true,
395        }
396    }
397}
398
399////////////////////////////////////////////////////////////////////////////////
400// Private state
401
402/// Internal state of the iterator.
403#[derive(Debug, PartialEq, Eq, Copy, Clone)]
404enum State {
405    /// The iterator is making progress by iterating towards `num_results` closest
406    /// peers to the target with a maximum of `parallelism` peers for which the
407    /// iterator is waiting for results at a time.
408    ///
409    /// > **Note**: When the iterator switches back to `Iterating` after being
410    /// > `Stalled`, it may temporarily be waiting for more than `parallelism`
411    /// > results from peers, with new peers only being considered once
412    /// > the number pending results drops below `parallelism`.
413    Iterating {
414        /// The number of consecutive results that did not yield a peer closer
415        /// to the target. When this number reaches `parallelism` and no new
416        /// peer was discovered or at least `num_results` peers are known to
417        /// the iterator, it is considered `Stalled`.
418        no_progress: usize,
419    },
420
421    /// A iterator is stalled when it did not make progress after `parallelism`
422    /// consecutive successful results (see `on_success`).
423    ///
424    /// While the iterator is stalled, the maximum allowed parallelism for pending
425    /// results is increased to `num_results` in an attempt to finish the iterator.
426    /// If the iterator can make progress again upon receiving the remaining
427    /// results, it switches back to `Iterating`. Otherwise it will be finished.
428    Stalled,
429
430    /// The iterator is finished.
431    ///
432    /// A iterator finishes either when it has collected `num_results` results
433    /// from the closest peers (not counting those that failed or are unresponsive)
434    /// or because the iterator ran out of peers that have not yet delivered
435    /// results (or failed).
436    Finished,
437}
438
439/// Representation of a peer in the context of a iterator.
440#[derive(Debug, Clone)]
441struct Peer {
442    key: Key<PeerId>,
443    state: PeerState,
444}
445
446/// The state of a single `Peer`.
447#[derive(Debug, Copy, Clone)]
448enum PeerState {
449    /// The peer has not yet been contacted.
450    ///
451    /// This is the starting state for every peer.
452    NotContacted,
453
454    /// The iterator is waiting for a result from the peer.
455    Waiting(Instant),
456
457    /// A result was not delivered for the peer within the configured timeout.
458    ///
459    /// The peer is not taken into account for the termination conditions
460    /// of the iterator until and unless it responds.
461    Unresponsive,
462
463    /// Obtaining a result from the peer has failed.
464    ///
465    /// This is a final state, reached as a result of a call to `on_failure`.
466    Failed,
467
468    /// A successful result from the peer has been delivered.
469    ///
470    /// This is a final state, reached as a result of a call to `on_success`.
471    Succeeded,
472}
473
474#[cfg(test)]
475mod tests {
476    use super::*;
477    use crate::SHA_256_MH;
478    use libp2p_core::multihash::Multihash;
479    use libp2p_identity::PeerId;
480    use quickcheck::*;
481    use rand::{rngs::StdRng, Rng, SeedableRng};
482    use std::{iter, time::Duration};
483
484    fn random_peers<R: Rng>(n: usize, g: &mut R) -> Vec<PeerId> {
485        (0..n)
486            .map(|_| {
487                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &g.gen::<[u8; 32]>()).unwrap())
488                    .unwrap()
489            })
490            .collect()
491    }
492
493    fn sorted<T: AsRef<KeyBytes>>(target: &T, peers: &[Key<PeerId>]) -> bool {
494        peers
495            .windows(2)
496            .all(|w| w[0].distance(&target) < w[1].distance(&target))
497    }
498
499    #[derive(Clone, Debug)]
500    struct ArbitraryPeerId(PeerId);
501
502    impl Arbitrary for ArbitraryPeerId {
503        fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
504            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
505            let peer_id =
506                PeerId::from_multihash(Multihash::wrap(SHA_256_MH, &hash).unwrap()).unwrap();
507            ArbitraryPeerId(peer_id)
508        }
509    }
510
511    impl Arbitrary for ClosestPeersIter {
512        fn arbitrary(g: &mut Gen) -> ClosestPeersIter {
513            let known_closest_peers = (0..g.gen_range(1..60u8))
514                .map(|_| Key::from(ArbitraryPeerId::arbitrary(g).0))
515                .collect::<Vec<_>>();
516            let target = Key::from(ArbitraryPeerId::arbitrary(g).0);
517            let config = ClosestPeersIterConfig {
518                parallelism: NonZeroUsize::new(g.gen_range(1..10)).unwrap(),
519                num_results: NonZeroUsize::new(g.gen_range(1..25)).unwrap(),
520                peer_timeout: Duration::from_secs(g.gen_range(10..30)),
521            };
522            ClosestPeersIter::with_config(config, target, known_closest_peers)
523        }
524    }
525
526    #[derive(Clone, Debug)]
527    struct Seed([u8; 32]);
528
529    impl Arbitrary for Seed {
530        fn arbitrary(g: &mut Gen) -> Seed {
531            let seed = core::array::from_fn(|_| u8::arbitrary(g));
532            Seed(seed)
533        }
534    }
535
536    #[test]
537    fn new_iter() {
538        fn prop(iter: ClosestPeersIter) {
539            let target = iter.target.clone();
540
541            let (keys, states): (Vec<_>, Vec<_>) = iter
542                .closest_peers
543                .values()
544                .map(|e| (e.key.clone(), &e.state))
545                .unzip();
546
547            let none_contacted = states.iter().all(|s| matches!(s, PeerState::NotContacted));
548
549            assert!(none_contacted, "Unexpected peer state in new iterator.");
550            assert!(
551                sorted(&target, &keys),
552                "Closest peers in new iterator not sorted by distance to target."
553            );
554            assert_eq!(
555                iter.num_waiting(),
556                0,
557                "Unexpected peers in progress in new iterator."
558            );
559            assert_eq!(
560                iter.into_result().count(),
561                0,
562                "Unexpected closest peers in new iterator"
563            );
564        }
565
566        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
567    }
568
569    #[test]
570    fn termination_and_parallelism() {
571        fn prop(mut iter: ClosestPeersIter, seed: Seed) {
572            let now = Instant::now();
573            let mut rng = StdRng::from_seed(seed.0);
574
575            let mut expected = iter
576                .closest_peers
577                .values()
578                .map(|e| e.key.clone())
579                .collect::<Vec<_>>();
580            let num_known = expected.len();
581            let max_parallelism = usize::min(iter.config.parallelism.get(), num_known);
582
583            let target = iter.target.clone();
584            let mut remaining;
585            let mut num_failures = 0;
586
587            'finished: loop {
588                if expected.is_empty() {
589                    break;
590                }
591                // Split off the next up to `parallelism` expected peers.
592                else if expected.len() < max_parallelism {
593                    remaining = Vec::new();
594                } else {
595                    remaining = expected.split_off(max_parallelism);
596                }
597
598                // Advance for maximum parallelism.
599                for k in expected.iter() {
600                    match iter.next(now) {
601                        PeersIterState::Finished => break 'finished,
602                        PeersIterState::Waiting(Some(p)) => assert_eq!(&*p, k.preimage()),
603                        PeersIterState::Waiting(None) => panic!("Expected another peer."),
604                        PeersIterState::WaitingAtCapacity => {
605                            panic!("Unexpectedly reached capacity.")
606                        }
607                    }
608                }
609                let num_waiting = iter.num_waiting();
610                assert_eq!(num_waiting, expected.len());
611
612                // Check the bounded parallelism.
613                if iter.at_capacity() {
614                    assert_eq!(iter.next(now), PeersIterState::WaitingAtCapacity)
615                }
616
617                // Report results back to the iterator with a random number of "closer"
618                // peers or an error, thus finishing the "in-flight requests".
619                for (i, k) in expected.iter().enumerate() {
620                    if rng.gen_bool(0.75) {
621                        let num_closer = rng.gen_range(0..iter.config.num_results.get() + 1);
622                        let closer_peers = random_peers(num_closer, &mut rng);
623                        remaining.extend(closer_peers.iter().cloned().map(Key::from));
624                        iter.on_success(k.preimage(), closer_peers);
625                    } else {
626                        num_failures += 1;
627                        iter.on_failure(k.preimage());
628                    }
629                    assert_eq!(iter.num_waiting(), num_waiting - (i + 1));
630                }
631
632                // Re-sort the remaining expected peers for the next "round".
633                remaining.sort_by_key(|k| target.distance(&k));
634
635                expected = remaining
636            }
637
638            // The iterator must be finished.
639            assert_eq!(iter.next(now), PeersIterState::Finished);
640            assert_eq!(iter.state, State::Finished);
641
642            // Determine if all peers have been contacted by the iterator. This _must_ be
643            // the case if the iterator finished with fewer than the requested number
644            // of results.
645            let all_contacted = iter
646                .closest_peers
647                .values()
648                .all(|e| !matches!(e.state, PeerState::NotContacted | PeerState::Waiting { .. }));
649
650            let target = iter.target.clone();
651            let num_results = iter.config.num_results;
652            let result = iter.into_result();
653            let closest = result.map(Key::from).collect::<Vec<_>>();
654
655            assert!(sorted(&target, &closest));
656
657            if closest.len() < num_results.get() {
658                // The iterator returned fewer results than requested. Therefore
659                // either the initial number of known peers must have been
660                // less than the desired number of results, or there must
661                // have been failures.
662                assert!(num_known < num_results.get() || num_failures > 0);
663                // All peers must have been contacted.
664                assert!(all_contacted, "Not all peers have been contacted.");
665            } else {
666                assert_eq!(num_results.get(), closest.len(), "Too  many results.");
667            }
668        }
669
670        QuickCheck::new()
671            .tests(10)
672            .quickcheck(prop as fn(_, _) -> _)
673    }
674
675    #[test]
676    fn no_duplicates() {
677        fn prop(mut iter: ClosestPeersIter, closer: ArbitraryPeerId) -> bool {
678            let now = Instant::now();
679
680            let closer = vec![closer.0];
681
682            // A first peer reports a "closer" peer.
683            let peer1 = match iter.next(now) {
684                PeersIterState::Waiting(Some(p)) => p.into_owned(),
685                _ => panic!("No peer."),
686            };
687            iter.on_success(&peer1, closer.clone());
688            // Duplicate result from te same peer.
689            iter.on_success(&peer1, closer.clone());
690
691            // If there is a second peer, let it also report the same "closer" peer.
692            match iter.next(now) {
693                PeersIterState::Waiting(Some(p)) => {
694                    let peer2 = p.into_owned();
695                    assert!(iter.on_success(&peer2, closer.clone()))
696                }
697                PeersIterState::Finished => {}
698                _ => panic!("Unexpectedly iter state."),
699            };
700
701            // The "closer" peer must only be in the iterator once.
702            let n = iter
703                .closest_peers
704                .values()
705                .filter(|e| e.key.preimage() == &closer[0])
706                .count();
707            assert_eq!(n, 1);
708
709            true
710        }
711
712        QuickCheck::new()
713            .tests(10)
714            .quickcheck(prop as fn(_, _) -> _)
715    }
716
717    #[test]
718    fn timeout() {
719        fn prop(mut iter: ClosestPeersIter) -> bool {
720            let mut now = Instant::now();
721            let peer = iter
722                .closest_peers
723                .values()
724                .next()
725                .unwrap()
726                .key
727                .clone()
728                .into_preimage();
729
730            // Poll the iterator for the first peer to be in progress.
731            match iter.next(now) {
732                PeersIterState::Waiting(Some(id)) => assert_eq!(&*id, &peer),
733                _ => panic!(),
734            }
735
736            // Artificially advance the clock.
737            now += iter.config.peer_timeout;
738
739            // Advancing the iterator again should mark the first peer as unresponsive.
740            let _ = iter.next(now);
741            match &iter.closest_peers.values().next().unwrap() {
742                Peer {
743                    key,
744                    state: PeerState::Unresponsive,
745                } => {
746                    assert_eq!(key.preimage(), &peer);
747                }
748                Peer { state, .. } => panic!("Unexpected peer state: {state:?}"),
749            }
750
751            let finished = iter.is_finished();
752            iter.on_success(&peer, iter::empty());
753            let closest = iter.into_result().collect::<Vec<_>>();
754
755            if finished {
756                // Delivering results when the iterator already finished must have
757                // no effect.
758                assert_eq!(Vec::<PeerId>::new(), closest)
759            } else {
760                // Unresponsive peers can still deliver results while the iterator
761                // is not finished.
762                assert_eq!(vec![peer], closest)
763            }
764            true
765        }
766
767        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
768    }
769
770    #[test]
771    fn without_success_try_up_to_k_peers() {
772        fn prop(mut iter: ClosestPeersIter) {
773            let now = Instant::now();
774
775            for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) {
776                match iter.next(now) {
777                    PeersIterState::Waiting(Some(p)) => {
778                        let peer = p.clone().into_owned();
779                        iter.on_failure(&peer);
780                    }
781                    _ => panic!("Expected iterator to yield another peer to query."),
782                }
783            }
784
785            assert_eq!(PeersIterState::Finished, iter.next(now));
786        }
787
788        QuickCheck::new().tests(10).quickcheck(prop as fn(_))
789    }
790
791    #[test]
792    fn stalled_at_capacity() {
793        fn prop(mut iter: ClosestPeersIter) {
794            iter.state = State::Stalled;
795
796            for i in 0..usize::max(iter.config.parallelism.get(), iter.config.num_results.get()) {
797                iter.num_waiting = i;
798                assert!(
799                    !iter.at_capacity(),
800                    "Iterator should not be at capacity if less than \
801                     `max(parallelism, num_results)` requests are waiting.",
802                )
803            }
804
805            iter.num_waiting =
806                usize::max(iter.config.parallelism.get(), iter.config.num_results.get());
807            assert!(
808                iter.at_capacity(),
809                "Iterator should be at capacity if `max(parallelism, num_results)` requests are \
810                 waiting.",
811            )
812        }
813
814        QuickCheck::new().tests(10).quickcheck(prop as fn(_))
815    }
816}