litep2p/protocol/libp2p/kademlia/query/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    protocol::libp2p::kademlia::{
23        message::KademliaMessage,
24        query::{
25            find_node::{FindNodeConfig, FindNodeContext},
26            get_record::{GetRecordConfig, GetRecordContext},
27        },
28        record::{Key as RecordKey, Record},
29        types::{KademliaPeer, Key},
30        PeerRecord, Quorum,
31    },
32    PeerId,
33};
34
35use bytes::Bytes;
36
37use std::collections::{HashMap, VecDeque};
38
39use self::find_many_nodes::FindManyNodesContext;
40
41mod find_many_nodes;
42mod find_node;
43mod get_record;
44
45/// Logging target for the file.
46const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
47
48// TODO: store record key instead of the actual record
49
50/// Type representing a query ID.
51#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
52pub struct QueryId(pub usize);
53
54/// Query type.
55#[derive(Debug)]
56enum QueryType {
57    /// `FIND_NODE` query.
58    FindNode {
59        /// Context for the `FIND_NODE` query
60        context: FindNodeContext<PeerId>,
61    },
62
63    /// `PUT_VALUE` query.
64    PutRecord {
65        /// Record that needs to be stored.
66        record: Record,
67
68        /// Context for the `FIND_NODE` query
69        context: FindNodeContext<RecordKey>,
70    },
71
72    /// `PUT_VALUE` query to specified peers.
73    PutRecordToPeers {
74        /// Record that needs to be stored.
75        record: Record,
76
77        /// Context for finding peers.
78        context: FindManyNodesContext,
79    },
80
81    /// `GET_VALUE` query.
82    GetRecord {
83        /// Context for the `GET_VALUE` query.
84        context: GetRecordContext,
85    },
86}
87
88/// Query action.
89#[derive(Debug, PartialEq, Eq)]
90pub enum QueryAction {
91    /// Send message to peer.
92    SendMessage {
93        /// Query ID.
94        query: QueryId,
95
96        /// Peer.
97        peer: PeerId,
98
99        /// Message.
100        message: Bytes,
101    },
102
103    /// `FIND_NODE` query succeeded.
104    FindNodeQuerySucceeded {
105        /// ID of the query that succeeded.
106        query: QueryId,
107
108        /// Target peer.
109        target: PeerId,
110
111        /// Peers that were found.
112        peers: Vec<KademliaPeer>,
113    },
114
115    /// Store the record to nodes closest to target key.
116    // TODO: horrible name
117    PutRecordToFoundNodes {
118        /// Target peer.
119        record: Record,
120
121        /// Peers for whom the `PUT_VALUE` must be sent to.
122        peers: Vec<KademliaPeer>,
123    },
124
125    /// `GET_VALUE` query succeeded.
126    GetRecordQueryDone {
127        /// Query ID.
128        query_id: QueryId,
129
130        /// Found records.
131        records: Vec<PeerRecord>,
132    },
133
134    // TODO: remove
135    /// Query succeeded.
136    QuerySucceeded {
137        /// ID of the query that succeeded.
138        query: QueryId,
139    },
140
141    /// Query failed.
142    QueryFailed {
143        /// ID of the query that failed.
144        query: QueryId,
145    },
146}
147
148/// Kademlia query engine.
149pub struct QueryEngine {
150    /// Local peer ID.
151    local_peer_id: PeerId,
152
153    /// Replication factor.
154    replication_factor: usize,
155
156    /// Parallelism factor.
157    parallelism_factor: usize,
158
159    /// Active queries.
160    queries: HashMap<QueryId, QueryType>,
161}
162
163impl QueryEngine {
164    /// Create new [`QueryEngine`].
165    pub fn new(
166        local_peer_id: PeerId,
167        replication_factor: usize,
168        parallelism_factor: usize,
169    ) -> Self {
170        Self {
171            local_peer_id,
172            replication_factor,
173            parallelism_factor,
174            queries: HashMap::new(),
175        }
176    }
177
178    /// Start `FIND_NODE` query.
179    pub fn start_find_node(
180        &mut self,
181        query_id: QueryId,
182        target: PeerId,
183        candidates: VecDeque<KademliaPeer>,
184    ) -> QueryId {
185        tracing::debug!(
186            target: LOG_TARGET,
187            ?query_id,
188            ?target,
189            num_peers = ?candidates.len(),
190            "start `FIND_NODE` query"
191        );
192
193        let target = Key::from(target);
194        let config = FindNodeConfig {
195            local_peer_id: self.local_peer_id,
196            replication_factor: self.replication_factor,
197            parallelism_factor: self.parallelism_factor,
198            query: query_id,
199            target,
200        };
201
202        self.queries.insert(
203            query_id,
204            QueryType::FindNode {
205                context: FindNodeContext::new(config, candidates),
206            },
207        );
208
209        query_id
210    }
211
212    /// Start `PUT_VALUE` query.
213    pub fn start_put_record(
214        &mut self,
215        query_id: QueryId,
216        record: Record,
217        candidates: VecDeque<KademliaPeer>,
218    ) -> QueryId {
219        tracing::debug!(
220            target: LOG_TARGET,
221            ?query_id,
222            target = ?record.key,
223            num_peers = ?candidates.len(),
224            "start `PUT_VALUE` query"
225        );
226
227        let target = Key::new(record.key.clone());
228        let config = FindNodeConfig {
229            local_peer_id: self.local_peer_id,
230            replication_factor: self.replication_factor,
231            parallelism_factor: self.parallelism_factor,
232            query: query_id,
233            target,
234        };
235
236        self.queries.insert(
237            query_id,
238            QueryType::PutRecord {
239                record,
240                context: FindNodeContext::new(config, candidates),
241            },
242        );
243
244        query_id
245    }
246
247    /// Start `PUT_VALUE` query to specified peers.
248    pub fn start_put_record_to_peers(
249        &mut self,
250        query_id: QueryId,
251        record: Record,
252        peers_to_report: Vec<KademliaPeer>,
253    ) -> QueryId {
254        tracing::debug!(
255            target: LOG_TARGET,
256            ?query_id,
257            target = ?record.key,
258            num_peers = ?peers_to_report.len(),
259            "start `PUT_VALUE` query to peers"
260        );
261
262        self.queries.insert(
263            query_id,
264            QueryType::PutRecordToPeers {
265                record,
266                context: FindManyNodesContext::new(query_id, peers_to_report),
267            },
268        );
269
270        query_id
271    }
272
273    /// Start `GET_VALUE` query.
274    pub fn start_get_record(
275        &mut self,
276        query_id: QueryId,
277        target: RecordKey,
278        candidates: VecDeque<KademliaPeer>,
279        quorum: Quorum,
280        count: usize,
281    ) -> QueryId {
282        tracing::debug!(
283            target: LOG_TARGET,
284            ?query_id,
285            ?target,
286            num_peers = ?candidates.len(),
287            "start `GET_VALUE` query"
288        );
289
290        let target = Key::new(target);
291        let config = GetRecordConfig {
292            local_peer_id: self.local_peer_id,
293            known_records: count,
294            quorum,
295            replication_factor: self.replication_factor,
296            parallelism_factor: self.parallelism_factor,
297            query: query_id,
298            target,
299        };
300
301        self.queries.insert(
302            query_id,
303            QueryType::GetRecord {
304                context: GetRecordContext::new(config, candidates),
305            },
306        );
307
308        query_id
309    }
310
311    /// Register response failure from a queried peer.
312    pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) {
313        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure");
314
315        match self.queries.get_mut(&query) {
316            None => {
317                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
318            }
319            Some(QueryType::FindNode { context }) => {
320                context.register_response_failure(peer);
321            }
322            Some(QueryType::PutRecord { context, .. }) => {
323                context.register_response_failure(peer);
324            }
325            Some(QueryType::PutRecordToPeers { context, .. }) => {
326                context.register_response_failure(peer);
327            }
328            Some(QueryType::GetRecord { context }) => {
329                context.register_response_failure(peer);
330            }
331        }
332    }
333
334    /// Register that `response` received from `peer`.
335    pub fn register_response(&mut self, query: QueryId, peer: PeerId, message: KademliaMessage) {
336        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response");
337
338        match self.queries.get_mut(&query) {
339            None => {
340                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
341            }
342            Some(QueryType::FindNode { context }) => match message {
343                KademliaMessage::FindNode { peers, .. } => {
344                    context.register_response(peer, peers);
345                }
346                _ => unreachable!(),
347            },
348            Some(QueryType::PutRecord { context, .. }) => match message {
349                KademliaMessage::FindNode { peers, .. } => {
350                    context.register_response(peer, peers);
351                }
352                _ => unreachable!(),
353            },
354            Some(QueryType::PutRecordToPeers { context, .. }) => match message {
355                KademliaMessage::FindNode { peers, .. } => {
356                    context.register_response(peer, peers);
357                }
358                _ => unreachable!(),
359            },
360            Some(QueryType::GetRecord { context }) => match message {
361                KademliaMessage::GetRecord { record, peers, .. } => {
362                    context.register_response(peer, record, peers);
363                }
364                _ => unreachable!(),
365            },
366        }
367    }
368
369    /// Get next action for `peer` from the [`QueryEngine`].
370    pub fn next_peer_action(&mut self, query: &QueryId, peer: &PeerId) -> Option<QueryAction> {
371        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "get next peer action");
372
373        match self.queries.get_mut(query) {
374            None => {
375                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
376                None
377            }
378            Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
379            Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
380            Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
381            Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
382        }
383    }
384
385    /// Handle query success by returning the queried value(s)
386    /// and removing the query from [`QueryEngine`].
387    fn on_query_succeeded(&mut self, query: QueryId) -> QueryAction {
388        match self.queries.remove(&query).expect("query to exist") {
389            QueryType::FindNode { context } => QueryAction::FindNodeQuerySucceeded {
390                query,
391                target: context.config.target.into_preimage(),
392                peers: context.responses.into_values().collect::<Vec<_>>(),
393            },
394            QueryType::PutRecord { record, context } => QueryAction::PutRecordToFoundNodes {
395                record,
396                peers: context.responses.into_values().collect::<Vec<_>>(),
397            },
398            QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes {
399                record,
400                peers: context.peers_to_report,
401            },
402            QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
403                query_id: context.config.query,
404                records: context.found_records(),
405            },
406        }
407    }
408
409    /// Handle query failure by removing the query from [`QueryEngine`] and
410    /// returning the appropriate [`QueryAction`] to user.
411    fn on_query_failed(&mut self, query: QueryId) -> QueryAction {
412        let _ = self.queries.remove(&query).expect("query to exist");
413
414        QueryAction::QueryFailed { query }
415    }
416
417    /// Get next action from the [`QueryEngine`].
418    pub fn next_action(&mut self) -> Option<QueryAction> {
419        for (_, state) in self.queries.iter_mut() {
420            let action = match state {
421                QueryType::FindNode { context } => context.next_action(),
422                QueryType::PutRecord { context, .. } => context.next_action(),
423                QueryType::PutRecordToPeers { context, .. } => context.next_action(),
424                QueryType::GetRecord { context } => context.next_action(),
425            };
426
427            match action {
428                Some(QueryAction::QuerySucceeded { query }) => {
429                    return Some(self.on_query_succeeded(query));
430                }
431                Some(QueryAction::QueryFailed { query }) =>
432                    return Some(self.on_query_failed(query)),
433                Some(_) => return action,
434                _ => continue,
435            }
436        }
437
438        None
439    }
440}
441
442#[cfg(test)]
443mod tests {
444    use multihash::{Code, Multihash};
445
446    use super::*;
447    use crate::protocol::libp2p::kademlia::types::ConnectionType;
448
449    // make fixed peer id
450    fn make_peer_id(first: u8, second: u8) -> PeerId {
451        let mut peer_id = vec![0u8; 32];
452        peer_id[0] = first;
453        peer_id[1] = second;
454
455        PeerId::from_bytes(
456            &Multihash::wrap(Code::Identity.into(), &peer_id)
457                .expect("The digest size is never too large")
458                .to_bytes(),
459        )
460        .unwrap()
461    }
462
463    #[test]
464    fn query_fails() {
465        let _ = tracing_subscriber::fmt()
466            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
467            .try_init();
468
469        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
470        let target_peer = PeerId::random();
471        let _target_key = Key::from(target_peer);
472
473        let query = engine.start_find_node(
474            QueryId(1337),
475            target_peer,
476            vec![
477                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
478                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
479                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
480                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
481            ]
482            .into(),
483        );
484
485        for _ in 0..4 {
486            if let Some(QueryAction::SendMessage { query, peer, .. }) = engine.next_action() {
487                engine.register_response_failure(query, peer);
488            }
489        }
490
491        if let Some(QueryAction::QueryFailed { query: failed }) = engine.next_action() {
492            assert_eq!(failed, query);
493        }
494
495        assert!(engine.next_action().is_none());
496    }
497
498    #[test]
499    fn lookup_paused() {
500        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
501        let target_peer = PeerId::random();
502        let _target_key = Key::from(target_peer);
503
504        let _ = engine.start_find_node(
505            QueryId(1338),
506            target_peer,
507            vec![
508                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
509                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
510                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
511                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
512            ]
513            .into(),
514        );
515
516        for _ in 0..3 {
517            let _ = engine.next_action();
518        }
519
520        assert!(engine.next_action().is_none());
521    }
522
523    #[test]
524    fn find_node_query_succeeds() {
525        let _ = tracing_subscriber::fmt()
526            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
527            .try_init();
528
529        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
530        let target_peer = make_peer_id(0, 0);
531        let target_key = Key::from(target_peer);
532
533        let distances = {
534            let mut distances = std::collections::BTreeMap::new();
535
536            for i in 1..64 {
537                let peer = make_peer_id(i, 0);
538                let key = Key::from(peer);
539
540                distances.insert(target_key.distance(&key), peer);
541            }
542
543            distances
544        };
545        let mut iter = distances.iter();
546
547        // start find node with one known peer
548        let _query = engine.start_find_node(
549            QueryId(1339),
550            target_peer,
551            vec![KademliaPeer::new(
552                *iter.next().unwrap().1,
553                vec![],
554                ConnectionType::NotConnected,
555            )]
556            .into(),
557        );
558
559        let action = engine.next_action();
560        assert!(engine.next_action().is_none());
561
562        // the one known peer responds with 3 other peers it knows
563        match action {
564            Some(QueryAction::SendMessage { query, peer, .. }) => {
565                engine.register_response(
566                    query,
567                    peer,
568                    KademliaMessage::FindNode {
569                        target: Vec::new(),
570                        peers: vec![
571                            KademliaPeer::new(
572                                *iter.next().unwrap().1,
573                                vec![],
574                                ConnectionType::NotConnected,
575                            ),
576                            KademliaPeer::new(
577                                *iter.next().unwrap().1,
578                                vec![],
579                                ConnectionType::NotConnected,
580                            ),
581                            KademliaPeer::new(
582                                *iter.next().unwrap().1,
583                                vec![],
584                                ConnectionType::NotConnected,
585                            ),
586                        ],
587                    },
588                );
589            }
590            _ => panic!("invalid event received"),
591        }
592
593        // send empty response for the last three nodes
594        for _ in 0..3 {
595            match engine.next_action() {
596                Some(QueryAction::SendMessage { query, peer, .. }) => {
597                    println!("next send message to {peer:?}");
598                    engine.register_response(
599                        query,
600                        peer,
601                        KademliaMessage::FindNode {
602                            target: Vec::new(),
603                            peers: vec![],
604                        },
605                    );
606                }
607                _ => panic!("invalid event received"),
608            }
609        }
610
611        match engine.next_action() {
612            Some(QueryAction::FindNodeQuerySucceeded { peers, .. }) => {
613                assert_eq!(peers.len(), 4);
614            }
615            _ => panic!("invalid event received"),
616        }
617
618        assert!(engine.next_action().is_none());
619    }
620
621    #[test]
622    fn put_record_succeeds() {
623        let _ = tracing_subscriber::fmt()
624            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
625            .try_init();
626
627        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
628        let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
629        let target_key = Key::new(record_key.clone());
630        let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
631
632        let distances = {
633            let mut distances = std::collections::BTreeMap::new();
634
635            for i in 1..64 {
636                let peer = make_peer_id(i, 0);
637                let key = Key::from(peer);
638
639                distances.insert(target_key.distance(&key), peer);
640            }
641
642            distances
643        };
644        let mut iter = distances.iter();
645
646        // start find node with one known peer
647        let _query = engine.start_put_record(
648            QueryId(1340),
649            original_record.clone(),
650            vec![KademliaPeer::new(
651                *iter.next().unwrap().1,
652                vec![],
653                ConnectionType::NotConnected,
654            )]
655            .into(),
656        );
657
658        let action = engine.next_action();
659        assert!(engine.next_action().is_none());
660
661        // the one known peer responds with 3 other peers it knows
662        match action {
663            Some(QueryAction::SendMessage { query, peer, .. }) => {
664                engine.register_response(
665                    query,
666                    peer,
667                    KademliaMessage::FindNode {
668                        target: Vec::new(),
669                        peers: vec![
670                            KademliaPeer::new(
671                                *iter.next().unwrap().1,
672                                vec![],
673                                ConnectionType::NotConnected,
674                            ),
675                            KademliaPeer::new(
676                                *iter.next().unwrap().1,
677                                vec![],
678                                ConnectionType::NotConnected,
679                            ),
680                            KademliaPeer::new(
681                                *iter.next().unwrap().1,
682                                vec![],
683                                ConnectionType::NotConnected,
684                            ),
685                        ],
686                    },
687                );
688            }
689            _ => panic!("invalid event received"),
690        }
691
692        // send empty response for the last three nodes
693        for _ in 0..3 {
694            match engine.next_action() {
695                Some(QueryAction::SendMessage { query, peer, .. }) => {
696                    println!("next send message to {peer:?}");
697                    engine.register_response(
698                        query,
699                        peer,
700                        KademliaMessage::FindNode {
701                            target: Vec::new(),
702                            peers: vec![],
703                        },
704                    );
705                }
706                _ => panic!("invalid event received"),
707            }
708        }
709
710        let peers = match engine.next_action() {
711            Some(QueryAction::PutRecordToFoundNodes { peers, record }) => {
712                assert_eq!(peers.len(), 4);
713                assert_eq!(record.key, original_record.key);
714                assert_eq!(record.value, original_record.value);
715                peers
716            }
717            _ => panic!("invalid event received"),
718        };
719
720        assert!(engine.next_action().is_none());
721
722        // get records from those peers.
723        let _query = engine.start_get_record(
724            QueryId(1341),
725            record_key.clone(),
726            vec![
727                KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
728                KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
729                KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
730                KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
731            ]
732            .into(),
733            Quorum::All,
734            3,
735        );
736
737        for _ in 0..4 {
738            match engine.next_action() {
739                Some(QueryAction::SendMessage { query, peer, .. }) => {
740                    engine.register_response(
741                        query,
742                        peer,
743                        KademliaMessage::GetRecord {
744                            record: Some(original_record.clone()),
745                            peers: vec![],
746                            key: Some(record_key.clone()),
747                        },
748                    );
749                }
750                _ => panic!("invalid event received"),
751            }
752        }
753
754        let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
755        match engine.next_action() {
756            Some(QueryAction::GetRecordQueryDone { records, .. }) => {
757                let query_peers = records
758                    .iter()
759                    .map(|peer_record| peer_record.peer)
760                    .collect::<std::collections::HashSet<_>>();
761                assert_eq!(peers, query_peers);
762
763                let records: std::collections::HashSet<_> =
764                    records.into_iter().map(|peer_record| peer_record.record).collect();
765                // One single record found across peers.
766                assert_eq!(records.len(), 1);
767                let record = records.into_iter().next().unwrap();
768
769                assert_eq!(record.key, original_record.key);
770                assert_eq!(record.value, original_record.value);
771            }
772            _ => panic!("invalid event received"),
773        }
774    }
775}