litep2p/protocol/libp2p/kademlia/query/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    protocol::libp2p::kademlia::{
23        message::KademliaMessage,
24        query::{
25            find_node::{FindNodeConfig, FindNodeContext},
26            get_providers::{GetProvidersConfig, GetProvidersContext},
27            get_record::{GetRecordConfig, GetRecordContext},
28        },
29        record::{ContentProvider, Key as RecordKey, Record},
30        types::{KademliaPeer, Key},
31        PeerRecord, Quorum,
32    },
33    PeerId,
34};
35
36use bytes::Bytes;
37
38use std::collections::{HashMap, VecDeque};
39
40use self::{find_many_nodes::FindManyNodesContext, target_peers::PutToTargetPeersContext};
41
42mod find_many_nodes;
43mod find_node;
44mod get_providers;
45mod get_record;
46mod target_peers;
47
/// Logging target passed as `target:` to the `tracing` events emitted from this file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
50
/// Type representing a query ID.
///
/// [`QueryEngine`] uses it as the key under which an active query is stored. The ID is chosen
/// by the caller and handed to the `start_*` methods; the engine does not allocate IDs itself.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
pub struct QueryId(pub usize);
55
/// Query type.
///
/// State of one active query stored in [`QueryEngine`]. Every variant carries the context
/// object that drives the query; some also carry data that is handed over to the follow-up
/// step once the query finishes.
#[derive(Debug)]
enum QueryType {
    /// `FIND_NODE` query.
    FindNode {
        /// Context for the `FIND_NODE` query.
        context: FindNodeContext<PeerId>,
    },

    /// `PUT_VALUE` query.
    ///
    /// Lookup phase: when it finishes, [`QueryAction::PutRecordToFoundNodes`] is emitted with
    /// the peers collected by the `FIND_NODE` context.
    PutRecord {
        /// Record that needs to be stored.
        record: Record,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,

        /// Context for the `FIND_NODE` query.
        context: FindNodeContext<RecordKey>,
    },

    /// `PUT_VALUE` query to specified peers.
    ///
    /// When it finishes, [`QueryAction::PutRecordToFoundNodes`] is emitted with the context's
    /// `peers_to_report`.
    PutRecordToPeers {
        /// Record that needs to be stored.
        record: Record,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,

        /// Context for finding peers.
        context: FindManyNodesContext,
    },

    /// `PUT_VALUE` message sending phase.
    ///
    /// When it finishes, [`QueryAction::PutRecordQuerySucceeded`] is emitted.
    PutRecordToFoundNodes {
        /// Context for tracking `PUT_VALUE` responses.
        context: PutToTargetPeersContext,
    },

    /// `GET_VALUE` query.
    GetRecord {
        /// Context for the `GET_VALUE` query.
        context: GetRecordContext,
    },

    /// `ADD_PROVIDER` query.
    ///
    /// Lookup phase: when it finishes, [`QueryAction::AddProviderToFoundNodes`] is emitted with
    /// the peers collected by the `FIND_NODE` context.
    AddProvider {
        /// Provided key.
        provided_key: RecordKey,

        /// Provider record that needs to be stored.
        provider: ContentProvider,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,

        /// Context for the `FIND_NODE` query.
        context: FindNodeContext<RecordKey>,
    },

    /// `ADD_PROVIDER` message sending phase.
    ///
    /// When it finishes, [`QueryAction::AddProviderQuerySucceeded`] is emitted.
    AddProviderToFoundNodes {
        /// Context for tracking `ADD_PROVIDER` requests.
        context: PutToTargetPeersContext,
    },

    /// `GET_PROVIDERS` query.
    GetProviders {
        /// Context for the `GET_PROVIDERS` query.
        context: GetProvidersContext,
    },
}
128
/// Query action.
///
/// Value returned by [`QueryEngine::next_action`] and [`QueryEngine::next_peer_action`],
/// describing what the caller should do next on behalf of a query.
#[derive(Debug)]
pub enum QueryAction {
    /// Send message to peer.
    SendMessage {
        /// Query ID.
        query: QueryId,

        /// Peer.
        peer: PeerId,

        /// Message.
        message: Bytes,
    },

    /// `FIND_NODE` query succeeded.
    FindNodeQuerySucceeded {
        /// ID of the query that succeeded.
        query: QueryId,

        /// Target peer.
        target: PeerId,

        /// Peers that were found.
        peers: Vec<KademliaPeer>,
    },

    /// Store the record to nodes closest to target key.
    PutRecordToFoundNodes {
        /// Query ID of the original PUT_RECORD request.
        query: QueryId,

        /// Record to store.
        record: Record,

        /// Peers to whom the `PUT_VALUE` must be sent.
        peers: Vec<KademliaPeer>,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,
    },

    /// `PUT_VALUE` query succeeded.
    PutRecordQuerySucceeded {
        /// ID of the query that succeeded.
        query: QueryId,

        /// Record key of the stored record.
        key: RecordKey,
    },

    /// Add the provider record to nodes closest to the target key.
    AddProviderToFoundNodes {
        /// Query ID of the original ADD_PROVIDER request.
        query: QueryId,

        /// Provided key.
        provided_key: RecordKey,

        /// Provider record.
        provider: ContentProvider,

        /// Peers to whom the `ADD_PROVIDER` must be sent.
        peers: Vec<KademliaPeer>,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,
    },

    /// `ADD_PROVIDER` query succeeded.
    AddProviderQuerySucceeded {
        /// ID of the query that succeeded.
        query: QueryId,

        /// Provided key.
        provided_key: RecordKey,
    },

    /// `GET_VALUE` query succeeded.
    GetRecordQueryDone {
        /// Query ID.
        query_id: QueryId,
    },

    /// `GET_VALUE` inflight query produced a result.
    ///
    /// This event is emitted when a peer responds to the query with a record.
    GetRecordPartialResult {
        /// Query ID.
        query_id: QueryId,

        /// Found record.
        record: PeerRecord,
    },

    /// `GET_PROVIDERS` query succeeded.
    GetProvidersQueryDone {
        /// Query ID.
        query_id: QueryId,

        /// Provided key.
        provided_key: RecordKey,

        /// Found providers.
        providers: Vec<ContentProvider>,
    },

    /// Query succeeded.
    ///
    /// [`QueryEngine::next_action`] intercepts this variant and replaces it with the
    /// query-specific result action before returning to the caller.
    QuerySucceeded {
        /// ID of the query that succeeded.
        query: QueryId,
    },

    /// Query failed.
    QueryFailed {
        /// ID of the query that failed.
        query: QueryId,
    },
}
248
/// Kademlia query engine.
///
/// Owns all active queries, keyed by [`QueryId`], and forwards send/response events to the
/// context of the query they belong to.
pub struct QueryEngine {
    /// Local peer ID, copied into the configuration of queries when they are started.
    local_peer_id: PeerId,

    /// Replication factor, copied into the configuration of the queries that take one.
    replication_factor: usize,

    /// Parallelism factor, copied into the configuration of the queries that take one.
    parallelism_factor: usize,

    /// Active queries.
    queries: HashMap<QueryId, QueryType>,
}
263
264impl QueryEngine {
265    /// Create new [`QueryEngine`].
266    pub fn new(
267        local_peer_id: PeerId,
268        replication_factor: usize,
269        parallelism_factor: usize,
270    ) -> Self {
271        Self {
272            local_peer_id,
273            replication_factor,
274            parallelism_factor,
275            queries: HashMap::new(),
276        }
277    }
278
279    /// Start `FIND_NODE` query.
280    pub fn start_find_node(
281        &mut self,
282        query_id: QueryId,
283        target: PeerId,
284        candidates: VecDeque<KademliaPeer>,
285    ) -> QueryId {
286        tracing::debug!(
287            target: LOG_TARGET,
288            ?query_id,
289            ?target,
290            num_peers = ?candidates.len(),
291            "start `FIND_NODE` query"
292        );
293
294        let target = Key::from(target);
295        let config = FindNodeConfig {
296            local_peer_id: self.local_peer_id,
297            replication_factor: self.replication_factor,
298            parallelism_factor: self.parallelism_factor,
299            query: query_id,
300            target,
301        };
302
303        self.queries.insert(
304            query_id,
305            QueryType::FindNode {
306                context: FindNodeContext::new(config, candidates),
307            },
308        );
309
310        query_id
311    }
312
313    /// Start `PUT_VALUE` query.
314    pub fn start_put_record(
315        &mut self,
316        query_id: QueryId,
317        record: Record,
318        candidates: VecDeque<KademliaPeer>,
319        quorum: Quorum,
320    ) -> QueryId {
321        tracing::debug!(
322            target: LOG_TARGET,
323            ?query_id,
324            target = ?record.key,
325            num_peers = ?candidates.len(),
326            "start `PUT_VALUE` query"
327        );
328
329        let target = Key::new(record.key.clone());
330        let config = FindNodeConfig {
331            local_peer_id: self.local_peer_id,
332            replication_factor: self.replication_factor,
333            parallelism_factor: self.parallelism_factor,
334            query: query_id,
335            target,
336        };
337
338        self.queries.insert(
339            query_id,
340            QueryType::PutRecord {
341                record,
342                quorum,
343                context: FindNodeContext::new(config, candidates),
344            },
345        );
346
347        query_id
348    }
349
350    /// Start `PUT_VALUE` query to specified peers.
351    pub fn start_put_record_to_peers(
352        &mut self,
353        query_id: QueryId,
354        record: Record,
355        peers_to_report: Vec<KademliaPeer>,
356        quorum: Quorum,
357    ) -> QueryId {
358        tracing::debug!(
359            target: LOG_TARGET,
360            ?query_id,
361            target = ?record.key,
362            num_peers = ?peers_to_report.len(),
363            "start `PUT_VALUE` query to peers"
364        );
365
366        self.queries.insert(
367            query_id,
368            QueryType::PutRecordToPeers {
369                record,
370                quorum,
371                context: FindManyNodesContext::new(query_id, peers_to_report),
372            },
373        );
374
375        query_id
376    }
377
378    /// Start `GET_VALUE` query.
379    pub fn start_get_record(
380        &mut self,
381        query_id: QueryId,
382        target: RecordKey,
383        candidates: VecDeque<KademliaPeer>,
384        quorum: Quorum,
385        local_record: bool,
386    ) -> QueryId {
387        tracing::debug!(
388            target: LOG_TARGET,
389            ?query_id,
390            ?target,
391            num_peers = ?candidates.len(),
392            "start `GET_VALUE` query"
393        );
394
395        let target = Key::new(target);
396        let config = GetRecordConfig {
397            local_peer_id: self.local_peer_id,
398            known_records: if local_record { 1 } else { 0 },
399            quorum,
400            replication_factor: self.replication_factor,
401            parallelism_factor: self.parallelism_factor,
402            query: query_id,
403            target,
404        };
405
406        self.queries.insert(
407            query_id,
408            QueryType::GetRecord {
409                context: GetRecordContext::new(config, candidates, local_record),
410            },
411        );
412
413        query_id
414    }
415
416    /// Start `ADD_PROVIDER` query.
417    pub fn start_add_provider(
418        &mut self,
419        query_id: QueryId,
420        provided_key: RecordKey,
421        provider: ContentProvider,
422        candidates: VecDeque<KademliaPeer>,
423        quorum: Quorum,
424    ) -> QueryId {
425        tracing::debug!(
426            target: LOG_TARGET,
427            ?query_id,
428            ?provider,
429            num_peers = ?candidates.len(),
430            "start `ADD_PROVIDER` query",
431        );
432
433        let config = FindNodeConfig {
434            local_peer_id: self.local_peer_id,
435            replication_factor: self.replication_factor,
436            parallelism_factor: self.parallelism_factor,
437            query: query_id,
438            target: Key::new(provided_key.clone()),
439        };
440
441        self.queries.insert(
442            query_id,
443            QueryType::AddProvider {
444                provided_key,
445                provider,
446                quorum,
447                context: FindNodeContext::new(config, candidates),
448            },
449        );
450
451        query_id
452    }
453
454    /// Start `GET_PROVIDERS` query.
455    pub fn start_get_providers(
456        &mut self,
457        query_id: QueryId,
458        key: RecordKey,
459        candidates: VecDeque<KademliaPeer>,
460        known_providers: Vec<ContentProvider>,
461    ) -> QueryId {
462        tracing::debug!(
463            target: LOG_TARGET,
464            ?query_id,
465            ?key,
466            num_peers = ?candidates.len(),
467            "start `GET_PROVIDERS` query",
468        );
469
470        let target = Key::new(key);
471        let config = GetProvidersConfig {
472            local_peer_id: self.local_peer_id,
473            parallelism_factor: self.parallelism_factor,
474            query: query_id,
475            target,
476            known_providers: known_providers.into_iter().map(Into::into).collect(),
477        };
478
479        self.queries.insert(
480            query_id,
481            QueryType::GetProviders {
482                context: GetProvidersContext::new(config, candidates),
483            },
484        );
485
486        query_id
487    }
488
489    /// Start `PUT_VALUE` requests tracking.
490    pub fn start_put_record_to_found_nodes_requests_tracking(
491        &mut self,
492        query_id: QueryId,
493        key: RecordKey,
494        peers: Vec<PeerId>,
495        quorum: Quorum,
496    ) {
497        tracing::debug!(
498            target: LOG_TARGET,
499            ?query_id,
500            num_peers = ?peers.len(),
501            "start `PUT_VALUE` responses tracking"
502        );
503
504        self.queries.insert(
505            query_id,
506            QueryType::PutRecordToFoundNodes {
507                context: PutToTargetPeersContext::new(query_id, key, peers, quorum),
508            },
509        );
510    }
511
512    /// Start `ADD_PROVIDER` requests tracking.
513    pub fn start_add_provider_to_found_nodes_requests_tracking(
514        &mut self,
515        query_id: QueryId,
516        provided_key: RecordKey,
517        peers: Vec<PeerId>,
518        quorum: Quorum,
519    ) {
520        tracing::debug!(
521            target: LOG_TARGET,
522            ?query_id,
523            num_peers = ?peers.len(),
524            "start `ADD_PROVIDER` progress tracking"
525        );
526
527        self.queries.insert(
528            query_id,
529            QueryType::AddProviderToFoundNodes {
530                context: PutToTargetPeersContext::new(query_id, provided_key, peers, quorum),
531            },
532        );
533    }
534
535    /// Register response failure from a queried peer.
536    pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) {
537        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure");
538
539        match self.queries.get_mut(&query) {
540            None => {
541                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
542            }
543            Some(QueryType::FindNode { context }) => {
544                context.register_response_failure(peer);
545            }
546            Some(QueryType::PutRecord { context, .. }) => {
547                context.register_response_failure(peer);
548            }
549            Some(QueryType::PutRecordToPeers { context, .. }) => {
550                context.register_response_failure(peer);
551            }
552            Some(QueryType::PutRecordToFoundNodes { context }) => {
553                context.register_response_failure(peer);
554            }
555            Some(QueryType::GetRecord { context }) => {
556                context.register_response_failure(peer);
557            }
558            Some(QueryType::AddProvider { context, .. }) => {
559                context.register_response_failure(peer);
560            }
561            Some(QueryType::AddProviderToFoundNodes { context }) => {
562                context.register_response_failure(peer);
563            }
564            Some(QueryType::GetProviders { context }) => {
565                context.register_response_failure(peer);
566            }
567        }
568    }
569
570    /// Register that `response` received from `peer`.
571    pub fn register_response(&mut self, query: QueryId, peer: PeerId, message: KademliaMessage) {
572        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response");
573
574        match self.queries.get_mut(&query) {
575            None => {
576                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response for a stale query");
577            }
578            Some(QueryType::FindNode { context }) => match message {
579                KademliaMessage::FindNode { peers, .. } => {
580                    context.register_response(peer, peers);
581                }
582                message => {
583                    tracing::debug!(
584                        target: LOG_TARGET,
585                        ?query,
586                        ?peer,
587                        "unexpected response to `FIND_NODE`: {message}",
588                    );
589                    context.register_response_failure(peer);
590                }
591            },
592            Some(QueryType::PutRecord { context, .. }) => match message {
593                KademliaMessage::FindNode { peers, .. } => {
594                    context.register_response(peer, peers);
595                }
596                message => {
597                    tracing::debug!(
598                        target: LOG_TARGET,
599                        ?query,
600                        ?peer,
601                        "unexpected response to `FIND_NODE` during `PUT_VALUE` query: {message}",
602                    );
603                    context.register_response_failure(peer);
604                }
605            },
606            Some(QueryType::PutRecordToPeers { context, .. }) => match message {
607                KademliaMessage::FindNode { peers, .. } => {
608                    context.register_response(peer, peers);
609                }
610                message => {
611                    tracing::debug!(
612                        target: LOG_TARGET,
613                        ?query,
614                        ?peer,
615                        "unexpected response to `FIND_NODE` during `PUT_VALUE` (to peers): {message}",
616                    );
617                    context.register_response_failure(peer);
618                }
619            },
620            Some(QueryType::PutRecordToFoundNodes { context }) => match message {
621                KademliaMessage::PutValue { .. } => {
622                    context.register_response(peer);
623                }
624                message => {
625                    tracing::debug!(
626                        target: LOG_TARGET,
627                        ?query,
628                        ?peer,
629                        "unexpected response to `PUT_VALUE`: {message}",
630                    );
631                    context.register_response_failure(peer);
632                }
633            },
634            Some(QueryType::GetRecord { context }) => match message {
635                KademliaMessage::GetRecord { record, peers, .. } =>
636                    context.register_response(peer, record, peers),
637                message => {
638                    tracing::debug!(
639                        target: LOG_TARGET,
640                        ?query,
641                        ?peer,
642                        "unexpected response to `GET_VALUE`: {message}",
643                    );
644                    context.register_response_failure(peer);
645                }
646            },
647            Some(QueryType::AddProvider { context, .. }) => match message {
648                KademliaMessage::FindNode { peers, .. } => {
649                    context.register_response(peer, peers);
650                }
651                message => {
652                    tracing::debug!(
653                        target: LOG_TARGET,
654                        ?query,
655                        ?peer,
656                        "unexpected response to `FIND_NODE` during `ADD_PROVIDER` query: {message}",
657                    );
658                    context.register_response_failure(peer);
659                }
660            },
661            Some(QueryType::AddProviderToFoundNodes { context, .. }) => match message {
662                KademliaMessage::AddProvider { .. } => {
663                    context.register_response(peer);
664                }
665                message => {
666                    tracing::debug!(
667                        target: LOG_TARGET,
668                        ?query,
669                        ?peer,
670                        "unexpected response to `ADD_PROVIDER`: {message}",
671                    );
672                    context.register_response_failure(peer);
673                }
674            },
675            Some(QueryType::GetProviders { context }) => match message {
676                KademliaMessage::GetProviders {
677                    key: _,
678                    providers,
679                    peers,
680                } => {
681                    context.register_response(peer, providers, peers);
682                }
683                message => {
684                    tracing::debug!(
685                        target: LOG_TARGET,
686                        ?query,
687                        ?peer,
688                        "unexpected response to `GET_PROVIDERS`: {message}",
689                    );
690                    context.register_response_failure(peer);
691                }
692            },
693        }
694    }
695
696    pub fn register_send_failure(&mut self, query: QueryId, peer: PeerId) {
697        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send failure");
698
699        match self.queries.get_mut(&query) {
700            None => {
701                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send failure for a stale query");
702            }
703            Some(QueryType::FindNode { context }) => {
704                context.register_send_failure(peer);
705            }
706            Some(QueryType::PutRecord { context, .. }) => {
707                context.register_send_failure(peer);
708            }
709            Some(QueryType::PutRecordToPeers { context, .. }) => {
710                context.register_send_failure(peer);
711            }
712            Some(QueryType::PutRecordToFoundNodes { context }) => {
713                context.register_send_failure(peer);
714            }
715            Some(QueryType::GetRecord { context }) => {
716                context.register_send_failure(peer);
717            }
718            Some(QueryType::AddProvider { context, .. }) => {
719                context.register_send_failure(peer);
720            }
721            Some(QueryType::AddProviderToFoundNodes { context }) => {
722                context.register_send_failure(peer);
723            }
724            Some(QueryType::GetProviders { context }) => {
725                context.register_send_failure(peer);
726            }
727        }
728    }
729
730    pub fn register_send_success(&mut self, query: QueryId, peer: PeerId) {
731        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send success");
732
733        match self.queries.get_mut(&query) {
734            None => {
735                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send success for a stale query");
736            }
737            Some(QueryType::FindNode { context }) => {
738                context.register_send_success(peer);
739            }
740            Some(QueryType::PutRecord { context, .. }) => {
741                context.register_send_success(peer);
742            }
743            Some(QueryType::PutRecordToPeers { context, .. }) => {
744                context.register_send_success(peer);
745            }
746            Some(QueryType::PutRecordToFoundNodes { context, .. }) => {
747                context.register_send_success(peer);
748            }
749            Some(QueryType::GetRecord { context }) => {
750                context.register_send_success(peer);
751            }
752            Some(QueryType::AddProvider { context, .. }) => {
753                context.register_send_success(peer);
754            }
755            Some(QueryType::AddProviderToFoundNodes { context, .. }) => {
756                context.register_send_success(peer);
757            }
758            Some(QueryType::GetProviders { context }) => {
759                context.register_send_success(peer);
760            }
761        }
762    }
763
    /// Register peer failure when it is not known whether sending or receiving failed.
    /// This is called from [`super::Kademlia::disconnect_peer`].
    ///
    /// Forwards the failure to both [`Self::register_send_failure`] and
    /// [`Self::register_response_failure`], in that order.
    pub fn register_peer_failure(&mut self, query: QueryId, peer: PeerId) {
        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register peer failure");

        // Because currently queries track either send success/failure (`PUT_VALUE`, `ADD_PROVIDER`)
        // or response success/failure (`FIND_NODE`, `GET_VALUE`, `GET_PROVIDERS`),
        // but not both, we can just call both here and not propagate this different type of
        // failure to specific queries knowing this will result in the correct behaviour.
        self.register_send_failure(query, peer);
        self.register_response_failure(query, peer);
    }
776
777    /// Get next action for `peer` from the [`QueryEngine`].
778    pub fn next_peer_action(&mut self, query: &QueryId, peer: &PeerId) -> Option<QueryAction> {
779        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "get next peer action");
780
781        match self.queries.get_mut(query) {
782            None => {
783                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
784                None
785            }
786            Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
787            Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
788            Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
789            Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
790            Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer),
791            Some(QueryType::GetProviders { context }) => context.next_peer_action(peer),
792            Some(QueryType::PutRecordToFoundNodes { .. }) => {
793                // All `PUT_VALUE` requests were sent when initiating this query type.
794                None
795            }
796            Some(QueryType::AddProviderToFoundNodes { .. }) => {
797                // All `ADD_PROVIDER` requests were sent when initiating this query type.
798                None
799            }
800        }
801    }
802
803    /// Handle query success by returning the queried value(s)
804    /// and removing the query from [`QueryEngine`].
805    fn on_query_succeeded(&mut self, query: QueryId) -> QueryAction {
806        match self.queries.remove(&query).expect("query to exist") {
807            QueryType::FindNode { context } => QueryAction::FindNodeQuerySucceeded {
808                query,
809                target: context.config.target.into_preimage(),
810                peers: context.responses.into_values().collect::<Vec<_>>(),
811            },
812            QueryType::PutRecord {
813                record,
814                quorum,
815                context,
816            } => QueryAction::PutRecordToFoundNodes {
817                query: context.config.query,
818                record,
819                peers: context.responses.into_values().collect::<Vec<_>>(),
820                quorum,
821            },
822            QueryType::PutRecordToPeers {
823                record,
824                quorum,
825                context,
826            } => QueryAction::PutRecordToFoundNodes {
827                query: context.query,
828                record,
829                peers: context.peers_to_report,
830                quorum,
831            },
832            QueryType::PutRecordToFoundNodes { context } => QueryAction::PutRecordQuerySucceeded {
833                query: context.query,
834                key: context.key,
835            },
836            QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
837                query_id: context.config.query,
838            },
839            QueryType::AddProvider {
840                provided_key,
841                provider,
842                quorum,
843                context,
844            } => QueryAction::AddProviderToFoundNodes {
845                query: context.config.query,
846                provided_key,
847                provider,
848                peers: context.responses.into_values().collect::<Vec<_>>(),
849                quorum,
850            },
851            QueryType::AddProviderToFoundNodes { context } =>
852                QueryAction::AddProviderQuerySucceeded {
853                    query: context.query,
854                    provided_key: context.key,
855                },
856            QueryType::GetProviders { context } => QueryAction::GetProvidersQueryDone {
857                query_id: context.config.query,
858                provided_key: context.config.target.clone().into_preimage(),
859                providers: context.found_providers(),
860            },
861        }
862    }
863
864    /// Handle query failure by removing the query from [`QueryEngine`] and
865    /// returning the appropriate [`QueryAction`] to user.
866    fn on_query_failed(&mut self, query: QueryId) -> QueryAction {
867        let _ = self.queries.remove(&query).expect("query to exist");
868
869        QueryAction::QueryFailed { query }
870    }
871
872    /// Get next action from the [`QueryEngine`].
873    pub fn next_action(&mut self) -> Option<QueryAction> {
874        for (_, state) in self.queries.iter_mut() {
875            let action = match state {
876                QueryType::FindNode { context } => context.next_action(),
877                QueryType::PutRecord { context, .. } => context.next_action(),
878                QueryType::PutRecordToPeers { context, .. } => context.next_action(),
879                QueryType::GetRecord { context } => context.next_action(),
880                QueryType::AddProvider { context, .. } => context.next_action(),
881                QueryType::GetProviders { context } => context.next_action(),
882                QueryType::PutRecordToFoundNodes { context, .. } => context.next_action(),
883                QueryType::AddProviderToFoundNodes { context, .. } => context.next_action(),
884            };
885
886            match action {
887                Some(QueryAction::QuerySucceeded { query }) => {
888                    return Some(self.on_query_succeeded(query));
889                }
890                Some(QueryAction::QueryFailed { query }) =>
891                    return Some(self.on_query_failed(query)),
892                Some(_) => return action,
893                _ => continue,
894            }
895        }
896
897        None
898    }
899}
900
901#[cfg(test)]
902mod tests {
903    use multihash::{Code, Multihash};
904
905    use super::*;
906    use crate::protocol::libp2p::kademlia::types::ConnectionType;
907
908    // make fixed peer id
909    fn make_peer_id(first: u8, second: u8) -> PeerId {
910        let mut peer_id = vec![0u8; 32];
911        peer_id[0] = first;
912        peer_id[1] = second;
913
914        PeerId::from_bytes(
915            &Multihash::wrap(Code::Identity.into(), &peer_id)
916                .expect("The digest size is never too large")
917                .to_bytes(),
918        )
919        .unwrap()
920    }
921
922    #[test]
923    fn find_node_query_fails() {
924        let _ = tracing_subscriber::fmt()
925            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
926            .try_init();
927
928        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
929        let target_peer = PeerId::random();
930        let _target_key = Key::from(target_peer);
931
932        let query = engine.start_find_node(
933            QueryId(1337),
934            target_peer,
935            vec![
936                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
937                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
938                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
939                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
940            ]
941            .into(),
942        );
943
944        for _ in 0..4 {
945            if let Some(QueryAction::SendMessage { query, peer, .. }) = engine.next_action() {
946                engine.register_response_failure(query, peer);
947            }
948        }
949
950        if let Some(QueryAction::QueryFailed { query: failed }) = engine.next_action() {
951            assert_eq!(failed, query);
952        }
953
954        assert!(engine.next_action().is_none());
955    }
956
957    #[test]
958    fn find_node_lookup_paused() {
959        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
960        let target_peer = PeerId::random();
961        let _target_key = Key::from(target_peer);
962
963        let _ = engine.start_find_node(
964            QueryId(1338),
965            target_peer,
966            vec![
967                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
968                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
969                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
970                KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
971            ]
972            .into(),
973        );
974
975        for _ in 0..3 {
976            let _ = engine.next_action();
977        }
978
979        assert!(engine.next_action().is_none());
980    }
981
982    #[test]
983    fn find_node_query_succeeds() {
984        let _ = tracing_subscriber::fmt()
985            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
986            .try_init();
987
988        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
989        let target_peer = make_peer_id(0, 0);
990        let target_key = Key::from(target_peer);
991
992        let distances = {
993            let mut distances = std::collections::BTreeMap::new();
994
995            for i in 1..64 {
996                let peer = make_peer_id(i, 0);
997                let key = Key::from(peer);
998
999                distances.insert(target_key.distance(&key), peer);
1000            }
1001
1002            distances
1003        };
1004        let mut iter = distances.iter();
1005
1006        // start find node with one known peer
1007        let _query = engine.start_find_node(
1008            QueryId(1339),
1009            target_peer,
1010            vec![KademliaPeer::new(
1011                *iter.next().unwrap().1,
1012                vec![],
1013                ConnectionType::NotConnected,
1014            )]
1015            .into(),
1016        );
1017
1018        let action = engine.next_action();
1019        assert!(engine.next_action().is_none());
1020
1021        // the one known peer responds with 3 other peers it knows
1022        match action {
1023            Some(QueryAction::SendMessage { query, peer, .. }) => {
1024                engine.register_response(
1025                    query,
1026                    peer,
1027                    KademliaMessage::FindNode {
1028                        target: Vec::new(),
1029                        peers: vec![
1030                            KademliaPeer::new(
1031                                *iter.next().unwrap().1,
1032                                vec![],
1033                                ConnectionType::NotConnected,
1034                            ),
1035                            KademliaPeer::new(
1036                                *iter.next().unwrap().1,
1037                                vec![],
1038                                ConnectionType::NotConnected,
1039                            ),
1040                            KademliaPeer::new(
1041                                *iter.next().unwrap().1,
1042                                vec![],
1043                                ConnectionType::NotConnected,
1044                            ),
1045                        ],
1046                    },
1047                );
1048            }
1049            _ => panic!("invalid event received"),
1050        }
1051
1052        // send empty response for the last three nodes
1053        for _ in 0..3 {
1054            match engine.next_action() {
1055                Some(QueryAction::SendMessage { query, peer, .. }) => {
1056                    println!("next send message to {peer:?}");
1057                    engine.register_response(
1058                        query,
1059                        peer,
1060                        KademliaMessage::FindNode {
1061                            target: Vec::new(),
1062                            peers: vec![],
1063                        },
1064                    );
1065                }
1066                _ => panic!("invalid event received"),
1067            }
1068        }
1069
1070        match engine.next_action() {
1071            Some(QueryAction::FindNodeQuerySucceeded { peers, .. }) => {
1072                assert_eq!(peers.len(), 4);
1073            }
1074            _ => panic!("invalid event received"),
1075        }
1076
1077        assert!(engine.next_action().is_none());
1078    }
1079
1080    #[test]
1081    fn put_record_fails() {
1082        let _ = tracing_subscriber::fmt()
1083            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1084            .try_init();
1085
1086        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1087        let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1088        let target_key = Key::new(record_key.clone());
1089        let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1090
1091        let distances = {
1092            let mut distances = std::collections::BTreeMap::new();
1093
1094            for i in 1..64 {
1095                let peer = make_peer_id(i, 0);
1096                let key = Key::from(peer);
1097
1098                distances.insert(target_key.distance(&key), peer);
1099            }
1100
1101            distances
1102        };
1103        let mut iter = distances.iter();
1104
1105        // start find node with one known peer
1106        let original_query_id = QueryId(1340);
1107        let _query = engine.start_put_record(
1108            original_query_id,
1109            original_record.clone(),
1110            vec![KademliaPeer::new(
1111                *iter.next().unwrap().1,
1112                vec![],
1113                ConnectionType::NotConnected,
1114            )]
1115            .into(),
1116            Quorum::All,
1117        );
1118
1119        let action = engine.next_action();
1120        assert!(engine.next_action().is_none());
1121
1122        // the one known peer responds with 3 other peers it knows
1123        match action {
1124            Some(QueryAction::SendMessage { query, peer, .. }) => {
1125                engine.register_response(
1126                    query,
1127                    peer,
1128                    KademliaMessage::FindNode {
1129                        target: Vec::new(),
1130                        peers: vec![
1131                            KademliaPeer::new(
1132                                *iter.next().unwrap().1,
1133                                vec![],
1134                                ConnectionType::NotConnected,
1135                            ),
1136                            KademliaPeer::new(
1137                                *iter.next().unwrap().1,
1138                                vec![],
1139                                ConnectionType::NotConnected,
1140                            ),
1141                            KademliaPeer::new(
1142                                *iter.next().unwrap().1,
1143                                vec![],
1144                                ConnectionType::NotConnected,
1145                            ),
1146                        ],
1147                    },
1148                );
1149            }
1150            _ => panic!("invalid event received"),
1151        }
1152
1153        // send empty response for the last three nodes
1154        for _ in 0..3 {
1155            match engine.next_action() {
1156                Some(QueryAction::SendMessage { query, peer, .. }) => {
1157                    println!("next send message to {peer:?}");
1158                    engine.register_response(
1159                        query,
1160                        peer,
1161                        KademliaMessage::FindNode {
1162                            target: Vec::new(),
1163                            peers: vec![],
1164                        },
1165                    );
1166                }
1167                _ => panic!("invalid event received"),
1168            }
1169        }
1170
1171        let mut peers = match engine.next_action() {
1172            Some(QueryAction::PutRecordToFoundNodes {
1173                query,
1174                peers,
1175                record,
1176                quorum,
1177            }) => {
1178                assert_eq!(query, original_query_id);
1179                assert_eq!(peers.len(), 4);
1180                assert_eq!(record.key, original_record.key);
1181                assert_eq!(record.value, original_record.value);
1182                assert!(matches!(quorum, Quorum::All));
1183
1184                peers
1185            }
1186            _ => panic!("invalid event received"),
1187        };
1188
1189        engine.start_put_record_to_found_nodes_requests_tracking(
1190            original_query_id,
1191            record_key.clone(),
1192            peers.iter().map(|p| p.peer).collect(),
1193            Quorum::All,
1194        );
1195
1196        // sends to all but one peer succeed
1197        let last_peer = peers.pop().unwrap();
1198        for peer in peers {
1199            engine.register_send_success(original_query_id, peer.peer);
1200        }
1201        engine.register_send_failure(original_query_id, last_peer.peer);
1202
1203        match engine.next_action() {
1204            Some(QueryAction::QueryFailed { query }) => {
1205                assert_eq!(query, original_query_id);
1206            }
1207            _ => panic!("invalid event received"),
1208        }
1209
1210        assert!(engine.next_action().is_none());
1211    }
1212
1213    #[test]
1214    fn put_record_succeeds() {
1215        let _ = tracing_subscriber::fmt()
1216            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1217            .try_init();
1218
1219        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1220        let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1221        let target_key = Key::new(record_key.clone());
1222        let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1223
1224        let distances = {
1225            let mut distances = std::collections::BTreeMap::new();
1226
1227            for i in 1..64 {
1228                let peer = make_peer_id(i, 0);
1229                let key = Key::from(peer);
1230
1231                distances.insert(target_key.distance(&key), peer);
1232            }
1233
1234            distances
1235        };
1236        let mut iter = distances.iter();
1237
1238        // start find node with one known peer
1239        let original_query_id = QueryId(1340);
1240        let _query = engine.start_put_record(
1241            original_query_id,
1242            original_record.clone(),
1243            vec![KademliaPeer::new(
1244                *iter.next().unwrap().1,
1245                vec![],
1246                ConnectionType::NotConnected,
1247            )]
1248            .into(),
1249            Quorum::All,
1250        );
1251
1252        let action = engine.next_action();
1253        assert!(engine.next_action().is_none());
1254
1255        // the one known peer responds with 3 other peers it knows
1256        match action {
1257            Some(QueryAction::SendMessage { query, peer, .. }) => {
1258                engine.register_response(
1259                    query,
1260                    peer,
1261                    KademliaMessage::FindNode {
1262                        target: Vec::new(),
1263                        peers: vec![
1264                            KademliaPeer::new(
1265                                *iter.next().unwrap().1,
1266                                vec![],
1267                                ConnectionType::NotConnected,
1268                            ),
1269                            KademliaPeer::new(
1270                                *iter.next().unwrap().1,
1271                                vec![],
1272                                ConnectionType::NotConnected,
1273                            ),
1274                            KademliaPeer::new(
1275                                *iter.next().unwrap().1,
1276                                vec![],
1277                                ConnectionType::NotConnected,
1278                            ),
1279                        ],
1280                    },
1281                );
1282            }
1283            _ => panic!("invalid event received"),
1284        }
1285
1286        // send empty response for the last three nodes
1287        for _ in 0..3 {
1288            match engine.next_action() {
1289                Some(QueryAction::SendMessage { query, peer, .. }) => {
1290                    println!("next send message to {peer:?}");
1291                    engine.register_response(
1292                        query,
1293                        peer,
1294                        KademliaMessage::FindNode {
1295                            target: Vec::new(),
1296                            peers: vec![],
1297                        },
1298                    );
1299                }
1300                _ => panic!("invalid event received"),
1301            }
1302        }
1303
1304        let peers = match engine.next_action() {
1305            Some(QueryAction::PutRecordToFoundNodes {
1306                query,
1307                peers,
1308                record,
1309                quorum,
1310            }) => {
1311                assert_eq!(query, original_query_id);
1312                assert_eq!(peers.len(), 4);
1313                assert_eq!(record.key, original_record.key);
1314                assert_eq!(record.value, original_record.value);
1315                assert!(matches!(quorum, Quorum::All));
1316
1317                peers
1318            }
1319            _ => panic!("invalid event received"),
1320        };
1321
1322        engine.start_put_record_to_found_nodes_requests_tracking(
1323            original_query_id,
1324            record_key.clone(),
1325            peers.iter().map(|p| p.peer).collect(),
1326            Quorum::All,
1327        );
1328
1329        // simulate successful sends to all peers
1330        for peer in &peers {
1331            engine.register_send_success(original_query_id, peer.peer);
1332        }
1333
1334        match engine.next_action() {
1335            Some(QueryAction::PutRecordQuerySucceeded { query, key }) => {
1336                assert_eq!(query, original_query_id);
1337                assert_eq!(key, record_key);
1338            }
1339            _ => panic!("invalid event received"),
1340        }
1341
1342        assert!(engine.next_action().is_none());
1343
1344        // get records from those peers.
1345        let _query = engine.start_get_record(
1346            QueryId(1341),
1347            record_key.clone(),
1348            vec![
1349                KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1350                KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1351                KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1352                KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1353            ]
1354            .into(),
1355            Quorum::All,
1356            false,
1357        );
1358
1359        let mut records = Vec::new();
1360        for _ in 0..4 {
1361            match engine.next_action() {
1362                Some(QueryAction::SendMessage { query, peer, .. }) => {
1363                    assert_eq!(query, QueryId(1341));
1364                    engine.register_response(
1365                        query,
1366                        peer,
1367                        KademliaMessage::GetRecord {
1368                            record: Some(original_record.clone()),
1369                            peers: vec![],
1370                            key: Some(record_key.clone()),
1371                        },
1372                    );
1373                }
1374                event => panic!("invalid event received {:?}", event),
1375            }
1376
1377            // GetRecordPartialResult is emitted after the `register_response` if the record is
1378            // valid.
1379            match engine.next_action() {
1380                Some(QueryAction::GetRecordPartialResult { query_id, record }) => {
1381                    println!("Partial result {:?}", record);
1382                    assert_eq!(query_id, QueryId(1341));
1383                    records.push(record);
1384                }
1385                event => panic!("invalid event received {:?}", event),
1386            }
1387        }
1388
1389        let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
1390        match engine.next_action() {
1391            Some(QueryAction::GetRecordQueryDone { .. }) => {
1392                println!("Records {:?}", records);
1393                let query_peers = records
1394                    .iter()
1395                    .map(|peer_record| peer_record.peer)
1396                    .collect::<std::collections::HashSet<_>>();
1397                assert_eq!(peers, query_peers);
1398
1399                let records: std::collections::HashSet<_> =
1400                    records.into_iter().map(|peer_record| peer_record.record).collect();
1401                // One single record found across peers.
1402                assert_eq!(records.len(), 1);
1403                let record = records.into_iter().next().unwrap();
1404
1405                assert_eq!(record.key, original_record.key);
1406                assert_eq!(record.value, original_record.value);
1407            }
1408            event => panic!("invalid event received {:?}", event),
1409        }
1410    }
1411
1412    #[test]
1413    fn put_record_succeeds_with_quorum_one() {
1414        let _ = tracing_subscriber::fmt()
1415            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1416            .try_init();
1417
1418        let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1419        let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1420        let target_key = Key::new(record_key.clone());
1421        let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1422
1423        let distances = {
1424            let mut distances = std::collections::BTreeMap::new();
1425
1426            for i in 1..64 {
1427                let peer = make_peer_id(i, 0);
1428                let key = Key::from(peer);
1429
1430                distances.insert(target_key.distance(&key), peer);
1431            }
1432
1433            distances
1434        };
1435        let mut iter = distances.iter();
1436
1437        // start find node with one known peer
1438        let original_query_id = QueryId(1340);
1439        let _query = engine.start_put_record(
1440            original_query_id,
1441            original_record.clone(),
1442            vec![KademliaPeer::new(
1443                *iter.next().unwrap().1,
1444                vec![],
1445                ConnectionType::NotConnected,
1446            )]
1447            .into(),
1448            Quorum::One,
1449        );
1450
1451        let action = engine.next_action();
1452        assert!(engine.next_action().is_none());
1453
1454        // the one known peer responds with 3 other peers it knows
1455        match action {
1456            Some(QueryAction::SendMessage { query, peer, .. }) => {
1457                engine.register_response(
1458                    query,
1459                    peer,
1460                    KademliaMessage::FindNode {
1461                        target: Vec::new(),
1462                        peers: vec![
1463                            KademliaPeer::new(
1464                                *iter.next().unwrap().1,
1465                                vec![],
1466                                ConnectionType::NotConnected,
1467                            ),
1468                            KademliaPeer::new(
1469                                *iter.next().unwrap().1,
1470                                vec![],
1471                                ConnectionType::NotConnected,
1472                            ),
1473                            KademliaPeer::new(
1474                                *iter.next().unwrap().1,
1475                                vec![],
1476                                ConnectionType::NotConnected,
1477                            ),
1478                        ],
1479                    },
1480                );
1481            }
1482            _ => panic!("invalid event received"),
1483        }
1484
1485        // send empty response for the last three nodes
1486        for _ in 0..3 {
1487            match engine.next_action() {
1488                Some(QueryAction::SendMessage { query, peer, .. }) => {
1489                    println!("next send message to {peer:?}");
1490                    engine.register_response(
1491                        query,
1492                        peer,
1493                        KademliaMessage::FindNode {
1494                            target: Vec::new(),
1495                            peers: vec![],
1496                        },
1497                    );
1498                }
1499                _ => panic!("invalid event received"),
1500            }
1501        }
1502
1503        let peers = match engine.next_action() {
1504            Some(QueryAction::PutRecordToFoundNodes {
1505                query,
1506                peers,
1507                record,
1508                quorum,
1509            }) => {
1510                assert_eq!(query, original_query_id);
1511                assert_eq!(peers.len(), 4);
1512                assert_eq!(record.key, original_record.key);
1513                assert_eq!(record.value, original_record.value);
1514                assert!(matches!(quorum, Quorum::One));
1515
1516                peers
1517            }
1518            _ => panic!("invalid event received"),
1519        };
1520
1521        engine.start_put_record_to_found_nodes_requests_tracking(
1522            original_query_id,
1523            record_key.clone(),
1524            peers.iter().map(|p| p.peer).collect(),
1525            Quorum::One,
1526        );
1527
1528        // all but one peer fail
1529        assert!(peers.len() > 1);
1530        for peer in peers.iter().take(peers.len() - 1) {
1531            engine.register_send_failure(original_query_id, peer.peer);
1532        }
1533        engine.register_send_success(original_query_id, peers.last().unwrap().peer);
1534
1535        match engine.next_action() {
1536            Some(QueryAction::PutRecordQuerySucceeded { query, key }) => {
1537                assert_eq!(query, original_query_id);
1538                assert_eq!(key, record_key);
1539            }
1540            _ => panic!("invalid event received"),
1541        }
1542
1543        assert!(engine.next_action().is_none());
1544
1545        // get records from those peers.
1546        let _query = engine.start_get_record(
1547            QueryId(1341),
1548            record_key.clone(),
1549            vec![
1550                KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1551                KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1552                KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1553                KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1554            ]
1555            .into(),
1556            Quorum::All,
1557            false,
1558        );
1559
1560        let mut records = Vec::new();
1561        for _ in 0..4 {
1562            match engine.next_action() {
1563                Some(QueryAction::SendMessage { query, peer, .. }) => {
1564                    assert_eq!(query, QueryId(1341));
1565                    engine.register_response(
1566                        query,
1567                        peer,
1568                        KademliaMessage::GetRecord {
1569                            record: Some(original_record.clone()),
1570                            peers: vec![],
1571                            key: Some(record_key.clone()),
1572                        },
1573                    );
1574                }
1575                event => panic!("invalid event received {:?}", event),
1576            }
1577
1578            // GetRecordPartialResult is emitted after the `register_response` if the record is
1579            // valid.
1580            match engine.next_action() {
1581                Some(QueryAction::GetRecordPartialResult { query_id, record }) => {
1582                    println!("Partial result {:?}", record);
1583                    assert_eq!(query_id, QueryId(1341));
1584                    records.push(record);
1585                }
1586                event => panic!("invalid event received {:?}", event),
1587            }
1588        }
1589
1590        let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
1591        match engine.next_action() {
1592            Some(QueryAction::GetRecordQueryDone { .. }) => {
1593                println!("Records {:?}", records);
1594                let query_peers = records
1595                    .iter()
1596                    .map(|peer_record| peer_record.peer)
1597                    .collect::<std::collections::HashSet<_>>();
1598                assert_eq!(peers, query_peers);
1599
1600                let records: std::collections::HashSet<_> =
1601                    records.into_iter().map(|peer_record| peer_record.record).collect();
1602                // One single record found across peers.
1603                assert_eq!(records.len(), 1);
1604                let record = records.into_iter().next().unwrap();
1605
1606                assert_eq!(record.key, original_record.key);
1607                assert_eq!(record.value, original_record.value);
1608            }
1609            event => panic!("invalid event received {:?}", event),
1610        }
1611    }
1612
1613    #[test]
1614    fn add_provider_fails() {
1615        let _ = tracing_subscriber::fmt()
1616            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1617            .try_init();
1618
1619        let local_peer_id = PeerId::random();
1620        let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1621        let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1622        let local_content_provider = ContentProvider {
1623            peer: local_peer_id,
1624            addresses: vec![],
1625        };
1626        let target_key = Key::new(original_provided_key.clone());
1627
1628        let distances = {
1629            let mut distances = std::collections::BTreeMap::new();
1630
1631            for i in 1..64 {
1632                let peer = make_peer_id(i, 0);
1633                let key = Key::from(peer);
1634
1635                distances.insert(target_key.distance(&key), peer);
1636            }
1637
1638            distances
1639        };
1640        let mut iter = distances.iter();
1641
1642        // start add provider with one known peer
1643        let original_query_id = QueryId(1340);
1644        let _query = engine.start_add_provider(
1645            original_query_id,
1646            original_provided_key.clone(),
1647            local_content_provider.clone(),
1648            vec![KademliaPeer::new(
1649                *iter.next().unwrap().1,
1650                vec![],
1651                ConnectionType::NotConnected,
1652            )]
1653            .into(),
1654            Quorum::All,
1655        );
1656
1657        let action = engine.next_action();
1658        assert!(engine.next_action().is_none());
1659
1660        // the one known peer responds with 3 other peers it knows
1661        match action {
1662            Some(QueryAction::SendMessage { query, peer, .. }) => {
1663                engine.register_response(
1664                    query,
1665                    peer,
1666                    KademliaMessage::FindNode {
1667                        target: Vec::new(),
1668                        peers: vec![
1669                            KademliaPeer::new(
1670                                *iter.next().unwrap().1,
1671                                vec![],
1672                                ConnectionType::NotConnected,
1673                            ),
1674                            KademliaPeer::new(
1675                                *iter.next().unwrap().1,
1676                                vec![],
1677                                ConnectionType::NotConnected,
1678                            ),
1679                            KademliaPeer::new(
1680                                *iter.next().unwrap().1,
1681                                vec![],
1682                                ConnectionType::NotConnected,
1683                            ),
1684                        ],
1685                    },
1686                );
1687            }
1688            _ => panic!("invalid event received"),
1689        }
1690
1691        // send empty response for the last three nodes
1692        for _ in 0..3 {
1693            match engine.next_action() {
1694                Some(QueryAction::SendMessage { query, peer, .. }) => {
1695                    println!("next send message to {peer:?}");
1696                    engine.register_response(
1697                        query,
1698                        peer,
1699                        KademliaMessage::FindNode {
1700                            target: Vec::new(),
1701                            peers: vec![],
1702                        },
1703                    );
1704                }
1705                _ => panic!("invalid event received"),
1706            }
1707        }
1708
1709        let mut peers = match engine.next_action() {
1710            Some(QueryAction::AddProviderToFoundNodes {
1711                query,
1712                provided_key,
1713                provider,
1714                peers,
1715                quorum,
1716            }) => {
1717                assert_eq!(query, original_query_id);
1718                assert_eq!(provided_key, original_provided_key);
1719                assert_eq!(provider, local_content_provider);
1720                assert_eq!(peers.len(), 4);
1721                assert!(matches!(quorum, Quorum::All));
1722
1723                peers
1724            }
1725            _ => panic!("invalid event received"),
1726        };
1727
1728        engine.start_add_provider_to_found_nodes_requests_tracking(
1729            original_query_id,
1730            original_provided_key.clone(),
1731            peers.iter().map(|p| p.peer).collect(),
1732            Quorum::All,
1733        );
1734
1735        // sends to all but one peer succeed
1736        let last_peer = peers.pop().unwrap();
1737        for peer in peers {
1738            engine.register_send_success(original_query_id, peer.peer);
1739        }
1740        engine.register_send_failure(original_query_id, last_peer.peer);
1741
1742        match engine.next_action() {
1743            Some(QueryAction::QueryFailed { query }) => {
1744                assert_eq!(query, original_query_id);
1745            }
1746            _ => panic!("invalid event received"),
1747        }
1748
1749        assert!(engine.next_action().is_none());
1750    }
1751
1752    #[test]
1753    fn add_provider_succeeds() {
1754        let _ = tracing_subscriber::fmt()
1755            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1756            .try_init();
1757
1758        let local_peer_id = PeerId::random();
1759        let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1760        let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1761        let local_content_provider = ContentProvider {
1762            peer: local_peer_id,
1763            addresses: vec![],
1764        };
1765
1766        let target_key = Key::new(original_provided_key.clone());
1767        let distances = {
1768            let mut distances = std::collections::BTreeMap::new();
1769
1770            for i in 1..64 {
1771                let peer = make_peer_id(i, 0);
1772                let key = Key::from(peer);
1773
1774                distances.insert(target_key.distance(&key), peer);
1775            }
1776
1777            distances
1778        };
1779        let mut iter = distances.iter();
1780
1781        // start add provider with one known peer
1782        let add_query_id = QueryId(1340);
1783        let _query = engine.start_add_provider(
1784            add_query_id,
1785            original_provided_key.clone(),
1786            local_content_provider.clone(),
1787            vec![KademliaPeer::new(
1788                *iter.next().unwrap().1,
1789                vec![],
1790                ConnectionType::NotConnected,
1791            )]
1792            .into(),
1793            Quorum::All,
1794        );
1795
1796        let action = engine.next_action();
1797        assert!(engine.next_action().is_none());
1798
1799        // the one known peer responds with 3 other peers it knows
1800        match action {
1801            Some(QueryAction::SendMessage { query, peer, .. }) => {
1802                engine.register_response(
1803                    query,
1804                    peer,
1805                    KademliaMessage::FindNode {
1806                        target: Vec::new(),
1807                        peers: vec![
1808                            KademliaPeer::new(
1809                                *iter.next().unwrap().1,
1810                                vec![],
1811                                ConnectionType::NotConnected,
1812                            ),
1813                            KademliaPeer::new(
1814                                *iter.next().unwrap().1,
1815                                vec![],
1816                                ConnectionType::NotConnected,
1817                            ),
1818                            KademliaPeer::new(
1819                                *iter.next().unwrap().1,
1820                                vec![],
1821                                ConnectionType::NotConnected,
1822                            ),
1823                        ],
1824                    },
1825                );
1826            }
1827            _ => panic!("invalid event received"),
1828        }
1829
1830        // send empty response for the last three nodes
1831        for _ in 0..3 {
1832            match engine.next_action() {
1833                Some(QueryAction::SendMessage { query, peer, .. }) => {
1834                    println!("next send message to {peer:?}");
1835                    engine.register_response(
1836                        query,
1837                        peer,
1838                        KademliaMessage::FindNode {
1839                            target: Vec::new(),
1840                            peers: vec![],
1841                        },
1842                    );
1843                }
1844                _ => panic!("invalid event received"),
1845            }
1846        }
1847
1848        let peers = match engine.next_action() {
1849            Some(QueryAction::AddProviderToFoundNodes {
1850                query,
1851                provided_key,
1852                provider,
1853                peers,
1854                quorum,
1855            }) => {
1856                assert_eq!(query, add_query_id);
1857                assert_eq!(provided_key, original_provided_key);
1858                assert_eq!(provider, local_content_provider);
1859                assert_eq!(peers.len(), 4);
1860                assert!(matches!(quorum, Quorum::All));
1861
1862                peers
1863            }
1864            _ => panic!("invalid event received"),
1865        };
1866
1867        engine.start_add_provider_to_found_nodes_requests_tracking(
1868            add_query_id,
1869            original_provided_key.clone(),
1870            peers.iter().map(|p| p.peer).collect(),
1871            Quorum::All,
1872        );
1873
1874        // simulate successful sends to all peers
1875        for peer in &peers {
1876            engine.register_send_success(add_query_id, peer.peer);
1877        }
1878
1879        match engine.next_action() {
1880            Some(QueryAction::AddProviderQuerySucceeded {
1881                query,
1882                provided_key,
1883            }) => {
1884                assert_eq!(query, add_query_id);
1885                assert_eq!(provided_key, original_provided_key);
1886            }
1887            _ => panic!("invalid event received"),
1888        }
1889
1890        assert!(engine.next_action().is_none());
1891
1892        // get providers from those peers.
1893        let get_query_id = QueryId(1341);
1894        let _query = engine.start_get_providers(
1895            get_query_id,
1896            original_provided_key.clone(),
1897            vec![
1898                KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1899                KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1900                KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1901                KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1902            ]
1903            .into(),
1904            vec![],
1905        );
1906
1907        for _ in 0..4 {
1908            match engine.next_action() {
1909                Some(QueryAction::SendMessage { query, peer, .. }) => {
1910                    assert_eq!(query, get_query_id);
1911                    engine.register_response(
1912                        query,
1913                        peer,
1914                        KademliaMessage::GetProviders {
1915                            key: Some(original_provided_key.clone()),
1916                            peers: vec![],
1917                            providers: vec![local_content_provider.clone().into()],
1918                        },
1919                    );
1920                }
1921                event => panic!("invalid event received {:?}", event),
1922            }
1923        }
1924
1925        match engine.next_action() {
1926            Some(QueryAction::GetProvidersQueryDone {
1927                query_id,
1928                provided_key,
1929                providers,
1930            }) => {
1931                assert_eq!(query_id, get_query_id);
1932                assert_eq!(provided_key, original_provided_key);
1933                assert_eq!(providers, vec![local_content_provider]);
1934            }
1935            event => panic!("invalid event received {:?}", event),
1936        }
1937    }
1938
1939    #[test]
1940    fn add_provider_succeeds_with_quorum_one() {
1941        let _ = tracing_subscriber::fmt()
1942            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1943            .try_init();
1944
1945        let local_peer_id = PeerId::random();
1946        let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1947        let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1948        let local_content_provider = ContentProvider {
1949            peer: local_peer_id,
1950            addresses: vec![],
1951        };
1952
1953        let target_key = Key::new(original_provided_key.clone());
1954        let distances = {
1955            let mut distances = std::collections::BTreeMap::new();
1956
1957            for i in 1..64 {
1958                let peer = make_peer_id(i, 0);
1959                let key = Key::from(peer);
1960
1961                distances.insert(target_key.distance(&key), peer);
1962            }
1963
1964            distances
1965        };
1966        let mut iter = distances.iter();
1967
1968        // start add provider with one known peer
1969        let add_query_id = QueryId(1340);
1970        let _query = engine.start_add_provider(
1971            add_query_id,
1972            original_provided_key.clone(),
1973            local_content_provider.clone(),
1974            vec![KademliaPeer::new(
1975                *iter.next().unwrap().1,
1976                vec![],
1977                ConnectionType::NotConnected,
1978            )]
1979            .into(),
1980            Quorum::One,
1981        );
1982
1983        let action = engine.next_action();
1984        assert!(engine.next_action().is_none());
1985
1986        // the one known peer responds with 3 other peers it knows
1987        match action {
1988            Some(QueryAction::SendMessage { query, peer, .. }) => {
1989                engine.register_response(
1990                    query,
1991                    peer,
1992                    KademliaMessage::FindNode {
1993                        target: Vec::new(),
1994                        peers: vec![
1995                            KademliaPeer::new(
1996                                *iter.next().unwrap().1,
1997                                vec![],
1998                                ConnectionType::NotConnected,
1999                            ),
2000                            KademliaPeer::new(
2001                                *iter.next().unwrap().1,
2002                                vec![],
2003                                ConnectionType::NotConnected,
2004                            ),
2005                            KademliaPeer::new(
2006                                *iter.next().unwrap().1,
2007                                vec![],
2008                                ConnectionType::NotConnected,
2009                            ),
2010                        ],
2011                    },
2012                );
2013            }
2014            _ => panic!("invalid event received"),
2015        }
2016
2017        // send empty response for the last three nodes
2018        for _ in 0..3 {
2019            match engine.next_action() {
2020                Some(QueryAction::SendMessage { query, peer, .. }) => {
2021                    println!("next send message to {peer:?}");
2022                    engine.register_response(
2023                        query,
2024                        peer,
2025                        KademliaMessage::FindNode {
2026                            target: Vec::new(),
2027                            peers: vec![],
2028                        },
2029                    );
2030                }
2031                _ => panic!("invalid event received"),
2032            }
2033        }
2034
2035        let peers = match engine.next_action() {
2036            Some(QueryAction::AddProviderToFoundNodes {
2037                query,
2038                provided_key,
2039                provider,
2040                peers,
2041                quorum,
2042            }) => {
2043                assert_eq!(query, add_query_id);
2044                assert_eq!(provided_key, original_provided_key);
2045                assert_eq!(provider, local_content_provider);
2046                assert_eq!(peers.len(), 4);
2047                assert!(matches!(quorum, Quorum::One));
2048
2049                peers
2050            }
2051            _ => panic!("invalid event received"),
2052        };
2053
2054        engine.start_add_provider_to_found_nodes_requests_tracking(
2055            add_query_id,
2056            original_provided_key.clone(),
2057            peers.iter().map(|p| p.peer).collect(),
2058            Quorum::One,
2059        );
2060
2061        // all but one peer fail
2062        assert!(peers.len() > 1);
2063        engine.register_send_success(add_query_id, peers.first().unwrap().peer);
2064        for peer in peers.iter().skip(1) {
2065            engine.register_send_failure(add_query_id, peer.peer);
2066        }
2067
2068        match engine.next_action() {
2069            Some(QueryAction::AddProviderQuerySucceeded {
2070                query,
2071                provided_key,
2072            }) => {
2073                assert_eq!(query, add_query_id);
2074                assert_eq!(provided_key, original_provided_key);
2075            }
2076            _ => panic!("invalid event received"),
2077        }
2078
2079        assert!(engine.next_action().is_none());
2080
2081        // get providers from those peers.
2082        let get_query_id = QueryId(1341);
2083        let _query = engine.start_get_providers(
2084            get_query_id,
2085            original_provided_key.clone(),
2086            vec![
2087                KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
2088                KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
2089                KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
2090                KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
2091            ]
2092            .into(),
2093            vec![],
2094        );
2095
2096        // first peer responds with the provider
2097        match engine.next_action() {
2098            Some(QueryAction::SendMessage { query, peer, .. }) => {
2099                assert_eq!(query, get_query_id);
2100                engine.register_response(
2101                    query,
2102                    peer,
2103                    KademliaMessage::GetProviders {
2104                        key: Some(original_provided_key.clone()),
2105                        peers: vec![],
2106                        providers: vec![local_content_provider.clone().into()],
2107                    },
2108                );
2109            }
2110            event => panic!("invalid event received {:?}", event),
2111        }
2112
2113        // other peers respond with no providers
2114        for _ in 1..4 {
2115            match engine.next_action() {
2116                Some(QueryAction::SendMessage { query, peer, .. }) => {
2117                    assert_eq!(query, get_query_id);
2118                    engine.register_response(
2119                        query,
2120                        peer,
2121                        KademliaMessage::GetProviders {
2122                            key: Some(original_provided_key.clone()),
2123                            peers: vec![],
2124                            providers: vec![],
2125                        },
2126                    );
2127                }
2128                event => panic!("invalid event received {:?}", event),
2129            }
2130        }
2131
2132        match engine.next_action() {
2133            Some(QueryAction::GetProvidersQueryDone {
2134                query_id,
2135                provided_key,
2136                providers,
2137            }) => {
2138                assert_eq!(query_id, get_query_id);
2139                assert_eq!(provided_key, original_provided_key);
2140                assert_eq!(providers, vec![local_content_provider]);
2141            }
2142            event => panic!("invalid event received {:?}", event),
2143        }
2144    }
2145}