libp2p_kad/
query.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod peers;
22
23use libp2p_core::Multiaddr;
24use peers::closest::{
25    disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig,
26};
27use peers::fixed::FixedPeersIter;
28use peers::PeersIterState;
29use smallvec::SmallVec;
30
31use crate::behaviour::PeerInfo;
32use crate::handler::HandlerIn;
33use crate::kbucket::{Key, KeyBytes};
34use crate::{QueryInfo, ALPHA_VALUE, K_VALUE};
35use either::Either;
36use fnv::FnvHashMap;
37use libp2p_identity::PeerId;
38use std::{num::NonZeroUsize, time::Duration};
39use web_time::Instant;
40
/// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion.
///
/// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter`
/// that determines the peer selection strategy, i.e. the order in which the
/// peers involved in the query should be contacted.
pub(crate) struct QueryPool {
    /// Counter from which the next fresh `QueryId` is taken (wraps around on overflow).
    next_id: usize,
    /// Settings applied to every query created by this pool.
    config: QueryConfig,
    /// The queries currently held by the pool, keyed by their ID.
    queries: FnvHashMap<QueryId, Query>,
}
51
/// The observable states emitted by [`QueryPool::poll`].
pub(crate) enum QueryPoolState<'a> {
    /// The pool is idle, i.e. there are no queries to process.
    Idle,
    /// At least one query is waiting for results. `Some(request)` indicates
    /// that a new request is now being waited on.
    ///
    /// In the `Some` case the tuple holds the query (which stays in the pool)
    /// and the peer it wants to contact next.
    Waiting(Option<(&'a mut Query, PeerId)>),
    /// A query has finished. It has been removed from the pool and its
    /// statistics carry the instant at which it ended.
    Finished(Query),
    /// A query has timed out. It has been removed from the pool and its
    /// statistics carry the instant at which it ended.
    Timeout(Query),
}
64
65impl QueryPool {
66    /// Creates a new `QueryPool` with the given configuration.
67    pub(crate) fn new(config: QueryConfig) -> Self {
68        QueryPool {
69            next_id: 0,
70            config,
71            queries: Default::default(),
72        }
73    }
74
75    /// Gets a reference to the `QueryConfig` used by the pool.
76    pub(crate) fn config(&self) -> &QueryConfig {
77        &self.config
78    }
79
80    /// Returns an iterator over the queries in the pool.
81    pub(crate) fn iter(&self) -> impl Iterator<Item = &Query> {
82        self.queries.values()
83    }
84
85    /// Gets the current size of the pool, i.e. the number of running queries.
86    pub(crate) fn size(&self) -> usize {
87        self.queries.len()
88    }
89
90    /// Returns an iterator that allows modifying each query in the pool.
91    pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query> {
92        self.queries.values_mut()
93    }
94
95    /// Adds a query to the pool that contacts a fixed set of peers.
96    pub(crate) fn add_fixed<I>(&mut self, peers: I, info: QueryInfo) -> QueryId
97    where
98        I: IntoIterator<Item = PeerId>,
99    {
100        let id = self.next_query_id();
101        self.continue_fixed(id, peers, info);
102        id
103    }
104
105    /// Continues an earlier query with a fixed set of peers, reusing
106    /// the given query ID, which must be from a query that finished
107    /// earlier.
108    pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, info: QueryInfo)
109    where
110        I: IntoIterator<Item = PeerId>,
111    {
112        assert!(!self.queries.contains_key(&id));
113        let parallelism = self.config.replication_factor;
114        let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
115        let query = Query::new(id, peer_iter, info);
116        self.queries.insert(id, query);
117    }
118
119    /// Adds a query to the pool that iterates towards the closest peers to the target.
120    pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, info: QueryInfo) -> QueryId
121    where
122        T: Into<KeyBytes> + Clone,
123        I: IntoIterator<Item = Key<PeerId>>,
124    {
125        let id = self.next_query_id();
126        self.continue_iter_closest(id, target, peers, info);
127        id
128    }
129
130    /// Adds a query to the pool that iterates towards the closest peers to the target.
131    pub(crate) fn continue_iter_closest<T, I>(
132        &mut self,
133        id: QueryId,
134        target: T,
135        peers: I,
136        info: QueryInfo,
137    ) where
138        T: Into<KeyBytes> + Clone,
139        I: IntoIterator<Item = Key<PeerId>>,
140    {
141        let cfg = ClosestPeersIterConfig {
142            num_results: self.config.replication_factor,
143            parallelism: self.config.parallelism,
144            ..ClosestPeersIterConfig::default()
145        };
146
147        let peer_iter = if self.config.disjoint_query_paths {
148            QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
149                cfg, target, peers,
150            ))
151        } else {
152            QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
153        };
154
155        let query = Query::new(id, peer_iter, info);
156        self.queries.insert(id, query);
157    }
158
159    fn next_query_id(&mut self) -> QueryId {
160        let id = QueryId(self.next_id);
161        self.next_id = self.next_id.wrapping_add(1);
162        id
163    }
164
165    /// Returns a reference to a query with the given ID, if it is in the pool.
166    pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> {
167        self.queries.get(id)
168    }
169
170    /// Returns a mutablereference to a query with the given ID, if it is in the pool.
171    pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> {
172        self.queries.get_mut(id)
173    }
174
175    /// Polls the pool to advance the queries.
176    pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> {
177        let mut finished = None;
178        let mut timeout = None;
179        let mut waiting = None;
180
181        for (&query_id, query) in self.queries.iter_mut() {
182            query.stats.start = query.stats.start.or(Some(now));
183            match query.next(now) {
184                PeersIterState::Finished => {
185                    finished = Some(query_id);
186                    break;
187                }
188                PeersIterState::Waiting(Some(peer_id)) => {
189                    let peer = peer_id.into_owned();
190                    waiting = Some((query_id, peer));
191                    break;
192                }
193                PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
194                    let elapsed = now - query.stats.start.unwrap_or(now);
195                    if elapsed >= self.config.timeout {
196                        timeout = Some(query_id);
197                        break;
198                    }
199                }
200            }
201        }
202
203        if let Some((query_id, peer_id)) = waiting {
204            let query = self.queries.get_mut(&query_id).expect("s.a.");
205            return QueryPoolState::Waiting(Some((query, peer_id)));
206        }
207
208        if let Some(query_id) = finished {
209            let mut query = self.queries.remove(&query_id).expect("s.a.");
210            query.stats.end = Some(now);
211            return QueryPoolState::Finished(query);
212        }
213
214        if let Some(query_id) = timeout {
215            let mut query = self.queries.remove(&query_id).expect("s.a.");
216            query.stats.end = Some(now);
217            return QueryPoolState::Timeout(query);
218        }
219
220        if self.queries.is_empty() {
221            QueryPoolState::Idle
222        } else {
223            QueryPoolState::Waiting(None)
224        }
225    }
226}
227
/// Unique identifier for an active query.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);

impl std::fmt::Display for QueryId {
    /// Formats the numeric ID exactly like a plain `usize`, honouring any
    /// width, fill, alignment or zero-padding flags of the surrounding format
    /// string (e.g. `{:>6}`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to `usize`'s `Display` impl. `write!(f, "{}", self.0)`
        // would discard the caller's formatting flags.
        std::fmt::Display::fmt(&self.0, f)
    }
}
237
/// The configuration for queries in a `QueryPool`.
#[derive(Debug, Clone)]
pub(crate) struct QueryConfig {
    /// Timeout of a single query.
    ///
    /// Measured from the first time the pool polls the query. It is only
    /// checked while the query has no new peer to contact.
    ///
    /// See [`crate::behaviour::Config::set_query_timeout`] for details.
    pub(crate) timeout: Duration,
    /// The replication factor to use.
    ///
    /// Used as the number of results of iterative (closest-peer) queries and
    /// as the parallelism of fixed-peer queries.
    ///
    /// See [`crate::behaviour::Config::set_replication_factor`] for details.
    pub(crate) replication_factor: NonZeroUsize,
    /// Allowed level of parallelism for iterative queries.
    ///
    /// See [`crate::behaviour::Config::set_parallelism`] for details.
    pub(crate) parallelism: NonZeroUsize,
    /// Whether to use disjoint paths on iterative lookups.
    ///
    /// See [`crate::behaviour::Config::disjoint_query_paths`] for details.
    pub(crate) disjoint_query_paths: bool,
}
258
259impl Default for QueryConfig {
260    fn default() -> Self {
261        QueryConfig {
262            timeout: Duration::from_secs(60),
263            replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
264            parallelism: ALPHA_VALUE,
265            disjoint_query_paths: false,
266        }
267    }
268}
269
/// A query in a `QueryPool`.
pub(crate) struct Query {
    /// The unique ID of the query.
    id: QueryId,
    /// The peer iterator that drives the query state, together with the
    /// addresses discovered for peers during the query.
    pub(crate) peers: QueryPeers,
    /// Execution statistics of the query.
    pub(crate) stats: QueryStats,
    /// The query-specific state.
    pub(crate) info: QueryInfo,
    /// A list of pending requests to peers, stored as `(peer, request)` pairs.
    ///
    /// A request is pending if the targeted peer is not currently connected
    /// and these requests are sent as soon as a connection to the peer is established.
    /// Up to `K_VALUE` entries are stored inline before spilling to the heap.
    pub(crate) pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
}
286
/// The peers involved in a query: the addresses discovered for them and the
/// peer iterator that drives the query state.
pub(crate) struct QueryPeers {
    /// Addresses of peers discovered during a query.
    pub(crate) addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
    /// The peer iterator that drives the query state.
    peer_iter: QueryPeerIter,
}
294
295impl QueryPeers {
296    /// Consumes the peers iterator, producing a final `Iterator` over the discovered `PeerId`s.
297    pub(crate) fn into_peerids_iter(self) -> impl Iterator<Item = PeerId> {
298        match self.peer_iter {
299            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
300            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
301            QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
302        }
303    }
304
305    /// Consumes the peers iterator, producing a final `Iterator` over the discovered `PeerId`s
306    /// with their matching `Multiaddr`s.
307    pub(crate) fn into_peerinfos_iter(mut self) -> impl Iterator<Item = PeerInfo> {
308        match self.peer_iter {
309            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
310            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
311            QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
312        }
313        .map(move |peer_id| {
314            let addrs = self.addresses.remove(&peer_id).unwrap_or_default().to_vec();
315            PeerInfo { peer_id, addrs }
316        })
317    }
318}
319
/// The peer selection strategies that can be used by queries.
enum QueryPeerIter {
    /// Iterates towards the peers closest to a target along a single path
    /// (used when disjoint query paths are disabled).
    Closest(ClosestPeersIter),
    /// Iterates towards the peers closest to a target along disjoint paths
    /// (used when `QueryConfig::disjoint_query_paths` is set).
    ClosestDisjoint(ClosestDisjointPeersIter),
    /// Contacts a fixed, predetermined set of peers.
    Fixed(FixedPeersIter),
}
326
327impl Query {
328    /// Creates a new query without starting it.
329    fn new(id: QueryId, peer_iter: QueryPeerIter, info: QueryInfo) -> Self {
330        Query {
331            id,
332            info,
333            peers: QueryPeers {
334                addresses: Default::default(),
335                peer_iter,
336            },
337            pending_rpcs: SmallVec::default(),
338            stats: QueryStats::empty(),
339        }
340    }
341
342    /// Gets the unique ID of the query.
343    pub(crate) fn id(&self) -> QueryId {
344        self.id
345    }
346
347    /// Gets the current execution statistics of the query.
348    pub(crate) fn stats(&self) -> &QueryStats {
349        &self.stats
350    }
351
352    /// Informs the query that the attempt to contact `peer` failed.
353    pub(crate) fn on_failure(&mut self, peer: &PeerId) {
354        let updated = match &mut self.peers.peer_iter {
355            QueryPeerIter::Closest(iter) => iter.on_failure(peer),
356            QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
357            QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
358        };
359        if updated {
360            self.stats.failure += 1;
361        }
362    }
363
364    /// Informs the query that the attempt to contact `peer` succeeded,
365    /// possibly resulting in new peers that should be incorporated into
366    /// the query, if applicable.
367    pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
368    where
369        I: IntoIterator<Item = PeerId>,
370    {
371        let updated = match &mut self.peers.peer_iter {
372            QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
373            QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
374            QueryPeerIter::Fixed(iter) => iter.on_success(peer),
375        };
376        if updated {
377            self.stats.success += 1;
378        }
379    }
380
381    /// Advances the state of the underlying peer iterator.
382    fn next(&mut self, now: Instant) -> PeersIterState<'_> {
383        let state = match &mut self.peers.peer_iter {
384            QueryPeerIter::Closest(iter) => iter.next(now),
385            QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
386            QueryPeerIter::Fixed(iter) => iter.next(),
387        };
388
389        if let PeersIterState::Waiting(Some(_)) = state {
390            self.stats.requests += 1;
391        }
392
393        state
394    }
395
396    /// Tries to (gracefully) finish the query prematurely, providing the peers
397    /// that are no longer of interest for further progress of the query.
398    ///
399    /// A query may require that in order to finish gracefully a certain subset
400    /// of peers must be contacted. E.g. in the case of disjoint query paths a
401    /// query may only finish gracefully if every path contacted a peer whose
402    /// response permits termination of the query. The given peers are those for
403    /// which this is considered to be the case, i.e. for which a termination
404    /// condition is satisfied.
405    ///
406    /// Returns `true` if the query did indeed finish, `false` otherwise. In the
407    /// latter case, a new attempt at finishing the query may be made with new
408    /// `peers`.
409    ///
410    /// A finished query immediately stops yielding new peers to contact and
411    /// will be reported by [`QueryPool::poll`] via
412    /// [`QueryPoolState::Finished`].
413    pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
414    where
415        I: IntoIterator<Item = &'a PeerId>,
416    {
417        match &mut self.peers.peer_iter {
418            QueryPeerIter::Closest(iter) => {
419                iter.finish();
420                true
421            }
422            QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
423            QueryPeerIter::Fixed(iter) => {
424                iter.finish();
425                true
426            }
427        }
428    }
429
430    /// Finishes the query prematurely.
431    ///
432    /// A finished query immediately stops yielding new peers to contact and will be
433    /// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`].
434    pub(crate) fn finish(&mut self) {
435        match &mut self.peers.peer_iter {
436            QueryPeerIter::Closest(iter) => iter.finish(),
437            QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
438            QueryPeerIter::Fixed(iter) => iter.finish(),
439        }
440    }
441
442    /// Checks whether the query has finished.
443    ///
444    /// A finished query is eventually reported by `QueryPool::next()` and
445    /// removed from the pool.
446    pub(crate) fn is_finished(&self) -> bool {
447        match &self.peers.peer_iter {
448            QueryPeerIter::Closest(iter) => iter.is_finished(),
449            QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
450            QueryPeerIter::Fixed(iter) => iter.is_finished(),
451        }
452    }
453}
454
/// Execution statistics of a query.
///
/// Counts the requests a query issued and how many of them succeeded or
/// failed, and records when the query started and ended.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of requests issued to peers.
    requests: u32,
    /// Number of requests that completed successfully.
    success: u32,
    /// Number of requests that failed.
    failure: u32,
    /// When the query was first polled by its pool, if it has been.
    start: Option<Instant>,
    /// When the query finished or timed out, if it has.
    end: Option<Instant>,
}

impl QueryStats {
    /// Creates statistics with all counters at zero and no start or end instant.
    pub fn empty() -> Self {
        Self {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// Returns how many requests the query has initiated in total.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Returns how many requests completed successfully.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Returns how many requests failed.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Returns how many requests are still awaiting an outcome.
    ///
    /// > **Note**: A query can finish while still having pending
    /// > requests, if the termination conditions are already met.
    pub fn num_pending(&self) -> u32 {
        let settled = self.success + self.failure;
        self.requests - settled
    }

    /// Returns how long the query has been running.
    ///
    /// For a query that has not ended yet, the duration runs from its start
    /// up to the current instant.
    ///
    /// If the query has not been started yet (i.e. it was never polled by its
    /// pool), `None` is returned.
    pub fn duration(&self) -> Option<Duration> {
        let start = self.start?;
        let end = self.end.unwrap_or_else(Instant::now);
        Some(end - start)
    }

    /// Combines these statistics with those of `other`, e.g. to accumulate
    /// the figures of a multi-phase query.
    ///
    /// Counters are summed. The merged start is the earlier of the two
    /// recorded starts, and the merged end is the later of the two recorded
    /// ends.
    pub fn merge(self, other: QueryStats) -> Self {
        // Minimum over the (at most two) recorded starts; `None` only if neither started.
        let start = self.start.into_iter().chain(other.start).min();
        // `Option` orders `None` below any `Some(_)`, so a recorded end always wins.
        let end = self.end.max(other.end);
        QueryStats {
            requests: self.requests + other.requests,
            success: self.success + other.success,
            failure: self.failure + other.failure,
            start,
            end,
        }
    }
}