1mod peers;
22
23use peers::closest::{
24 disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig,
25};
26use peers::fixed::FixedPeersIter;
27use peers::PeersIterState;
28
29use crate::kbucket::{Key, KeyBytes};
30use crate::{ALPHA_VALUE, K_VALUE};
31use either::Either;
32use fnv::FnvHashMap;
33use instant::Instant;
34use libp2p_identity::PeerId;
35use std::{num::NonZeroUsize, time::Duration};
36
37pub(crate) struct QueryPool<TInner> {
43 next_id: usize,
44 config: QueryConfig,
45 queries: FnvHashMap<QueryId, Query<TInner>>,
46}
47
48pub(crate) enum QueryPoolState<'a, TInner> {
50 Idle,
52 Waiting(Option<(&'a mut Query<TInner>, PeerId)>),
55 Finished(Query<TInner>),
57 Timeout(Query<TInner>),
59}
60
61impl<TInner> QueryPool<TInner> {
62 pub(crate) fn new(config: QueryConfig) -> Self {
64 QueryPool {
65 next_id: 0,
66 config,
67 queries: Default::default(),
68 }
69 }
70
71 pub(crate) fn config(&self) -> &QueryConfig {
73 &self.config
74 }
75
76 pub(crate) fn iter(&self) -> impl Iterator<Item = &Query<TInner>> {
78 self.queries.values()
79 }
80
81 pub(crate) fn size(&self) -> usize {
83 self.queries.len()
84 }
85
86 pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query<TInner>> {
88 self.queries.values_mut()
89 }
90
91 pub(crate) fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
93 where
94 I: IntoIterator<Item = PeerId>,
95 {
96 let id = self.next_query_id();
97 self.continue_fixed(id, peers, inner);
98 id
99 }
100
101 pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
105 where
106 I: IntoIterator<Item = PeerId>,
107 {
108 assert!(!self.queries.contains_key(&id));
109 let parallelism = self.config.replication_factor;
110 let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
111 let query = Query::new(id, peer_iter, inner);
112 self.queries.insert(id, query);
113 }
114
115 pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
117 where
118 T: Into<KeyBytes> + Clone,
119 I: IntoIterator<Item = Key<PeerId>>,
120 {
121 let id = self.next_query_id();
122 self.continue_iter_closest(id, target, peers, inner);
123 id
124 }
125
126 pub(crate) fn continue_iter_closest<T, I>(
128 &mut self,
129 id: QueryId,
130 target: T,
131 peers: I,
132 inner: TInner,
133 ) where
134 T: Into<KeyBytes> + Clone,
135 I: IntoIterator<Item = Key<PeerId>>,
136 {
137 let cfg = ClosestPeersIterConfig {
138 num_results: self.config.replication_factor,
139 parallelism: self.config.parallelism,
140 ..ClosestPeersIterConfig::default()
141 };
142
143 let peer_iter = if self.config.disjoint_query_paths {
144 QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
145 cfg, target, peers,
146 ))
147 } else {
148 QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
149 };
150
151 let query = Query::new(id, peer_iter, inner);
152 self.queries.insert(id, query);
153 }
154
155 fn next_query_id(&mut self) -> QueryId {
156 let id = QueryId(self.next_id);
157 self.next_id = self.next_id.wrapping_add(1);
158 id
159 }
160
161 pub(crate) fn get(&self, id: &QueryId) -> Option<&Query<TInner>> {
163 self.queries.get(id)
164 }
165
166 pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query<TInner>> {
168 self.queries.get_mut(id)
169 }
170
171 pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> {
173 let mut finished = None;
174 let mut timeout = None;
175 let mut waiting = None;
176
177 for (&query_id, query) in self.queries.iter_mut() {
178 query.stats.start = query.stats.start.or(Some(now));
179 match query.next(now) {
180 PeersIterState::Finished => {
181 finished = Some(query_id);
182 break;
183 }
184 PeersIterState::Waiting(Some(peer_id)) => {
185 let peer = peer_id.into_owned();
186 waiting = Some((query_id, peer));
187 break;
188 }
189 PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
190 let elapsed = now - query.stats.start.unwrap_or(now);
191 if elapsed >= self.config.timeout {
192 timeout = Some(query_id);
193 break;
194 }
195 }
196 }
197 }
198
199 if let Some((query_id, peer_id)) = waiting {
200 let query = self.queries.get_mut(&query_id).expect("s.a.");
201 return QueryPoolState::Waiting(Some((query, peer_id)));
202 }
203
204 if let Some(query_id) = finished {
205 let mut query = self.queries.remove(&query_id).expect("s.a.");
206 query.stats.end = Some(now);
207 return QueryPoolState::Finished(query);
208 }
209
210 if let Some(query_id) = timeout {
211 let mut query = self.queries.remove(&query_id).expect("s.a.");
212 query.stats.end = Some(now);
213 return QueryPoolState::Timeout(query);
214 }
215
216 if self.queries.is_empty() {
217 QueryPoolState::Idle
218 } else {
219 QueryPoolState::Waiting(None)
220 }
221 }
222}
223
224#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
226pub struct QueryId(usize);
227
228#[derive(Debug, Clone)]
230pub(crate) struct QueryConfig {
231 pub(crate) timeout: Duration,
235 pub(crate) replication_factor: NonZeroUsize,
239 pub(crate) parallelism: NonZeroUsize,
243 pub(crate) disjoint_query_paths: bool,
247}
248
249impl Default for QueryConfig {
250 fn default() -> Self {
251 QueryConfig {
252 timeout: Duration::from_secs(60),
253 replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
254 parallelism: ALPHA_VALUE,
255 disjoint_query_paths: false,
256 }
257 }
258}
259
260pub(crate) struct Query<TInner> {
262 id: QueryId,
264 peer_iter: QueryPeerIter,
266 stats: QueryStats,
268 pub(crate) inner: TInner,
270}
271
272enum QueryPeerIter {
274 Closest(ClosestPeersIter),
275 ClosestDisjoint(ClosestDisjointPeersIter),
276 Fixed(FixedPeersIter),
277}
278
279impl<TInner> Query<TInner> {
280 fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self {
282 Query {
283 id,
284 inner,
285 peer_iter,
286 stats: QueryStats::empty(),
287 }
288 }
289
290 pub(crate) fn id(&self) -> QueryId {
292 self.id
293 }
294
295 pub(crate) fn stats(&self) -> &QueryStats {
297 &self.stats
298 }
299
300 pub(crate) fn on_failure(&mut self, peer: &PeerId) {
302 let updated = match &mut self.peer_iter {
303 QueryPeerIter::Closest(iter) => iter.on_failure(peer),
304 QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
305 QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
306 };
307 if updated {
308 self.stats.failure += 1;
309 }
310 }
311
312 pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
316 where
317 I: IntoIterator<Item = PeerId>,
318 {
319 let updated = match &mut self.peer_iter {
320 QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
321 QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
322 QueryPeerIter::Fixed(iter) => iter.on_success(peer),
323 };
324 if updated {
325 self.stats.success += 1;
326 }
327 }
328
329 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
331 let state = match &mut self.peer_iter {
332 QueryPeerIter::Closest(iter) => iter.next(now),
333 QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
334 QueryPeerIter::Fixed(iter) => iter.next(),
335 };
336
337 if let PeersIterState::Waiting(Some(_)) = state {
338 self.stats.requests += 1;
339 }
340
341 state
342 }
343
344 pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
362 where
363 I: IntoIterator<Item = &'a PeerId>,
364 {
365 match &mut self.peer_iter {
366 QueryPeerIter::Closest(iter) => {
367 iter.finish();
368 true
369 }
370 QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
371 QueryPeerIter::Fixed(iter) => {
372 iter.finish();
373 true
374 }
375 }
376 }
377
378 pub(crate) fn finish(&mut self) {
383 match &mut self.peer_iter {
384 QueryPeerIter::Closest(iter) => iter.finish(),
385 QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
386 QueryPeerIter::Fixed(iter) => iter.finish(),
387 }
388 }
389
390 pub(crate) fn is_finished(&self) -> bool {
395 match &self.peer_iter {
396 QueryPeerIter::Closest(iter) => iter.is_finished(),
397 QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
398 QueryPeerIter::Fixed(iter) => iter.is_finished(),
399 }
400 }
401
402 pub(crate) fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
404 let peers = match self.peer_iter {
405 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
406 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
407 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
408 };
409 QueryResult {
410 peers,
411 inner: self.inner,
412 stats: self.stats,
413 }
414 }
415}
416
417pub(crate) struct QueryResult<TInner, TPeers> {
419 pub(crate) inner: TInner,
421 pub(crate) peers: TPeers,
423 pub(crate) stats: QueryStats,
425}
426
427#[derive(Clone, Debug, PartialEq, Eq)]
429pub struct QueryStats {
430 requests: u32,
431 success: u32,
432 failure: u32,
433 start: Option<Instant>,
434 end: Option<Instant>,
435}
436
437impl QueryStats {
438 pub fn empty() -> Self {
439 QueryStats {
440 requests: 0,
441 success: 0,
442 failure: 0,
443 start: None,
444 end: None,
445 }
446 }
447
448 pub fn num_requests(&self) -> u32 {
450 self.requests
451 }
452
453 pub fn num_successes(&self) -> u32 {
455 self.success
456 }
457
458 pub fn num_failures(&self) -> u32 {
460 self.failure
461 }
462
463 pub fn num_pending(&self) -> u32 {
468 self.requests - (self.success + self.failure)
469 }
470
471 pub fn duration(&self) -> Option<Duration> {
479 if let Some(s) = self.start {
480 if let Some(e) = self.end {
481 Some(e - s)
482 } else {
483 Some(Instant::now() - s)
484 }
485 } else {
486 None
487 }
488 }
489
490 pub fn merge(self, other: QueryStats) -> Self {
497 QueryStats {
498 requests: self.requests + other.requests,
499 success: self.success + other.success,
500 failure: self.failure + other.failure,
501 start: match (self.start, other.start) {
502 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
503 (a, b) => a.or(b),
504 },
505 end: std::cmp::max(self.end, other.end),
506 }
507 }
508}