litep2p/protocol/libp2p/kademlia/query/get_record.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use bytes::Bytes;
22
23use crate::{
24    protocol::libp2p::kademlia::{
25        message::KademliaMessage,
26        query::{QueryAction, QueryId},
27        record::{Key as RecordKey, PeerRecord, Record},
28        types::{Distance, KademliaPeer, Key},
29        Quorum,
30    },
31    PeerId,
32};
33
34use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
35
/// Tracing target shared by every log statement emitted from this module.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::get_record";
38
39/// The configuration needed to instantiate a new [`GetRecordContext`].
40#[derive(Debug)]
41pub struct GetRecordConfig {
42    /// Local peer ID.
43    pub local_peer_id: PeerId,
44
45    /// How many records we already know about (ie extracted from storage).
46    ///
47    /// This can either be 0 or 1 when the record is extracted local storage.
48    pub known_records: usize,
49
50    /// Quorum for the query.
51    pub quorum: Quorum,
52
53    /// Replication factor.
54    pub replication_factor: usize,
55
56    /// Parallelism factor.
57    pub parallelism_factor: usize,
58
59    /// Query ID.
60    pub query: QueryId,
61
62    /// Target key.
63    pub target: Key<RecordKey>,
64}
65
66impl GetRecordConfig {
67    /// Checks if the found number of records meets the specified quorum.
68    ///
69    /// Used to determine if the query found enough records to stop.
70    fn sufficient_records(&self, records: usize) -> bool {
71        // The total number of known records is the sum of the records we knew about before starting
72        // the query and the records we found along the way.
73        let total_known = self.known_records + records;
74
75        match self.quorum {
76            Quorum::All => total_known >= self.replication_factor,
77            Quorum::One => total_known >= 1,
78            Quorum::N(needed_responses) => total_known >= needed_responses.get(),
79        }
80    }
81}
82
83#[derive(Debug)]
84pub struct GetRecordContext {
85    /// Query immutable config.
86    pub config: GetRecordConfig,
87
88    /// Cached Kademlia message to send.
89    kad_message: Bytes,
90
91    /// Peers from whom the `QueryEngine` is waiting to hear a response.
92    pub pending: HashMap<PeerId, KademliaPeer>,
93
94    /// Queried candidates.
95    ///
96    /// These are the peers for whom the query has already been sent
97    /// and who have either returned their closest peers or failed to answer.
98    pub queried: HashSet<PeerId>,
99
100    /// Candidates.
101    pub candidates: BTreeMap<Distance, KademliaPeer>,
102
103    /// Found records.
104    pub found_records: Vec<PeerRecord>,
105}
106
107impl GetRecordContext {
108    /// Create new [`GetRecordContext`].
109    pub fn new(config: GetRecordConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
110        let mut candidates = BTreeMap::new();
111
112        for candidate in &in_peers {
113            let distance = config.target.distance(&candidate.key);
114            candidates.insert(distance, candidate.clone());
115        }
116
117        let kad_message = KademliaMessage::get_record(config.target.clone().into_preimage());
118
119        Self {
120            config,
121            kad_message,
122
123            candidates,
124            pending: HashMap::new(),
125            queried: HashSet::new(),
126            found_records: Vec::new(),
127        }
128    }
129
130    /// Get the found records.
131    pub fn found_records(self) -> Vec<PeerRecord> {
132        self.found_records
133    }
134
135    /// Register response failure for `peer`.
136    pub fn register_response_failure(&mut self, peer: PeerId) {
137        let Some(peer) = self.pending.remove(&peer) else {
138            tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist");
139            return;
140        };
141
142        self.queried.insert(peer.peer);
143    }
144
145    /// Register `GET_VALUE` response from `peer`.
146    pub fn register_response(
147        &mut self,
148        peer: PeerId,
149        record: Option<Record>,
150        peers: Vec<KademliaPeer>,
151    ) {
152        tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer");
153
154        let Some(peer) = self.pending.remove(&peer) else {
155            tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it");
156            return;
157        };
158
159        if let Some(record) = record {
160            if !record.is_expired(std::time::Instant::now()) {
161                self.found_records.push(PeerRecord {
162                    peer: peer.peer,
163                    record,
164                });
165            }
166        }
167
168        // Add the queried peer to `queried` and all new peers which haven't been
169        // queried to `candidates`
170        self.queried.insert(peer.peer);
171
172        let to_query_candidate = peers.into_iter().filter_map(|peer| {
173            // Peer already produced a response.
174            if self.queried.contains(&peer.peer) {
175                return None;
176            }
177
178            // Peer was queried, awaiting response.
179            if self.pending.contains_key(&peer.peer) {
180                return None;
181            }
182
183            // Local node.
184            if self.config.local_peer_id == peer.peer {
185                return None;
186            }
187
188            Some(peer)
189        });
190
191        for candidate in to_query_candidate {
192            let distance = self.config.target.distance(&candidate.key);
193            self.candidates.insert(distance, candidate);
194        }
195    }
196
197    /// Get next action for `peer`.
198    // TODO: remove this and store the next action to `PeerAction`
199    pub fn next_peer_action(&mut self, peer: &PeerId) -> Option<QueryAction> {
200        self.pending.contains_key(peer).then_some(QueryAction::SendMessage {
201            query: self.config.query,
202            peer: *peer,
203            message: self.kad_message.clone(),
204        })
205    }
206
207    /// Schedule next peer for outbound `GET_VALUE` query.
208    fn schedule_next_peer(&mut self) -> Option<QueryAction> {
209        tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer");
210
211        let (_, candidate) = self.candidates.pop_first()?;
212        let peer = candidate.peer;
213
214        tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate");
215        self.pending.insert(candidate.peer, candidate);
216
217        Some(QueryAction::SendMessage {
218            query: self.config.query,
219            peer,
220            message: self.kad_message.clone(),
221        })
222    }
223
224    /// Check if the query cannot make any progress.
225    ///
226    /// Returns true when there are no pending responses and no candidates to query.
227    fn is_done(&self) -> bool {
228        self.pending.is_empty() && self.candidates.is_empty()
229    }
230
231    /// Get next action for a `GET_VALUE` query.
232    pub fn next_action(&mut self) -> Option<QueryAction> {
233        // These are the records we knew about before starting the query and
234        // the records we found along the way.
235        let known_records = self.config.known_records + self.found_records.len();
236
237        // If we cannot make progress, return the final result.
238        // A query failed when we are not able to identify one single record.
239        if self.is_done() {
240            return if known_records == 0 {
241                Some(QueryAction::QueryFailed {
242                    query: self.config.query,
243                })
244            } else {
245                Some(QueryAction::QuerySucceeded {
246                    query: self.config.query,
247                })
248            };
249        }
250
251        // Check if enough records have been found
252        let sufficient_records = self.config.sufficient_records(self.found_records.len());
253        if sufficient_records {
254            return Some(QueryAction::QuerySucceeded {
255                query: self.config.query,
256            });
257        }
258
259        // At this point, we either have pending responses or candidates to query; and we need more
260        // records. Ensure we do not exceed the parallelism factor.
261        if self.pending.len() == self.config.parallelism_factor {
262            return None;
263        }
264
265        self.schedule_next_peer()
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::libp2p::kademlia::types::ConnectionType;

    /// Baseline configuration shared by the tests; individual tests override fields.
    fn default_config() -> GetRecordConfig {
        GetRecordConfig {
            local_peer_id: PeerId::random(),
            quorum: Quorum::All,
            known_records: 0,
            replication_factor: 20,
            parallelism_factor: 10,
            query: QueryId(0),
            target: Key::new(vec![1, 2, 3].into()),
        }
    }

    /// Wraps `peer` into a connected [`KademliaPeer`] without addresses.
    fn peer_to_kad(peer: PeerId) -> KademliaPeer {
        KademliaPeer {
            peer,
            key: Key::from(peer),
            addresses: vec![],
            connection: ConnectionType::Connected,
        }
    }

    #[test]
    fn config_check() {
        // Quorum::All with no known records.
        let config = GetRecordConfig {
            quorum: Quorum::All,
            known_records: 0,
            replication_factor: 20,
            ..default_config()
        };
        assert!(config.sufficient_records(20));
        assert!(!config.sufficient_records(19));

        // Quorum::All with 1 known record.
        let config = GetRecordConfig {
            quorum: Quorum::All,
            known_records: 1,
            replication_factor: 20,
            ..default_config()
        };
        assert!(config.sufficient_records(19));
        assert!(!config.sufficient_records(18));

        // Quorum::One with no known records.
        let config = GetRecordConfig {
            quorum: Quorum::One,
            known_records: 0,
            ..default_config()
        };
        assert!(config.sufficient_records(1));
        assert!(!config.sufficient_records(0));

        // Quorum::One with 1 known record.
        let config = GetRecordConfig {
            quorum: Quorum::One,
            known_records: 1,
            ..default_config()
        };
        assert!(config.sufficient_records(1));
        assert!(config.sufficient_records(0));

        // Quorum::N with no known records.
        let config = GetRecordConfig {
            quorum: Quorum::N(std::num::NonZeroUsize::new(10).expect("valid; qed")),
            known_records: 0,
            ..default_config()
        };
        assert!(config.sufficient_records(10));
        assert!(!config.sufficient_records(9));

        // Quorum::N with 1 known record.
        let config = GetRecordConfig {
            quorum: Quorum::N(std::num::NonZeroUsize::new(10).expect("valid; qed")),
            known_records: 1,
            ..default_config()
        };
        assert!(config.sufficient_records(9));
        assert!(!config.sufficient_records(8));
    }

    #[test]
    fn completes_when_no_candidates() {
        let config = default_config();
        let mut context = GetRecordContext::new(config, VecDeque::new());
        assert!(context.is_done());
        let event = context.next_action().unwrap();
        assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });

        let config = GetRecordConfig {
            known_records: 1,
            ..default_config()
        };
        let mut context = GetRecordContext::new(config, VecDeque::new());
        assert!(context.is_done());
        let event = context.next_action().unwrap();
        assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });
    }

    #[test]
    fn fulfill_parallelism() {
        let config = GetRecordConfig {
            parallelism_factor: 3,
            ..default_config()
        };

        let in_peers_set: HashSet<_> =
            [PeerId::random(), PeerId::random(), PeerId::random()].into_iter().collect();
        assert_eq!(in_peers_set.len(), 3);

        let in_peers = in_peers_set.iter().copied().map(peer_to_kad).collect();
        let mut context = GetRecordContext::new(config, in_peers);

        for num in 0..3 {
            let event = context.next_action().unwrap();
            match event {
                QueryAction::SendMessage { query, peer, .. } => {
                    assert_eq!(query, QueryId(0));
                    // Added as pending.
                    assert_eq!(context.pending.len(), num + 1);
                    assert!(context.pending.contains_key(&peer));

                    // Check the peer is the one provided.
                    assert!(in_peers_set.contains(&peer));
                }
                _ => panic!("Unexpected event"),
            }
        }

        // Fulfilled parallelism.
        assert!(context.next_action().is_none());
    }

    #[test]
    fn completes_when_responses() {
        let key = vec![1, 2, 3];
        let config = GetRecordConfig {
            parallelism_factor: 3,
            replication_factor: 3,
            ..default_config()
        };

        let peer_a = PeerId::random();
        let peer_b = PeerId::random();
        let peer_c = PeerId::random();

        let in_peers_set: HashSet<_> = [peer_a, peer_b, peer_c].into_iter().collect();
        assert_eq!(in_peers_set.len(), 3);

        let in_peers = [peer_a, peer_b, peer_c].into_iter().map(peer_to_kad).collect();
        let mut context = GetRecordContext::new(config, in_peers);

        // Schedule peer queries.
        for num in 0..3 {
            let event = context.next_action().unwrap();
            match event {
                QueryAction::SendMessage { query, peer, .. } => {
                    assert_eq!(query, QueryId(0));
                    // Added as pending.
                    assert_eq!(context.pending.len(), num + 1);
                    assert!(context.pending.contains_key(&peer));

                    // Check the peer is the one provided.
                    assert!(in_peers_set.contains(&peer));
                }
                _ => panic!("Unexpected event"),
            }
        }

        // A failure reported for a peer that was never queried is ignored.
        let peer_d = PeerId::random();
        context.register_response_failure(peer_d);
        assert_eq!(context.pending.len(), 3);
        assert!(context.queried.is_empty());

        // Provide responses back.
        let record = Record::new(key.clone(), vec![1, 2, 3]);
        context.register_response(peer_a, Some(record), vec![]);
        assert_eq!(context.pending.len(), 2);
        assert_eq!(context.queried.len(), 1);
        assert_eq!(context.found_records.len(), 1);

        // Provide different response from peer b with peer d as candidate.
        let record = Record::new(key.clone(), vec![4, 5, 6]);
        context.register_response(peer_b, Some(record), vec![peer_to_kad(peer_d)]);
        assert_eq!(context.pending.len(), 1);
        assert_eq!(context.queried.len(), 2);
        assert_eq!(context.found_records.len(), 2);
        assert_eq!(context.candidates.len(), 1);

        // Peer C fails.
        context.register_response_failure(peer_c);
        assert!(context.pending.is_empty());
        assert_eq!(context.queried.len(), 3);
        assert_eq!(context.found_records.len(), 2);

        // Drain the last candidate.
        let event = context.next_action().unwrap();
        match event {
            QueryAction::SendMessage { query, peer, .. } => {
                assert_eq!(query, QueryId(0));
                // Added as pending.
                assert_eq!(context.pending.len(), 1);
                assert_eq!(peer, peer_d);
            }
            _ => panic!("Unexpected event"),
        }

        // Peer D responds.
        let record = Record::new(key.clone(), vec![4, 5, 6]);
        context.register_response(peer_d, Some(record), vec![]);

        // Produces the result.
        let event = context.next_action().unwrap();
        assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });

        // Check results.
        let found_records = context.found_records();
        assert_eq!(
            found_records,
            vec![
                PeerRecord {
                    peer: peer_a,
                    record: Record::new(key.clone(), vec![1, 2, 3]),
                },
                PeerRecord {
                    peer: peer_b,
                    record: Record::new(key.clone(), vec![4, 5, 6]),
                },
                PeerRecord {
                    peer: peer_d,
                    record: Record::new(key, vec![4, 5, 6]),
                },
            ]
        );
    }
}
509}