1mod peers;
22
23use libp2p_core::Multiaddr;
24use peers::closest::{
25 disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig,
26};
27use peers::fixed::FixedPeersIter;
28use peers::PeersIterState;
29use smallvec::SmallVec;
30
31use crate::behaviour::PeerInfo;
32use crate::handler::HandlerIn;
33use crate::kbucket::{Key, KeyBytes};
34use crate::{QueryInfo, ALPHA_VALUE, K_VALUE};
35use either::Either;
36use fnv::FnvHashMap;
37use libp2p_identity::PeerId;
38use std::{num::NonZeroUsize, time::Duration};
39use web_time::Instant;
40
41pub(crate) struct QueryPool {
47 next_id: usize,
48 config: QueryConfig,
49 queries: FnvHashMap<QueryId, Query>,
50}
51
52pub(crate) enum QueryPoolState<'a> {
54 Idle,
56 Waiting(Option<(&'a mut Query, PeerId)>),
59 Finished(Query),
61 Timeout(Query),
63}
64
65impl QueryPool {
66 pub(crate) fn new(config: QueryConfig) -> Self {
68 QueryPool {
69 next_id: 0,
70 config,
71 queries: Default::default(),
72 }
73 }
74
75 pub(crate) fn config(&self) -> &QueryConfig {
77 &self.config
78 }
79
80 pub(crate) fn iter(&self) -> impl Iterator<Item = &Query> {
82 self.queries.values()
83 }
84
85 pub(crate) fn size(&self) -> usize {
87 self.queries.len()
88 }
89
90 pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query> {
92 self.queries.values_mut()
93 }
94
95 pub(crate) fn add_fixed<I>(&mut self, peers: I, info: QueryInfo) -> QueryId
97 where
98 I: IntoIterator<Item = PeerId>,
99 {
100 let id = self.next_query_id();
101 self.continue_fixed(id, peers, info);
102 id
103 }
104
105 pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, info: QueryInfo)
109 where
110 I: IntoIterator<Item = PeerId>,
111 {
112 assert!(!self.queries.contains_key(&id));
113 let parallelism = self.config.replication_factor;
114 let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
115 let query = Query::new(id, peer_iter, info);
116 self.queries.insert(id, query);
117 }
118
119 pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, info: QueryInfo) -> QueryId
121 where
122 T: Into<KeyBytes> + Clone,
123 I: IntoIterator<Item = Key<PeerId>>,
124 {
125 let id = self.next_query_id();
126 self.continue_iter_closest(id, target, peers, info);
127 id
128 }
129
130 pub(crate) fn continue_iter_closest<T, I>(
132 &mut self,
133 id: QueryId,
134 target: T,
135 peers: I,
136 info: QueryInfo,
137 ) where
138 T: Into<KeyBytes> + Clone,
139 I: IntoIterator<Item = Key<PeerId>>,
140 {
141 let cfg = ClosestPeersIterConfig {
142 num_results: self.config.replication_factor,
143 parallelism: self.config.parallelism,
144 ..ClosestPeersIterConfig::default()
145 };
146
147 let peer_iter = if self.config.disjoint_query_paths {
148 QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
149 cfg, target, peers,
150 ))
151 } else {
152 QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
153 };
154
155 let query = Query::new(id, peer_iter, info);
156 self.queries.insert(id, query);
157 }
158
159 fn next_query_id(&mut self) -> QueryId {
160 let id = QueryId(self.next_id);
161 self.next_id = self.next_id.wrapping_add(1);
162 id
163 }
164
165 pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> {
167 self.queries.get(id)
168 }
169
170 pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> {
172 self.queries.get_mut(id)
173 }
174
175 pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> {
177 let mut finished = None;
178 let mut timeout = None;
179 let mut waiting = None;
180
181 for (&query_id, query) in self.queries.iter_mut() {
182 query.stats.start = query.stats.start.or(Some(now));
183 match query.next(now) {
184 PeersIterState::Finished => {
185 finished = Some(query_id);
186 break;
187 }
188 PeersIterState::Waiting(Some(peer_id)) => {
189 let peer = peer_id.into_owned();
190 waiting = Some((query_id, peer));
191 break;
192 }
193 PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
194 let elapsed = now - query.stats.start.unwrap_or(now);
195 if elapsed >= self.config.timeout {
196 timeout = Some(query_id);
197 break;
198 }
199 }
200 }
201 }
202
203 if let Some((query_id, peer_id)) = waiting {
204 let query = self.queries.get_mut(&query_id).expect("s.a.");
205 return QueryPoolState::Waiting(Some((query, peer_id)));
206 }
207
208 if let Some(query_id) = finished {
209 let mut query = self.queries.remove(&query_id).expect("s.a.");
210 query.stats.end = Some(now);
211 return QueryPoolState::Finished(query);
212 }
213
214 if let Some(query_id) = timeout {
215 let mut query = self.queries.remove(&query_id).expect("s.a.");
216 query.stats.end = Some(now);
217 return QueryPoolState::Timeout(query);
218 }
219
220 if self.queries.is_empty() {
221 QueryPoolState::Idle
222 } else {
223 QueryPoolState::Waiting(None)
224 }
225 }
226}
227
228#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
230pub struct QueryId(usize);
231
232impl std::fmt::Display for QueryId {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 write!(f, "{}", self.0)
235 }
236}
237
238#[derive(Debug, Clone)]
240pub(crate) struct QueryConfig {
241 pub(crate) timeout: Duration,
245 pub(crate) replication_factor: NonZeroUsize,
249 pub(crate) parallelism: NonZeroUsize,
253 pub(crate) disjoint_query_paths: bool,
257}
258
259impl Default for QueryConfig {
260 fn default() -> Self {
261 QueryConfig {
262 timeout: Duration::from_secs(60),
263 replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
264 parallelism: ALPHA_VALUE,
265 disjoint_query_paths: false,
266 }
267 }
268}
269
270pub(crate) struct Query {
272 id: QueryId,
274 pub(crate) peers: QueryPeers,
276 pub(crate) stats: QueryStats,
278 pub(crate) info: QueryInfo,
280 pub(crate) pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
285}
286
287pub(crate) struct QueryPeers {
289 pub(crate) addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
291 peer_iter: QueryPeerIter,
293}
294
295impl QueryPeers {
296 pub(crate) fn into_peerids_iter(self) -> impl Iterator<Item = PeerId> {
298 match self.peer_iter {
299 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
300 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
301 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
302 }
303 }
304
305 pub(crate) fn into_peerinfos_iter(mut self) -> impl Iterator<Item = PeerInfo> {
308 match self.peer_iter {
309 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
310 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
311 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
312 }
313 .map(move |peer_id| {
314 let addrs = self.addresses.remove(&peer_id).unwrap_or_default().to_vec();
315 PeerInfo { peer_id, addrs }
316 })
317 }
318}
319
320enum QueryPeerIter {
322 Closest(ClosestPeersIter),
323 ClosestDisjoint(ClosestDisjointPeersIter),
324 Fixed(FixedPeersIter),
325}
326
327impl Query {
328 fn new(id: QueryId, peer_iter: QueryPeerIter, info: QueryInfo) -> Self {
330 Query {
331 id,
332 info,
333 peers: QueryPeers {
334 addresses: Default::default(),
335 peer_iter,
336 },
337 pending_rpcs: SmallVec::default(),
338 stats: QueryStats::empty(),
339 }
340 }
341
342 pub(crate) fn id(&self) -> QueryId {
344 self.id
345 }
346
347 pub(crate) fn stats(&self) -> &QueryStats {
349 &self.stats
350 }
351
352 pub(crate) fn on_failure(&mut self, peer: &PeerId) {
354 let updated = match &mut self.peers.peer_iter {
355 QueryPeerIter::Closest(iter) => iter.on_failure(peer),
356 QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
357 QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
358 };
359 if updated {
360 self.stats.failure += 1;
361 }
362 }
363
364 pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
368 where
369 I: IntoIterator<Item = PeerId>,
370 {
371 let updated = match &mut self.peers.peer_iter {
372 QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
373 QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
374 QueryPeerIter::Fixed(iter) => iter.on_success(peer),
375 };
376 if updated {
377 self.stats.success += 1;
378 }
379 }
380
381 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
383 let state = match &mut self.peers.peer_iter {
384 QueryPeerIter::Closest(iter) => iter.next(now),
385 QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
386 QueryPeerIter::Fixed(iter) => iter.next(),
387 };
388
389 if let PeersIterState::Waiting(Some(_)) = state {
390 self.stats.requests += 1;
391 }
392
393 state
394 }
395
396 pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
414 where
415 I: IntoIterator<Item = &'a PeerId>,
416 {
417 match &mut self.peers.peer_iter {
418 QueryPeerIter::Closest(iter) => {
419 iter.finish();
420 true
421 }
422 QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
423 QueryPeerIter::Fixed(iter) => {
424 iter.finish();
425 true
426 }
427 }
428 }
429
430 pub(crate) fn finish(&mut self) {
435 match &mut self.peers.peer_iter {
436 QueryPeerIter::Closest(iter) => iter.finish(),
437 QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
438 QueryPeerIter::Fixed(iter) => iter.finish(),
439 }
440 }
441
442 pub(crate) fn is_finished(&self) -> bool {
447 match &self.peers.peer_iter {
448 QueryPeerIter::Closest(iter) => iter.is_finished(),
449 QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
450 QueryPeerIter::Fixed(iter) => iter.is_finished(),
451 }
452 }
453}
454
455#[derive(Clone, Debug, PartialEq, Eq)]
457pub struct QueryStats {
458 requests: u32,
459 success: u32,
460 failure: u32,
461 start: Option<Instant>,
462 end: Option<Instant>,
463}
464
465impl QueryStats {
466 pub fn empty() -> Self {
467 QueryStats {
468 requests: 0,
469 success: 0,
470 failure: 0,
471 start: None,
472 end: None,
473 }
474 }
475
476 pub fn num_requests(&self) -> u32 {
478 self.requests
479 }
480
481 pub fn num_successes(&self) -> u32 {
483 self.success
484 }
485
486 pub fn num_failures(&self) -> u32 {
488 self.failure
489 }
490
491 pub fn num_pending(&self) -> u32 {
496 self.requests - (self.success + self.failure)
497 }
498
499 pub fn duration(&self) -> Option<Duration> {
507 if let Some(s) = self.start {
508 if let Some(e) = self.end {
509 Some(e - s)
510 } else {
511 Some(Instant::now() - s)
512 }
513 } else {
514 None
515 }
516 }
517
518 pub fn merge(self, other: QueryStats) -> Self {
525 QueryStats {
526 requests: self.requests + other.requests,
527 success: self.success + other.success,
528 failure: self.failure + other.failure,
529 start: match (self.start, other.start) {
530 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
531 (a, b) => a.or(b),
532 },
533 end: std::cmp::max(self.end, other.end),
534 }
535 }
536}