1use bytes::Bytes;
22
23use crate::{
24 protocol::libp2p::kademlia::{
25 message::KademliaMessage,
26 query::{QueryAction, QueryId},
27 record::{Key as RecordKey, PeerRecord, Record},
28 types::{Distance, KademliaPeer, Key},
29 Quorum,
30 },
31 PeerId,
32};
33
34use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
35
/// Logging target passed as `target:` to every `tracing` call in this file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::get_record";
38
/// Configuration needed to instantiate a [`GetRecordContext`].
#[derive(Debug)]
pub struct GetRecordConfig {
    /// Local peer ID; peers with this ID are never added as query candidates.
    pub local_peer_id: PeerId,

    /// Record count that is added to the number of records found by the query
    /// when evaluating the quorum and the final outcome of the query.
    ///
    /// NOTE(review): presumably the number of records already available before
    /// the query started (e.g. from a local store) — confirm with the caller.
    pub known_records: usize,

    /// Quorum deciding how many records are sufficient to finish the query.
    pub quorum: Quorum,

    /// Number of records required when the quorum is `Quorum::All`.
    pub replication_factor: usize,

    /// Number of in-flight (pending) peers at which no further requests are scheduled.
    pub parallelism_factor: usize,

    /// Query ID reported in every `QueryAction` emitted for this query.
    pub query: QueryId,

    /// Target key; candidates are ordered by their distance to it and its
    /// preimage is used to build the outgoing get-record message.
    pub target: Key<RecordKey>,
}
65
66impl GetRecordConfig {
67 fn sufficient_records(&self, records: usize) -> bool {
71 let total_known = self.known_records + records;
74
75 match self.quorum {
76 Quorum::All => total_known >= self.replication_factor,
77 Quorum::One => total_known >= 1,
78 Quorum::N(needed_responses) => total_known >= needed_responses.get(),
79 }
80 }
81}
82
/// State of an in-progress get-record query.
#[derive(Debug)]
pub struct GetRecordContext {
    /// Query configuration.
    pub config: GetRecordConfig,

    /// Get-record message built once for the target key; a clone of it is
    /// attached to every `QueryAction::SendMessage`.
    kad_message: Bytes,

    /// Peers a request has been scheduled for and whose response (or failure)
    /// has not been registered yet.
    pub pending: HashMap<PeerId, KademliaPeer>,

    /// Peers whose response or failure has already been registered.
    ///
    /// Peers in this set are not re-added as candidates.
    pub queried: HashSet<PeerId>,

    /// Peers that can still be queried, ordered by their distance to the target key.
    pub candidates: BTreeMap<Distance, KademliaPeer>,

    /// Non-expired records received so far, together with the peer that returned them.
    pub found_records: Vec<PeerRecord>,
}
106
107impl GetRecordContext {
108 pub fn new(config: GetRecordConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
110 let mut candidates = BTreeMap::new();
111
112 for candidate in &in_peers {
113 let distance = config.target.distance(&candidate.key);
114 candidates.insert(distance, candidate.clone());
115 }
116
117 let kad_message = KademliaMessage::get_record(config.target.clone().into_preimage());
118
119 Self {
120 config,
121 kad_message,
122
123 candidates,
124 pending: HashMap::new(),
125 queried: HashSet::new(),
126 found_records: Vec::new(),
127 }
128 }
129
130 pub fn found_records(self) -> Vec<PeerRecord> {
132 self.found_records
133 }
134
135 pub fn register_response_failure(&mut self, peer: PeerId) {
137 let Some(peer) = self.pending.remove(&peer) else {
138 tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist");
139 return;
140 };
141
142 self.queried.insert(peer.peer);
143 }
144
145 pub fn register_response(
147 &mut self,
148 peer: PeerId,
149 record: Option<Record>,
150 peers: Vec<KademliaPeer>,
151 ) {
152 tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer");
153
154 let Some(peer) = self.pending.remove(&peer) else {
155 tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it");
156 return;
157 };
158
159 if let Some(record) = record {
160 if !record.is_expired(std::time::Instant::now()) {
161 self.found_records.push(PeerRecord {
162 peer: peer.peer,
163 record,
164 });
165 }
166 }
167
168 self.queried.insert(peer.peer);
171
172 let to_query_candidate = peers.into_iter().filter_map(|peer| {
173 if self.queried.contains(&peer.peer) {
175 return None;
176 }
177
178 if self.pending.contains_key(&peer.peer) {
180 return None;
181 }
182
183 if self.config.local_peer_id == peer.peer {
185 return None;
186 }
187
188 Some(peer)
189 });
190
191 for candidate in to_query_candidate {
192 let distance = self.config.target.distance(&candidate.key);
193 self.candidates.insert(distance, candidate);
194 }
195 }
196
197 pub fn next_peer_action(&mut self, peer: &PeerId) -> Option<QueryAction> {
200 self.pending.contains_key(peer).then_some(QueryAction::SendMessage {
201 query: self.config.query,
202 peer: *peer,
203 message: self.kad_message.clone(),
204 })
205 }
206
207 fn schedule_next_peer(&mut self) -> Option<QueryAction> {
209 tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer");
210
211 let (_, candidate) = self.candidates.pop_first()?;
212 let peer = candidate.peer;
213
214 tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate");
215 self.pending.insert(candidate.peer, candidate);
216
217 Some(QueryAction::SendMessage {
218 query: self.config.query,
219 peer,
220 message: self.kad_message.clone(),
221 })
222 }
223
224 fn is_done(&self) -> bool {
228 self.pending.is_empty() && self.candidates.is_empty()
229 }
230
231 pub fn next_action(&mut self) -> Option<QueryAction> {
233 let known_records = self.config.known_records + self.found_records.len();
236
237 if self.is_done() {
240 return if known_records == 0 {
241 Some(QueryAction::QueryFailed {
242 query: self.config.query,
243 })
244 } else {
245 Some(QueryAction::QuerySucceeded {
246 query: self.config.query,
247 })
248 };
249 }
250
251 let sufficient_records = self.config.sufficient_records(self.found_records.len());
253 if sufficient_records {
254 return Some(QueryAction::QuerySucceeded {
255 query: self.config.query,
256 });
257 }
258
259 if self.pending.len() == self.config.parallelism_factor {
262 return None;
263 }
264
265 self.schedule_next_peer()
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::libp2p::kademlia::types::ConnectionType;

    /// Baseline configuration; tests override individual fields with struct update syntax.
    fn default_config() -> GetRecordConfig {
        GetRecordConfig {
            local_peer_id: PeerId::random(),
            quorum: Quorum::All,
            known_records: 0,
            replication_factor: 20,
            parallelism_factor: 10,
            query: QueryId(0),
            target: Key::new(vec![1, 2, 3].into()),
        }
    }

    /// Build a connected [`KademliaPeer`] without addresses for `peer`.
    fn peer_to_kad(peer: PeerId) -> KademliaPeer {
        KademliaPeer {
            peer,
            key: Key::from(peer),
            addresses: vec![],
            connection: ConnectionType::Connected,
        }
    }

    #[test]
    fn config_check() {
        // `Quorum::All` with no known records: `replication_factor` records are needed.
        let config = GetRecordConfig {
            quorum: Quorum::All,
            known_records: 0,
            replication_factor: 20,
            ..default_config()
        };
        assert!(config.sufficient_records(20));
        assert!(!config.sufficient_records(19));

        // `Quorum::All` with one known record: one less record is needed.
        let config = GetRecordConfig {
            quorum: Quorum::All,
            known_records: 1,
            replication_factor: 20,
            ..default_config()
        };
        assert!(config.sufficient_records(19));
        assert!(!config.sufficient_records(18));

        // `Quorum::One` with no known records.
        let config = GetRecordConfig {
            quorum: Quorum::One,
            known_records: 0,
            ..default_config()
        };
        assert!(config.sufficient_records(1));
        assert!(!config.sufficient_records(0));

        // `Quorum::One` with one known record is always satisfied.
        let config = GetRecordConfig {
            quorum: Quorum::One,
            known_records: 1,
            ..default_config()
        };
        assert!(config.sufficient_records(1));
        assert!(config.sufficient_records(0));

        // `Quorum::N` with no known records.
        let config = GetRecordConfig {
            quorum: Quorum::N(std::num::NonZeroUsize::new(10).expect("valid; qed")),
            known_records: 0,
            ..default_config()
        };
        assert!(config.sufficient_records(10));
        assert!(!config.sufficient_records(9));

        // `Quorum::N` with one known record.
        let config = GetRecordConfig {
            quorum: Quorum::N(std::num::NonZeroUsize::new(10).expect("valid; qed")),
            known_records: 1,
            ..default_config()
        };
        assert!(config.sufficient_records(9));
        assert!(!config.sufficient_records(8));
    }

    #[test]
    fn completes_when_no_candidates() {
        // No candidates and no known records: the query fails immediately.
        let config = default_config();
        let mut context = GetRecordContext::new(config, VecDeque::new());
        assert!(context.is_done());
        let event = context.next_action().unwrap();
        assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });

        // No candidates but one known record: the query succeeds immediately.
        let config = GetRecordConfig {
            known_records: 1,
            ..default_config()
        };
        let mut context = GetRecordContext::new(config, VecDeque::new());
        assert!(context.is_done());
        let event = context.next_action().unwrap();
        assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });
    }

    #[test]
    fn fulfill_parallelism() {
        let config = GetRecordConfig {
            parallelism_factor: 3,
            ..default_config()
        };

        let in_peers_set: HashSet<_> =
            [PeerId::random(), PeerId::random(), PeerId::random()].into_iter().collect();
        assert_eq!(in_peers_set.len(), 3);

        let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
        let mut context = GetRecordContext::new(config, in_peers);

        for num in 0..3 {
            let event = context.next_action().unwrap();
            match event {
                QueryAction::SendMessage { query, peer, .. } => {
                    assert_eq!(query, QueryId(0));
                    // The scheduled peer is tracked as pending.
                    assert_eq!(context.pending.len(), num + 1);
                    assert!(context.pending.contains_key(&peer));

                    // The scheduled peer is one of the provided peers.
                    assert!(in_peers_set.contains(&peer));
                }
                _ => panic!("Unexpected event"),
            }
        }

        // Parallelism is fulfilled; nothing more is scheduled.
        assert!(context.next_action().is_none());
    }

    #[test]
    fn completes_when_responses() {
        let key = vec![1, 2, 3];
        let config = GetRecordConfig {
            parallelism_factor: 3,
            replication_factor: 3,
            ..default_config()
        };

        let peer_a = PeerId::random();
        let peer_b = PeerId::random();
        let peer_c = PeerId::random();

        let in_peers_set: HashSet<_> = [peer_a, peer_b, peer_c].into_iter().collect();
        assert_eq!(in_peers_set.len(), 3);

        let in_peers = [peer_a, peer_b, peer_c].into_iter().map(peer_to_kad).collect();
        let mut context = GetRecordContext::new(config, in_peers);

        // Schedule requests to all three peers.
        for num in 0..3 {
            let event = context.next_action().unwrap();
            match event {
                QueryAction::SendMessage { query, peer, .. } => {
                    assert_eq!(query, QueryId(0));
                    // The scheduled peer is tracked as pending.
                    assert_eq!(context.pending.len(), num + 1);
                    assert!(context.pending.contains_key(&peer));

                    // The scheduled peer is one of the provided peers.
                    assert!(in_peers_set.contains(&peer));
                }
                _ => panic!("Unexpected event"),
            }
        }

        // A failure from a peer that was never queried changes nothing.
        let peer_d = PeerId::random();
        context.register_response_failure(peer_d);
        assert_eq!(context.pending.len(), 3);
        assert!(context.queried.is_empty());

        // Peer A responds with a record.
        let record = Record::new(key.clone(), vec![1, 2, 3]);
        context.register_response(peer_a, Some(record), vec![]);
        assert_eq!(context.pending.len(), 2);
        assert_eq!(context.queried.len(), 1);
        assert_eq!(context.found_records.len(), 1);

        // Peer B responds with a different record and reports peer D as a candidate.
        // `PeerId` is `Copy`, so it is passed by value without cloning.
        let record = Record::new(key.clone(), vec![4, 5, 6]);
        context.register_response(peer_b, Some(record), vec![peer_to_kad(peer_d)]);
        assert_eq!(context.pending.len(), 1);
        assert_eq!(context.queried.len(), 2);
        assert_eq!(context.found_records.len(), 2);
        assert_eq!(context.candidates.len(), 1);

        // Peer C fails.
        context.register_response_failure(peer_c);
        assert!(context.pending.is_empty());
        assert_eq!(context.queried.len(), 3);
        assert_eq!(context.found_records.len(), 2);

        // The quorum is not reached yet, so the remaining candidate (peer D) is scheduled.
        let event = context.next_action().unwrap();
        match event {
            QueryAction::SendMessage { query, peer, .. } => {
                assert_eq!(query, QueryId(0));
                assert_eq!(context.pending.len(), 1);
                assert_eq!(peer, peer_d);
            }
            _ => panic!("Unexpected event"),
        }

        // Peer D responds with a record.
        let record = Record::new(key.clone(), vec![4, 5, 6]);
        context.register_response(peer_d, Some(record), vec![]);

        // Three records were found, which satisfies `Quorum::All` with replication factor 3.
        let event = context.next_action().unwrap();
        assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });

        // Records are returned in the order the responses were registered.
        let found_records = context.found_records();
        assert_eq!(
            found_records,
            vec![
                PeerRecord {
                    peer: peer_a,
                    record: Record::new(key.clone(), vec![1, 2, 3]),
                },
                PeerRecord {
                    peer: peer_b,
                    record: Record::new(key.clone(), vec![4, 5, 6]),
                },
                PeerRecord {
                    peer: peer_d,
                    record: Record::new(key.clone(), vec![4, 5, 6]),
                },
            ]
        );
    }
}