litep2p/protocol/libp2p/kademlia/query/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    protocol::libp2p::kademlia::{
23        message::KademliaMessage,
24        query::{
25            find_node::{FindNodeConfig, FindNodeContext},
26            get_providers::{GetProvidersConfig, GetProvidersContext},
27            get_record::{GetRecordConfig, GetRecordContext},
28        },
29        record::{ContentProvider, Key as RecordKey, Record},
30        types::{KademliaPeer, Key},
31        PeerRecord, Quorum,
32    },
33    PeerId,
34};
35
36use bytes::Bytes;
37
38use std::collections::{HashMap, VecDeque};
39
40use self::{find_many_nodes::FindManyNodesContext, target_peers::PutToTargetPeersContext};
41
42mod find_many_nodes;
43mod find_node;
44mod get_providers;
45mod get_record;
46mod target_peers;
47
/// Logging target attached to every `tracing` event emitted from this file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
50
/// Type representing a query ID.
///
/// Newtype around `usize`; [`QueryEngine`] uses it as the key of its map of active queries.
/// `serde` support is only derived when the `fuzz` feature is enabled.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
pub struct QueryId(pub usize);
55
/// Query type.
///
/// Internal per-query state stored by [`QueryEngine`]. Each variant owns the context object
/// that drives that kind of query. `PUT_VALUE` and `ADD_PROVIDER` are represented by two
/// kinds of variants: one for the phase that determines the target peers (`PutRecord`,
/// `PutRecordToPeers`, `AddProvider`) and one for tracking the requests sent to those peers
/// (`PutRecordToFoundNodes`, `AddProviderToFoundNodes`).
#[derive(Debug)]
enum QueryType {
    /// `FIND_NODE` query.
    FindNode {
        /// Context for the `FIND_NODE` query.
        context: FindNodeContext<PeerId>,
    },

    /// `PUT_VALUE` query.
    PutRecord {
        /// Record that needs to be stored.
        record: Record,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,

        /// Context for the `FIND_NODE` query that locates peers for the record key.
        context: FindNodeContext<RecordKey>,
    },

    /// `PUT_VALUE` query to specified peers.
    PutRecordToPeers {
        /// Record that needs to be stored.
        record: Record,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,

        /// Context for finding peers.
        context: FindManyNodesContext,
    },

    /// `PUT_VALUE` message sending phase.
    PutRecordToFoundNodes {
        /// Context for tracking `PUT_VALUE` responses.
        context: PutToTargetPeersContext,
    },

    /// `GET_VALUE` query.
    GetRecord {
        /// Context for the `GET_VALUE` query.
        context: GetRecordContext,
    },

    /// `ADD_PROVIDER` query.
    AddProvider {
        /// Provided key.
        provided_key: RecordKey,

        /// Provider record that needs to be stored.
        provider: ContentProvider,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,

        /// Context for the `FIND_NODE` query that locates peers for the provided key.
        context: FindNodeContext<RecordKey>,
    },

    /// `ADD_PROVIDER` message sending phase.
    AddProviderToFoundNodes {
        /// Context for tracking `ADD_PROVIDER` requests.
        context: PutToTargetPeersContext,
    },

    /// `GET_PROVIDERS` query.
    GetProviders {
        /// Context for the `GET_PROVIDERS` query.
        context: GetProvidersContext,
    },
}
128
/// Query action.
///
/// Value returned by [`QueryEngine::next_action`] and [`QueryEngine::next_peer_action`] to
/// tell the caller what to do next for a query.
#[derive(Debug)]
pub enum QueryAction {
    /// Send message to peer.
    SendMessage {
        /// Query ID.
        query: QueryId,

        /// Peer.
        peer: PeerId,

        /// Message.
        message: Bytes,
    },

    /// `FIND_NODE` query succeeded.
    FindNodeQuerySucceeded {
        /// ID of the query that succeeded.
        query: QueryId,

        /// Target peer.
        target: PeerId,

        /// Peers that were found.
        peers: Vec<KademliaPeer>,
    },

    /// Store the record to nodes closest to target key.
    PutRecordToFoundNodes {
        /// Query ID of the original PUT_RECORD request.
        query: QueryId,

        /// Record to store.
        record: Record,

        /// Peers to which the `PUT_VALUE` must be sent.
        peers: Vec<KademliaPeer>,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,
    },

    /// `PUT_VALUE` query succeeded.
    PutRecordQuerySucceeded {
        /// ID of the query that succeeded.
        query: QueryId,

        /// Record key of the stored record.
        key: RecordKey,
    },

    /// Add the provider record to nodes closest to the target key.
    AddProviderToFoundNodes {
        /// Query ID of the original ADD_PROVIDER request.
        query: QueryId,

        /// Provided key.
        provided_key: RecordKey,

        /// Provider record.
        provider: ContentProvider,

        /// Peers to which the `ADD_PROVIDER` must be sent.
        peers: Vec<KademliaPeer>,

        /// [`Quorum`] that needs to be reached for the query to succeed.
        quorum: Quorum,
    },

    /// `ADD_PROVIDER` query succeeded.
    AddProviderQuerySucceeded {
        /// ID of the query that succeeded.
        query: QueryId,

        /// Provided key.
        provided_key: RecordKey,
    },

    /// `GET_VALUE` query succeeded.
    GetRecordQueryDone {
        /// Query ID.
        query_id: QueryId,
    },

    /// `GET_VALUE` inflight query produced a result.
    ///
    /// This event is emitted when a peer responds to the query with a record.
    GetRecordPartialResult {
        /// Query ID.
        query_id: QueryId,

        /// Found record.
        record: PeerRecord,
    },

    /// `GET_PROVIDERS` query succeeded.
    GetProvidersQueryDone {
        /// Query ID.
        query_id: QueryId,

        /// Provided key.
        provided_key: RecordKey,

        /// Found providers.
        providers: Vec<ContentProvider>,
    },

    /// Query succeeded.
    ///
    /// When a query context reports this action, [`QueryEngine::next_action`] does not
    /// return it as-is: it removes the query and returns the query-specific result action.
    QuerySucceeded {
        /// ID of the query that succeeded.
        query: QueryId,
    },

    /// Query failed.
    ///
    /// [`QueryEngine::next_action`] removes the query before returning this action.
    QueryFailed {
        /// ID of the query that failed.
        query: QueryId,
    },
}
248
/// Kademlia query engine.
///
/// Owns the state of every active query and dispatches responses, failures and polling
/// to the context of the query they belong to.
pub struct QueryEngine {
    /// Local peer ID.
    local_peer_id: PeerId,

    /// Replication factor.
    replication_factor: usize,

    /// Parallelism factor.
    parallelism_factor: usize,

    /// Active queries, keyed by their [`QueryId`].
    queries: HashMap<QueryId, QueryType>,
}
263
264impl QueryEngine {
265    /// Create new [`QueryEngine`].
266    pub fn new(
267        local_peer_id: PeerId,
268        replication_factor: usize,
269        parallelism_factor: usize,
270    ) -> Self {
271        Self {
272            local_peer_id,
273            replication_factor,
274            parallelism_factor,
275            queries: HashMap::new(),
276        }
277    }
278
279    /// Start `FIND_NODE` query.
280    pub fn start_find_node(
281        &mut self,
282        query_id: QueryId,
283        target: PeerId,
284        candidates: VecDeque<KademliaPeer>,
285    ) -> QueryId {
286        tracing::debug!(
287            target: LOG_TARGET,
288            ?query_id,
289            ?target,
290            num_peers = ?candidates.len(),
291            "start `FIND_NODE` query"
292        );
293
294        let target = Key::from(target);
295        let config = FindNodeConfig {
296            local_peer_id: self.local_peer_id,
297            replication_factor: self.replication_factor,
298            parallelism_factor: self.parallelism_factor,
299            query: query_id,
300            target,
301        };
302
303        self.queries.insert(
304            query_id,
305            QueryType::FindNode {
306                context: FindNodeContext::new(config, candidates),
307            },
308        );
309
310        query_id
311    }
312
313    /// Start `PUT_VALUE` query.
314    pub fn start_put_record(
315        &mut self,
316        query_id: QueryId,
317        record: Record,
318        candidates: VecDeque<KademliaPeer>,
319        quorum: Quorum,
320    ) -> QueryId {
321        tracing::debug!(
322            target: LOG_TARGET,
323            ?query_id,
324            target = ?record.key,
325            num_peers = ?candidates.len(),
326            "start `PUT_VALUE` query"
327        );
328
329        let target = Key::new(record.key.clone());
330        let config = FindNodeConfig {
331            local_peer_id: self.local_peer_id,
332            replication_factor: self.replication_factor,
333            parallelism_factor: self.parallelism_factor,
334            query: query_id,
335            target,
336        };
337
338        self.queries.insert(
339            query_id,
340            QueryType::PutRecord {
341                record,
342                quorum,
343                context: FindNodeContext::new(config, candidates),
344            },
345        );
346
347        query_id
348    }
349
350    /// Start `PUT_VALUE` query to specified peers.
351    pub fn start_put_record_to_peers(
352        &mut self,
353        query_id: QueryId,
354        record: Record,
355        peers_to_report: Vec<KademliaPeer>,
356        quorum: Quorum,
357    ) -> QueryId {
358        tracing::debug!(
359            target: LOG_TARGET,
360            ?query_id,
361            target = ?record.key,
362            num_peers = ?peers_to_report.len(),
363            "start `PUT_VALUE` query to peers"
364        );
365
366        self.queries.insert(
367            query_id,
368            QueryType::PutRecordToPeers {
369                record,
370                quorum,
371                context: FindManyNodesContext::new(query_id, peers_to_report),
372            },
373        );
374
375        query_id
376    }
377
378    /// Start `GET_VALUE` query.
379    pub fn start_get_record(
380        &mut self,
381        query_id: QueryId,
382        target: RecordKey,
383        candidates: VecDeque<KademliaPeer>,
384        quorum: Quorum,
385        local_record: bool,
386    ) -> QueryId {
387        tracing::debug!(
388            target: LOG_TARGET,
389            ?query_id,
390            ?target,
391            num_peers = ?candidates.len(),
392            "start `GET_VALUE` query"
393        );
394
395        let target = Key::new(target);
396        let config = GetRecordConfig {
397            local_peer_id: self.local_peer_id,
398            known_records: if local_record { 1 } else { 0 },
399            quorum,
400            replication_factor: self.replication_factor,
401            parallelism_factor: self.parallelism_factor,
402            query: query_id,
403            target,
404        };
405
406        self.queries.insert(
407            query_id,
408            QueryType::GetRecord {
409                context: GetRecordContext::new(config, candidates, local_record),
410            },
411        );
412
413        query_id
414    }
415
416    /// Start `ADD_PROVIDER` query.
417    pub fn start_add_provider(
418        &mut self,
419        query_id: QueryId,
420        provided_key: RecordKey,
421        provider: ContentProvider,
422        candidates: VecDeque<KademliaPeer>,
423        quorum: Quorum,
424    ) -> QueryId {
425        tracing::debug!(
426            target: LOG_TARGET,
427            ?query_id,
428            ?provider,
429            num_peers = ?candidates.len(),
430            "start `ADD_PROVIDER` query",
431        );
432
433        let config = FindNodeConfig {
434            local_peer_id: self.local_peer_id,
435            replication_factor: self.replication_factor,
436            parallelism_factor: self.parallelism_factor,
437            query: query_id,
438            target: Key::new(provided_key.clone()),
439        };
440
441        self.queries.insert(
442            query_id,
443            QueryType::AddProvider {
444                provided_key,
445                provider,
446                quorum,
447                context: FindNodeContext::new(config, candidates),
448            },
449        );
450
451        query_id
452    }
453
454    /// Start `GET_PROVIDERS` query.
455    pub fn start_get_providers(
456        &mut self,
457        query_id: QueryId,
458        key: RecordKey,
459        candidates: VecDeque<KademliaPeer>,
460        known_providers: Vec<ContentProvider>,
461    ) -> QueryId {
462        tracing::debug!(
463            target: LOG_TARGET,
464            ?query_id,
465            ?key,
466            num_peers = ?candidates.len(),
467            "start `GET_PROVIDERS` query",
468        );
469
470        let target = Key::new(key);
471        let config = GetProvidersConfig {
472            local_peer_id: self.local_peer_id,
473            parallelism_factor: self.parallelism_factor,
474            query: query_id,
475            target,
476            known_providers: known_providers.into_iter().map(Into::into).collect(),
477        };
478
479        self.queries.insert(
480            query_id,
481            QueryType::GetProviders {
482                context: GetProvidersContext::new(config, candidates),
483            },
484        );
485
486        query_id
487    }
488
489    /// Start `PUT_VALUE` requests tracking.
490    pub fn start_put_record_to_found_nodes_requests_tracking(
491        &mut self,
492        query_id: QueryId,
493        key: RecordKey,
494        peers: Vec<PeerId>,
495        quorum: Quorum,
496    ) {
497        tracing::debug!(
498            target: LOG_TARGET,
499            ?query_id,
500            num_peers = ?peers.len(),
501            "start `PUT_VALUE` responses tracking"
502        );
503
504        self.queries.insert(
505            query_id,
506            QueryType::PutRecordToFoundNodes {
507                context: PutToTargetPeersContext::new(query_id, key, peers, quorum),
508            },
509        );
510    }
511
512    /// Start `ADD_PROVIDER` requests tracking.
513    pub fn start_add_provider_to_found_nodes_requests_tracking(
514        &mut self,
515        query_id: QueryId,
516        provided_key: RecordKey,
517        peers: Vec<PeerId>,
518        quorum: Quorum,
519    ) {
520        tracing::debug!(
521            target: LOG_TARGET,
522            ?query_id,
523            num_peers = ?peers.len(),
524            "start `ADD_PROVIDER` progress tracking"
525        );
526
527        self.queries.insert(
528            query_id,
529            QueryType::AddProviderToFoundNodes {
530                context: PutToTargetPeersContext::new(query_id, provided_key, peers, quorum),
531            },
532        );
533    }
534
535    /// Register response failure from a queried peer.
536    pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) {
537        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure");
538
539        match self.queries.get_mut(&query) {
540            None => {
541                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
542            }
543            Some(QueryType::FindNode { context }) => {
544                context.register_response_failure(peer);
545            }
546            Some(QueryType::PutRecord { context, .. }) => {
547                context.register_response_failure(peer);
548            }
549            Some(QueryType::PutRecordToPeers { context, .. }) => {
550                context.register_response_failure(peer);
551            }
552            Some(QueryType::PutRecordToFoundNodes { context }) => {
553                context.register_response_failure(peer);
554            }
555            Some(QueryType::GetRecord { context }) => {
556                context.register_response_failure(peer);
557            }
558            Some(QueryType::AddProvider { context, .. }) => {
559                context.register_response_failure(peer);
560            }
561            Some(QueryType::AddProviderToFoundNodes { context }) => {
562                context.register_response_failure(peer);
563            }
564            Some(QueryType::GetProviders { context }) => {
565                context.register_response_failure(peer);
566            }
567        }
568    }
569
570    /// Register that `response` received from `peer`.
571    pub fn register_response(&mut self, query: QueryId, peer: PeerId, message: KademliaMessage) {
572        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response");
573
574        match self.queries.get_mut(&query) {
575            None => {
576                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response for a stale query");
577            }
578            Some(QueryType::FindNode { context }) => match message {
579                KademliaMessage::FindNode { peers, .. } => {
580                    context.register_response(peer, peers);
581                }
582                message => {
583                    tracing::debug!(
584                        target: LOG_TARGET,
585                        ?query,
586                        ?peer,
587                        "unexpected response to `FIND_NODE`: {message}",
588                    );
589                    context.register_response_failure(peer);
590                }
591            },
592            Some(QueryType::PutRecord { context, .. }) => match message {
593                KademliaMessage::FindNode { peers, .. } => {
594                    context.register_response(peer, peers);
595                }
596                message => {
597                    tracing::debug!(
598                        target: LOG_TARGET,
599                        ?query,
600                        ?peer,
601                        "unexpected response to `FIND_NODE` during `PUT_VALUE` query: {message}",
602                    );
603                    context.register_response_failure(peer);
604                }
605            },
606            Some(QueryType::PutRecordToPeers { context, .. }) => match message {
607                KademliaMessage::FindNode { peers, .. } => {
608                    context.register_response(peer, peers);
609                }
610                message => {
611                    tracing::debug!(
612                        target: LOG_TARGET,
613                        ?query,
614                        ?peer,
615                        "unexpected response to `FIND_NODE` during `PUT_VALUE` (to peers): {message}",
616                    );
617                    context.register_response_failure(peer);
618                }
619            },
620            Some(QueryType::PutRecordToFoundNodes { context }) => match message {
621                KademliaMessage::PutValue { .. } => {
622                    context.register_response(peer);
623                }
624                message => {
625                    tracing::debug!(
626                        target: LOG_TARGET,
627                        ?query,
628                        ?peer,
629                        "unexpected response to `PUT_VALUE`: {message}",
630                    );
631                    context.register_response_failure(peer);
632                }
633            },
634            Some(QueryType::GetRecord { context }) => match message {
635                KademliaMessage::GetRecord { record, peers, .. } =>
636                    context.register_response(peer, record, peers),
637                message => {
638                    tracing::debug!(
639                        target: LOG_TARGET,
640                        ?query,
641                        ?peer,
642                        "unexpected response to `GET_VALUE`: {message}",
643                    );
644                    context.register_response_failure(peer);
645                }
646            },
647            Some(QueryType::AddProvider { context, .. }) => match message {
648                KademliaMessage::FindNode { peers, .. } => {
649                    context.register_response(peer, peers);
650                }
651                message => {
652                    tracing::debug!(
653                        target: LOG_TARGET,
654                        ?query,
655                        ?peer,
656                        "unexpected response to `FIND_NODE` during `ADD_PROVIDER` query: {message}",
657                    );
658                    context.register_response_failure(peer);
659                }
660            },
661            Some(QueryType::AddProviderToFoundNodes { context, .. }) => match message {
662                KademliaMessage::AddProvider { .. } => {
663                    context.register_response(peer);
664                }
665                message => {
666                    tracing::debug!(
667                        target: LOG_TARGET,
668                        ?query,
669                        ?peer,
670                        "unexpected response to `ADD_PROVIDER`: {message}",
671                    );
672                    context.register_response_failure(peer);
673                }
674            },
675            Some(QueryType::GetProviders { context }) => match message {
676                KademliaMessage::GetProviders {
677                    key: _,
678                    providers,
679                    peers,
680                } => {
681                    context.register_response(peer, providers, peers);
682                }
683                message => {
684                    tracing::debug!(
685                        target: LOG_TARGET,
686                        ?query,
687                        ?peer,
688                        "unexpected response to `GET_PROVIDERS`: {message}",
689                    );
690                    context.register_response_failure(peer);
691                }
692            },
693        }
694    }
695
696    pub fn register_send_failure(&mut self, query: QueryId, peer: PeerId) {
697        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send failure");
698
699        match self.queries.get_mut(&query) {
700            None => {
701                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send failure for a stale query");
702            }
703            Some(QueryType::FindNode { context }) => {
704                context.register_send_failure(peer);
705            }
706            Some(QueryType::PutRecord { context, .. }) => {
707                context.register_send_failure(peer);
708            }
709            Some(QueryType::PutRecordToPeers { context, .. }) => {
710                context.register_send_failure(peer);
711            }
712            Some(QueryType::PutRecordToFoundNodes { context }) => {
713                context.register_send_failure(peer);
714            }
715            Some(QueryType::GetRecord { context }) => {
716                context.register_send_failure(peer);
717            }
718            Some(QueryType::AddProvider { context, .. }) => {
719                context.register_send_failure(peer);
720            }
721            Some(QueryType::AddProviderToFoundNodes { context }) => {
722                context.register_send_failure(peer);
723            }
724            Some(QueryType::GetProviders { context }) => {
725                context.register_send_failure(peer);
726            }
727        }
728    }
729
730    pub fn register_send_success(&mut self, query: QueryId, peer: PeerId) {
731        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send success");
732
733        match self.queries.get_mut(&query) {
734            None => {
735                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send success for a stale query");
736            }
737            Some(QueryType::FindNode { context }) => {
738                context.register_send_success(peer);
739            }
740            Some(QueryType::PutRecord { context, .. }) => {
741                context.register_send_success(peer);
742            }
743            Some(QueryType::PutRecordToPeers { context, .. }) => {
744                context.register_send_success(peer);
745            }
746            Some(QueryType::PutRecordToFoundNodes { context, .. }) => {
747                context.register_send_success(peer);
748            }
749            Some(QueryType::GetRecord { context }) => {
750                context.register_send_success(peer);
751            }
752            Some(QueryType::AddProvider { context, .. }) => {
753                context.register_send_success(peer);
754            }
755            Some(QueryType::AddProviderToFoundNodes { context, .. }) => {
756                context.register_send_success(peer);
757            }
758            Some(QueryType::GetProviders { context }) => {
759                context.register_send_success(peer);
760            }
761        }
762    }
763
    /// Register peer failure when it is not known whether sending or receiving failed.
    /// This is called from [`super::Kademlia::disconnect_peer`].
    ///
    /// Reports the failure to `query` as both a send failure and a response failure, in
    /// that order.
    pub fn register_peer_failure(&mut self, query: QueryId, peer: PeerId) {
        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register peer failure");

        // Because currently queries track either send success/failure (`PUT_VALUE`, `ADD_PROVIDER`)
        // or response success/failure (`FIND_NODE`, `GET_VALUE`, `GET_PROVIDERS`),
        // but not both, we can just call both here and not propagate this different type of
        // failure to specific queries knowing this will result in the correct behaviour.
        self.register_send_failure(query, peer);
        self.register_response_failure(query, peer);
    }
776
777    /// Get next action for `peer` from the [`QueryEngine`].
778    pub fn next_peer_action(&mut self, query: &QueryId, peer: &PeerId) -> Option<QueryAction> {
779        tracing::trace!(target: LOG_TARGET, ?query, ?peer, "get next peer action");
780
781        match self.queries.get_mut(query) {
782            None => {
783                tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
784                None
785            }
786            Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
787            Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
788            Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
789            Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
790            Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer),
791            Some(QueryType::GetProviders { context }) => context.next_peer_action(peer),
792            Some(QueryType::PutRecordToFoundNodes { .. }) => {
793                // All `PUT_VALUE` requests were sent when initiating this query type.
794                None
795            }
796            Some(QueryType::AddProviderToFoundNodes { .. }) => {
797                // All `ADD_PROVIDER` requests were sent when initiating this query type.
798                None
799            }
800        }
801    }
802
803    /// Handle query success by returning the queried value(s)
804    /// and removing the query from [`QueryEngine`].
805    fn on_query_succeeded(&mut self, query: QueryId) -> QueryAction {
806        match self.queries.remove(&query).expect("query to exist") {
807            QueryType::FindNode { context } => QueryAction::FindNodeQuerySucceeded {
808                query,
809                target: context.config.target.into_preimage(),
810                peers: context.responses.into_values().collect::<Vec<_>>(),
811            },
812            QueryType::PutRecord {
813                record,
814                quorum,
815                context,
816            } => QueryAction::PutRecordToFoundNodes {
817                query: context.config.query,
818                record,
819                peers: context.responses.into_values().collect::<Vec<_>>(),
820                quorum,
821            },
822            QueryType::PutRecordToPeers {
823                record,
824                quorum,
825                context,
826            } => QueryAction::PutRecordToFoundNodes {
827                query: context.query,
828                record,
829                peers: context.peers_to_report,
830                quorum,
831            },
832            QueryType::PutRecordToFoundNodes { context } => QueryAction::PutRecordQuerySucceeded {
833                query: context.query,
834                key: context.key,
835            },
836            QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
837                query_id: context.config.query,
838            },
839            QueryType::AddProvider {
840                provided_key,
841                provider,
842                quorum,
843                context,
844            } => QueryAction::AddProviderToFoundNodes {
845                query: context.config.query,
846                provided_key,
847                provider,
848                peers: context.responses.into_values().collect::<Vec<_>>(),
849                quorum,
850            },
851            QueryType::AddProviderToFoundNodes { context } =>
852                QueryAction::AddProviderQuerySucceeded {
853                    query: context.query,
854                    provided_key: context.key,
855                },
856            QueryType::GetProviders { context } => QueryAction::GetProvidersQueryDone {
857                query_id: context.config.query,
858                provided_key: context.config.target.clone().into_preimage(),
859                providers: context.found_providers(),
860            },
861        }
862    }
863
864    /// Handle query failure by removing the query from [`QueryEngine`] and
865    /// returning the appropriate [`QueryAction`] to user.
866    fn on_query_failed(&mut self, query: QueryId) -> QueryAction {
867        let _ = self.queries.remove(&query).expect("query to exist");
868
869        QueryAction::QueryFailed { query }
870    }
871
872    /// Get next action from the [`QueryEngine`].
873    pub fn next_action(&mut self) -> Option<QueryAction> {
874        for (_, state) in self.queries.iter_mut() {
875            let action = match state {
876                QueryType::FindNode { context } => context.next_action(),
877                QueryType::PutRecord { context, .. } => context.next_action(),
878                QueryType::PutRecordToPeers { context, .. } => context.next_action(),
879                QueryType::GetRecord { context } => context.next_action(),
880                QueryType::AddProvider { context, .. } => context.next_action(),
881                QueryType::GetProviders { context } => context.next_action(),
882                QueryType::PutRecordToFoundNodes { context, .. } => context.next_action(),
883                QueryType::AddProviderToFoundNodes { context, .. } => context.next_action(),
884            };
885
886            match action {
887                Some(QueryAction::QuerySucceeded { query }) => {
888                    return Some(self.on_query_succeeded(query));
889                }
890                Some(QueryAction::QueryFailed { query }) =>
891                    return Some(self.on_query_failed(query)),
892                Some(_) => return action,
893                _ => continue,
894            }
895        }
896
897        None
898    }
899}
900
901#[cfg(test)]
902mod tests {
903    use multihash::Multihash;
904
905    use super::*;
906    use crate::{
907        peer_id::MULTIHASH_IDENTITY_CODE, protocol::libp2p::kademlia::types::ConnectionType,
908    };
909
910    // make fixed peer id
911    fn make_peer_id(first: u8, second: u8) -> PeerId {
912        let mut peer_id = vec![0u8; 32];
913        peer_id[0] = first;
914        peer_id[1] = second;
915
916        PeerId::from_bytes(
917            &Multihash::<64>::wrap(MULTIHASH_IDENTITY_CODE, &peer_id)
918                .expect("The digest size is never too large")
919                .to_bytes(),
920        )
921        .unwrap()
922    }
923
/// Check that a find-node query is reported as failed when every candidate peer fails to
/// respond.
///
/// Each step is asserted strictly: an unexpected or missing event fails the test instead
/// of being silently skipped.
#[test]
fn find_node_query_fails() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let target_peer = PeerId::random();

    let query = engine.start_find_node(
        QueryId(1337),
        target_peer,
        vec![
            KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
            KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
            KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
            KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
        ]
        .into(),
    );

    // Every one of the four candidates must be contacted; each request is failed right away.
    for _ in 0..4 {
        match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => {
                engine.register_response_failure(query, peer);
            }
            event => panic!("invalid event received {:?}", event),
        }
    }

    // With no candidates left and no responses received, the query must be reported as failed.
    match engine.next_action() {
        Some(QueryAction::QueryFailed { query: failed }) => {
            assert_eq!(failed, query);
        }
        event => panic!("invalid event received {:?}", event),
    }

    // Once the failure has been reported, no further actions are expected.
    assert!(engine.next_action().is_none());
}
958
/// Check that the engine stops handing out actions once it has been polled three times
/// without any response being registered, even though a fourth candidate is still known.
#[test]
fn find_node_lookup_paused() {
    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let target_peer = PeerId::random();
    let _target_key = Key::from(target_peer);

    // Four random candidates that are not connected.
    let candidates: Vec<_> = (0..4)
        .map(|_| KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected))
        .collect();

    let _ = engine.start_find_node(QueryId(1338), target_peer, candidates.into());

    // Poll three times and leave every request unanswered.
    (0..3).for_each(|_| drop(engine.next_action()));

    assert!(engine.next_action().is_none());
}
983
/// Drive a find-node query to completion: the single known peer reports three more peers,
/// those three answer with empty peer lists, and the query finishes with four peers.
#[test]
fn find_node_query_succeeds() {
    use std::collections::BTreeMap;

    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let target_peer = make_peer_id(0, 0);
    let target_key = Key::from(target_peer);

    // Fixed peers keyed, and therefore ordered, by their distance to the target.
    let distances: BTreeMap<_, _> = (1..64)
        .map(|i| {
            let peer = make_peer_id(i, 0);
            (target_key.distance(&Key::from(peer)), peer)
        })
        .collect();
    let mut closest = distances.values().copied();
    // Hands out the next-closest peer as a not-connected `KademliaPeer`.
    let mut next_peer =
        || KademliaPeer::new(closest.next().unwrap(), vec![], ConnectionType::NotConnected);

    // start find node with one known peer
    let _query = engine.start_find_node(QueryId(1339), target_peer, vec![next_peer()].into());

    let action = engine.next_action();
    assert!(engine.next_action().is_none());

    // the one known peer responds with 3 other peers it knows
    let (query, peer) = match action {
        Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
        _ => panic!("invalid event received"),
    };
    engine.register_response(
        query,
        peer,
        KademliaMessage::FindNode {
            target: Vec::new(),
            peers: vec![next_peer(), next_peer(), next_peer()],
        },
    );

    // send empty response for the last three nodes
    for _ in 0..3 {
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            _ => panic!("invalid event received"),
        };
        println!("next send message to {peer:?}");
        engine.register_response(
            query,
            peer,
            KademliaMessage::FindNode {
                target: Vec::new(),
                peers: Vec::new(),
            },
        );
    }

    // all four peers responded, so the query must report them
    let peers = match engine.next_action() {
        Some(QueryAction::FindNodeQuerySucceeded { peers, .. }) => peers,
        _ => panic!("invalid event received"),
    };
    assert_eq!(peers.len(), 4);

    assert!(engine.next_action().is_none());
}
1081
/// Check that a put-record query started with `Quorum::All` is reported as failed when the
/// send to one of the four found nodes fails.
#[test]
fn put_record_fails() {
    use std::collections::BTreeMap;

    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
    let target_key = Key::new(record_key.clone());
    let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);

    // Fixed peers keyed, and therefore ordered, by their distance to the record key.
    let distances: BTreeMap<_, _> = (1..64)
        .map(|i| {
            let peer = make_peer_id(i, 0);
            (target_key.distance(&Key::from(peer)), peer)
        })
        .collect();
    let mut closest = distances.values().copied();
    // Hands out the next-closest peer as a not-connected `KademliaPeer`.
    let mut next_peer =
        || KademliaPeer::new(closest.next().unwrap(), vec![], ConnectionType::NotConnected);

    // start put record with one known peer
    let original_query_id = QueryId(1340);
    let _query = engine.start_put_record(
        original_query_id,
        original_record.clone(),
        vec![next_peer()].into(),
        Quorum::All,
    );

    let action = engine.next_action();
    assert!(engine.next_action().is_none());

    // the one known peer responds with 3 other peers it knows
    let (query, peer) = match action {
        Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
        _ => panic!("invalid event received"),
    };
    engine.register_response(
        query,
        peer,
        KademliaMessage::FindNode {
            target: Vec::new(),
            peers: vec![next_peer(), next_peer(), next_peer()],
        },
    );

    // send empty response for the last three nodes
    for _ in 0..3 {
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            _ => panic!("invalid event received"),
        };
        println!("next send message to {peer:?}");
        engine.register_response(
            query,
            peer,
            KademliaMessage::FindNode {
                target: Vec::new(),
                peers: Vec::new(),
            },
        );
    }

    // the lookup is over: the engine asks for the record to be stored on the found nodes
    let (query, peers, record, quorum) = match engine.next_action() {
        Some(QueryAction::PutRecordToFoundNodes {
            query,
            peers,
            record,
            quorum,
        }) => (query, peers, record, quorum),
        _ => panic!("invalid event received"),
    };
    assert_eq!(query, original_query_id);
    assert_eq!(peers.len(), 4);
    assert_eq!(record.key, original_record.key);
    assert_eq!(record.value, original_record.value);
    assert!(matches!(quorum, Quorum::All));

    engine.start_put_record_to_found_nodes_requests_tracking(
        original_query_id,
        record_key.clone(),
        peers.iter().map(|p| p.peer).collect(),
        Quorum::All,
    );

    // sends to all but one peer succeed
    let (last_peer, delivered) = peers.split_last().unwrap();
    for peer in delivered {
        engine.register_send_success(original_query_id, peer.peer);
    }
    engine.register_send_failure(original_query_id, last_peer.peer);

    // one failed send is expected to fail the whole query
    match engine.next_action() {
        Some(QueryAction::QueryFailed { query }) => assert_eq!(query, original_query_id),
        _ => panic!("invalid event received"),
    }

    assert!(engine.next_action().is_none());
}
1214
/// Check that a put-record query started with `Quorum::All` succeeds when the sends to all
/// four found nodes succeed, and that a follow-up get-record query against those nodes
/// yields the original record from every one of them.
#[test]
fn put_record_succeeds() {
    use std::collections::{BTreeMap, HashSet};

    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
    let target_key = Key::new(record_key.clone());
    let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);

    // Fixed peers keyed, and therefore ordered, by their distance to the record key.
    let distances: BTreeMap<_, _> = (1..64)
        .map(|i| {
            let peer = make_peer_id(i, 0);
            (target_key.distance(&Key::from(peer)), peer)
        })
        .collect();
    let mut closest = distances.values().copied();
    // Hands out the next-closest peer as a not-connected `KademliaPeer`.
    let mut next_peer =
        || KademliaPeer::new(closest.next().unwrap(), vec![], ConnectionType::NotConnected);

    // start put record with one known peer
    let original_query_id = QueryId(1340);
    let _query = engine.start_put_record(
        original_query_id,
        original_record.clone(),
        vec![next_peer()].into(),
        Quorum::All,
    );

    let action = engine.next_action();
    assert!(engine.next_action().is_none());

    // the one known peer responds with 3 other peers it knows
    let (query, peer) = match action {
        Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
        _ => panic!("invalid event received"),
    };
    engine.register_response(
        query,
        peer,
        KademliaMessage::FindNode {
            target: Vec::new(),
            peers: vec![next_peer(), next_peer(), next_peer()],
        },
    );

    // send empty response for the last three nodes
    for _ in 0..3 {
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            _ => panic!("invalid event received"),
        };
        println!("next send message to {peer:?}");
        engine.register_response(
            query,
            peer,
            KademliaMessage::FindNode {
                target: Vec::new(),
                peers: Vec::new(),
            },
        );
    }

    // the lookup is over: the engine asks for the record to be stored on the found nodes
    let (query, peers, record, quorum) = match engine.next_action() {
        Some(QueryAction::PutRecordToFoundNodes {
            query,
            peers,
            record,
            quorum,
        }) => (query, peers, record, quorum),
        _ => panic!("invalid event received"),
    };
    assert_eq!(query, original_query_id);
    assert_eq!(peers.len(), 4);
    assert_eq!(record.key, original_record.key);
    assert_eq!(record.value, original_record.value);
    assert!(matches!(quorum, Quorum::All));

    engine.start_put_record_to_found_nodes_requests_tracking(
        original_query_id,
        record_key.clone(),
        peers.iter().map(|p| p.peer).collect(),
        Quorum::All,
    );

    // simulate successful sends to all peers
    peers
        .iter()
        .for_each(|p| engine.register_send_success(original_query_id, p.peer));

    let (query, key) = match engine.next_action() {
        Some(QueryAction::PutRecordQuerySucceeded { query, key }) => (query, key),
        _ => panic!("invalid event received"),
    };
    assert_eq!(query, original_query_id);
    assert_eq!(key, record_key);

    assert!(engine.next_action().is_none());

    // get records from those peers.
    let get_record_query_id = QueryId(1341);
    let get_record_candidates: Vec<_> = peers[..4]
        .iter()
        .map(|p| KademliaPeer::new(p.peer, vec![], ConnectionType::NotConnected))
        .collect();
    let _query = engine.start_get_record(
        get_record_query_id,
        record_key.clone(),
        get_record_candidates.into(),
        Quorum::All,
        false,
    );

    let mut records = Vec::new();
    for _ in 0..4 {
        // every queried peer answers with the original record
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            event => panic!("invalid event received {:?}", event),
        };
        assert_eq!(query, get_record_query_id);
        engine.register_response(
            query,
            peer,
            KademliaMessage::GetRecord {
                record: Some(original_record.clone()),
                peers: Vec::new(),
                key: Some(record_key.clone()),
            },
        );

        // GetRecordPartialResult is emitted after the `register_response` if the record is
        // valid.
        let (query_id, record) = match engine.next_action() {
            Some(QueryAction::GetRecordPartialResult { query_id, record }) => (query_id, record),
            event => panic!("invalid event received {:?}", event),
        };
        println!("Partial result {:?}", record);
        assert_eq!(query_id, get_record_query_id);
        records.push(record);
    }

    let expected_peers: HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
    match engine.next_action() {
        Some(QueryAction::GetRecordQueryDone { .. }) => {}
        event => panic!("invalid event received {:?}", event),
    }

    println!("Records {:?}", records);
    // every peer that stored the record must have returned it
    let query_peers: HashSet<_> = records.iter().map(|peer_record| peer_record.peer).collect();
    assert_eq!(expected_peers, query_peers);

    // One single record found across peers.
    let unique_records: HashSet<_> =
        records.into_iter().map(|peer_record| peer_record.record).collect();
    assert_eq!(unique_records.len(), 1);
    let record = unique_records.into_iter().next().unwrap();

    assert_eq!(record.key, original_record.key);
    assert_eq!(record.value, original_record.value);
}
1413
/// Check that a put-record query started with `Quorum::One` succeeds when only one of the
/// four sends succeeds, and that a follow-up get-record query against the same nodes
/// yields the original record from every one of them.
#[test]
fn put_record_succeeds_with_quorum_one() {
    use std::collections::{BTreeMap, HashSet};

    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
    let target_key = Key::new(record_key.clone());
    let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);

    // Fixed peers keyed, and therefore ordered, by their distance to the record key.
    let distances: BTreeMap<_, _> = (1..64)
        .map(|i| {
            let peer = make_peer_id(i, 0);
            (target_key.distance(&Key::from(peer)), peer)
        })
        .collect();
    let mut closest = distances.values().copied();
    // Hands out the next-closest peer as a not-connected `KademliaPeer`.
    let mut next_peer =
        || KademliaPeer::new(closest.next().unwrap(), vec![], ConnectionType::NotConnected);

    // start put record with one known peer
    let original_query_id = QueryId(1340);
    let _query = engine.start_put_record(
        original_query_id,
        original_record.clone(),
        vec![next_peer()].into(),
        Quorum::One,
    );

    let action = engine.next_action();
    assert!(engine.next_action().is_none());

    // the one known peer responds with 3 other peers it knows
    let (query, peer) = match action {
        Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
        _ => panic!("invalid event received"),
    };
    engine.register_response(
        query,
        peer,
        KademliaMessage::FindNode {
            target: Vec::new(),
            peers: vec![next_peer(), next_peer(), next_peer()],
        },
    );

    // send empty response for the last three nodes
    for _ in 0..3 {
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            _ => panic!("invalid event received"),
        };
        println!("next send message to {peer:?}");
        engine.register_response(
            query,
            peer,
            KademliaMessage::FindNode {
                target: Vec::new(),
                peers: Vec::new(),
            },
        );
    }

    // the lookup is over: the engine asks for the record to be stored on the found nodes
    let (query, peers, record, quorum) = match engine.next_action() {
        Some(QueryAction::PutRecordToFoundNodes {
            query,
            peers,
            record,
            quorum,
        }) => (query, peers, record, quorum),
        _ => panic!("invalid event received"),
    };
    assert_eq!(query, original_query_id);
    assert_eq!(peers.len(), 4);
    assert_eq!(record.key, original_record.key);
    assert_eq!(record.value, original_record.value);
    assert!(matches!(quorum, Quorum::One));

    engine.start_put_record_to_found_nodes_requests_tracking(
        original_query_id,
        record_key.clone(),
        peers.iter().map(|p| p.peer).collect(),
        Quorum::One,
    );

    // all but one peer fail
    assert!(peers.len() > 1);
    let (reachable_peer, unreachable_peers) = peers.split_last().unwrap();
    for peer in unreachable_peers {
        engine.register_send_failure(original_query_id, peer.peer);
    }
    engine.register_send_success(original_query_id, reachable_peer.peer);

    // a single successful send is expected to be enough for the query to succeed
    let (query, key) = match engine.next_action() {
        Some(QueryAction::PutRecordQuerySucceeded { query, key }) => (query, key),
        _ => panic!("invalid event received"),
    };
    assert_eq!(query, original_query_id);
    assert_eq!(key, record_key);

    assert!(engine.next_action().is_none());

    // get records from those peers.
    let get_record_query_id = QueryId(1341);
    let get_record_candidates: Vec<_> = peers[..4]
        .iter()
        .map(|p| KademliaPeer::new(p.peer, vec![], ConnectionType::NotConnected))
        .collect();
    let _query = engine.start_get_record(
        get_record_query_id,
        record_key.clone(),
        get_record_candidates.into(),
        Quorum::All,
        false,
    );

    let mut records = Vec::new();
    for _ in 0..4 {
        // every queried peer answers with the original record
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            event => panic!("invalid event received {:?}", event),
        };
        assert_eq!(query, get_record_query_id);
        engine.register_response(
            query,
            peer,
            KademliaMessage::GetRecord {
                record: Some(original_record.clone()),
                peers: Vec::new(),
                key: Some(record_key.clone()),
            },
        );

        // GetRecordPartialResult is emitted after the `register_response` if the record is
        // valid.
        let (query_id, record) = match engine.next_action() {
            Some(QueryAction::GetRecordPartialResult { query_id, record }) => (query_id, record),
            event => panic!("invalid event received {:?}", event),
        };
        println!("Partial result {:?}", record);
        assert_eq!(query_id, get_record_query_id);
        records.push(record);
    }

    let expected_peers: HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
    match engine.next_action() {
        Some(QueryAction::GetRecordQueryDone { .. }) => {}
        event => panic!("invalid event received {:?}", event),
    }

    println!("Records {:?}", records);
    // every queried peer must have returned the record
    let query_peers: HashSet<_> = records.iter().map(|peer_record| peer_record.peer).collect();
    assert_eq!(expected_peers, query_peers);

    // One single record found across peers.
    let unique_records: HashSet<_> =
        records.into_iter().map(|peer_record| peer_record.record).collect();
    assert_eq!(unique_records.len(), 1);
    let record = unique_records.into_iter().next().unwrap();

    assert_eq!(record.key, original_record.key);
    assert_eq!(record.value, original_record.value);
}
1614
1615    #[test]
1616    fn add_provider_fails() {
1617        let _ = tracing_subscriber::fmt()
1618            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1619            .try_init();
1620
1621        let local_peer_id = PeerId::random();
1622        let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1623        let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1624        let local_content_provider = ContentProvider {
1625            peer: local_peer_id,
1626            addresses: vec![],
1627        };
1628        let target_key = Key::new(original_provided_key.clone());
1629
1630        let distances = {
1631            let mut distances = std::collections::BTreeMap::new();
1632
1633            for i in 1..64 {
1634                let peer = make_peer_id(i, 0);
1635                let key = Key::from(peer);
1636
1637                distances.insert(target_key.distance(&key), peer);
1638            }
1639
1640            distances
1641        };
1642        let mut iter = distances.iter();
1643
1644        // start add provider with one known peer
1645        let original_query_id = QueryId(1340);
1646        let _query = engine.start_add_provider(
1647            original_query_id,
1648            original_provided_key.clone(),
1649            local_content_provider.clone(),
1650            vec![KademliaPeer::new(
1651                *iter.next().unwrap().1,
1652                vec![],
1653                ConnectionType::NotConnected,
1654            )]
1655            .into(),
1656            Quorum::All,
1657        );
1658
1659        let action = engine.next_action();
1660        assert!(engine.next_action().is_none());
1661
1662        // the one known peer responds with 3 other peers it knows
1663        match action {
1664            Some(QueryAction::SendMessage { query, peer, .. }) => {
1665                engine.register_response(
1666                    query,
1667                    peer,
1668                    KademliaMessage::FindNode {
1669                        target: Vec::new(),
1670                        peers: vec![
1671                            KademliaPeer::new(
1672                                *iter.next().unwrap().1,
1673                                vec![],
1674                                ConnectionType::NotConnected,
1675                            ),
1676                            KademliaPeer::new(
1677                                *iter.next().unwrap().1,
1678                                vec![],
1679                                ConnectionType::NotConnected,
1680                            ),
1681                            KademliaPeer::new(
1682                                *iter.next().unwrap().1,
1683                                vec![],
1684                                ConnectionType::NotConnected,
1685                            ),
1686                        ],
1687                    },
1688                );
1689            }
1690            _ => panic!("invalid event received"),
1691        }
1692
1693        // send empty response for the last three nodes
1694        for _ in 0..3 {
1695            match engine.next_action() {
1696                Some(QueryAction::SendMessage { query, peer, .. }) => {
1697                    println!("next send message to {peer:?}");
1698                    engine.register_response(
1699                        query,
1700                        peer,
1701                        KademliaMessage::FindNode {
1702                            target: Vec::new(),
1703                            peers: vec![],
1704                        },
1705                    );
1706                }
1707                _ => panic!("invalid event received"),
1708            }
1709        }
1710
1711        let mut peers = match engine.next_action() {
1712            Some(QueryAction::AddProviderToFoundNodes {
1713                query,
1714                provided_key,
1715                provider,
1716                peers,
1717                quorum,
1718            }) => {
1719                assert_eq!(query, original_query_id);
1720                assert_eq!(provided_key, original_provided_key);
1721                assert_eq!(provider, local_content_provider);
1722                assert_eq!(peers.len(), 4);
1723                assert!(matches!(quorum, Quorum::All));
1724
1725                peers
1726            }
1727            _ => panic!("invalid event received"),
1728        };
1729
1730        engine.start_add_provider_to_found_nodes_requests_tracking(
1731            original_query_id,
1732            original_provided_key.clone(),
1733            peers.iter().map(|p| p.peer).collect(),
1734            Quorum::All,
1735        );
1736
1737        // sends to all but one peer succeed
1738        let last_peer = peers.pop().unwrap();
1739        for peer in peers {
1740            engine.register_send_success(original_query_id, peer.peer);
1741        }
1742        engine.register_send_failure(original_query_id, last_peer.peer);
1743
1744        match engine.next_action() {
1745            Some(QueryAction::QueryFailed { query }) => {
1746                assert_eq!(query, original_query_id);
1747            }
1748            _ => panic!("invalid event received"),
1749        }
1750
1751        assert!(engine.next_action().is_none());
1752    }
1753
1754    #[test]
1755    fn add_provider_succeeds() {
1756        let _ = tracing_subscriber::fmt()
1757            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1758            .try_init();
1759
1760        let local_peer_id = PeerId::random();
1761        let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1762        let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1763        let local_content_provider = ContentProvider {
1764            peer: local_peer_id,
1765            addresses: vec![],
1766        };
1767
1768        let target_key = Key::new(original_provided_key.clone());
1769        let distances = {
1770            let mut distances = std::collections::BTreeMap::new();
1771
1772            for i in 1..64 {
1773                let peer = make_peer_id(i, 0);
1774                let key = Key::from(peer);
1775
1776                distances.insert(target_key.distance(&key), peer);
1777            }
1778
1779            distances
1780        };
1781        let mut iter = distances.iter();
1782
1783        // start add provider with one known peer
1784        let add_query_id = QueryId(1340);
1785        let _query = engine.start_add_provider(
1786            add_query_id,
1787            original_provided_key.clone(),
1788            local_content_provider.clone(),
1789            vec![KademliaPeer::new(
1790                *iter.next().unwrap().1,
1791                vec![],
1792                ConnectionType::NotConnected,
1793            )]
1794            .into(),
1795            Quorum::All,
1796        );
1797
1798        let action = engine.next_action();
1799        assert!(engine.next_action().is_none());
1800
1801        // the one known peer responds with 3 other peers it knows
1802        match action {
1803            Some(QueryAction::SendMessage { query, peer, .. }) => {
1804                engine.register_response(
1805                    query,
1806                    peer,
1807                    KademliaMessage::FindNode {
1808                        target: Vec::new(),
1809                        peers: vec![
1810                            KademliaPeer::new(
1811                                *iter.next().unwrap().1,
1812                                vec![],
1813                                ConnectionType::NotConnected,
1814                            ),
1815                            KademliaPeer::new(
1816                                *iter.next().unwrap().1,
1817                                vec![],
1818                                ConnectionType::NotConnected,
1819                            ),
1820                            KademliaPeer::new(
1821                                *iter.next().unwrap().1,
1822                                vec![],
1823                                ConnectionType::NotConnected,
1824                            ),
1825                        ],
1826                    },
1827                );
1828            }
1829            _ => panic!("invalid event received"),
1830        }
1831
1832        // send empty response for the last three nodes
1833        for _ in 0..3 {
1834            match engine.next_action() {
1835                Some(QueryAction::SendMessage { query, peer, .. }) => {
1836                    println!("next send message to {peer:?}");
1837                    engine.register_response(
1838                        query,
1839                        peer,
1840                        KademliaMessage::FindNode {
1841                            target: Vec::new(),
1842                            peers: vec![],
1843                        },
1844                    );
1845                }
1846                _ => panic!("invalid event received"),
1847            }
1848        }
1849
1850        let peers = match engine.next_action() {
1851            Some(QueryAction::AddProviderToFoundNodes {
1852                query,
1853                provided_key,
1854                provider,
1855                peers,
1856                quorum,
1857            }) => {
1858                assert_eq!(query, add_query_id);
1859                assert_eq!(provided_key, original_provided_key);
1860                assert_eq!(provider, local_content_provider);
1861                assert_eq!(peers.len(), 4);
1862                assert!(matches!(quorum, Quorum::All));
1863
1864                peers
1865            }
1866            _ => panic!("invalid event received"),
1867        };
1868
1869        engine.start_add_provider_to_found_nodes_requests_tracking(
1870            add_query_id,
1871            original_provided_key.clone(),
1872            peers.iter().map(|p| p.peer).collect(),
1873            Quorum::All,
1874        );
1875
1876        // simulate successful sends to all peers
1877        for peer in &peers {
1878            engine.register_send_success(add_query_id, peer.peer);
1879        }
1880
1881        match engine.next_action() {
1882            Some(QueryAction::AddProviderQuerySucceeded {
1883                query,
1884                provided_key,
1885            }) => {
1886                assert_eq!(query, add_query_id);
1887                assert_eq!(provided_key, original_provided_key);
1888            }
1889            _ => panic!("invalid event received"),
1890        }
1891
1892        assert!(engine.next_action().is_none());
1893
1894        // get providers from those peers.
1895        let get_query_id = QueryId(1341);
1896        let _query = engine.start_get_providers(
1897            get_query_id,
1898            original_provided_key.clone(),
1899            vec![
1900                KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1901                KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1902                KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1903                KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1904            ]
1905            .into(),
1906            vec![],
1907        );
1908
1909        for _ in 0..4 {
1910            match engine.next_action() {
1911                Some(QueryAction::SendMessage { query, peer, .. }) => {
1912                    assert_eq!(query, get_query_id);
1913                    engine.register_response(
1914                        query,
1915                        peer,
1916                        KademliaMessage::GetProviders {
1917                            key: Some(original_provided_key.clone()),
1918                            peers: vec![],
1919                            providers: vec![local_content_provider.clone().into()],
1920                        },
1921                    );
1922                }
1923                event => panic!("invalid event received {:?}", event),
1924            }
1925        }
1926
1927        match engine.next_action() {
1928            Some(QueryAction::GetProvidersQueryDone {
1929                query_id,
1930                provided_key,
1931                providers,
1932            }) => {
1933                assert_eq!(query_id, get_query_id);
1934                assert_eq!(provided_key, original_provided_key);
1935                assert_eq!(providers, vec![local_content_provider]);
1936            }
1937            event => panic!("invalid event received {:?}", event),
1938        }
1939    }
1940
1941    #[test]
1942    fn add_provider_succeeds_with_quorum_one() {
1943        let _ = tracing_subscriber::fmt()
1944            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1945            .try_init();
1946
1947        let local_peer_id = PeerId::random();
1948        let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1949        let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1950        let local_content_provider = ContentProvider {
1951            peer: local_peer_id,
1952            addresses: vec![],
1953        };
1954
1955        let target_key = Key::new(original_provided_key.clone());
1956        let distances = {
1957            let mut distances = std::collections::BTreeMap::new();
1958
1959            for i in 1..64 {
1960                let peer = make_peer_id(i, 0);
1961                let key = Key::from(peer);
1962
1963                distances.insert(target_key.distance(&key), peer);
1964            }
1965
1966            distances
1967        };
1968        let mut iter = distances.iter();
1969
1970        // start add provider with one known peer
1971        let add_query_id = QueryId(1340);
1972        let _query = engine.start_add_provider(
1973            add_query_id,
1974            original_provided_key.clone(),
1975            local_content_provider.clone(),
1976            vec![KademliaPeer::new(
1977                *iter.next().unwrap().1,
1978                vec![],
1979                ConnectionType::NotConnected,
1980            )]
1981            .into(),
1982            Quorum::One,
1983        );
1984
1985        let action = engine.next_action();
1986        assert!(engine.next_action().is_none());
1987
1988        // the one known peer responds with 3 other peers it knows
1989        match action {
1990            Some(QueryAction::SendMessage { query, peer, .. }) => {
1991                engine.register_response(
1992                    query,
1993                    peer,
1994                    KademliaMessage::FindNode {
1995                        target: Vec::new(),
1996                        peers: vec![
1997                            KademliaPeer::new(
1998                                *iter.next().unwrap().1,
1999                                vec![],
2000                                ConnectionType::NotConnected,
2001                            ),
2002                            KademliaPeer::new(
2003                                *iter.next().unwrap().1,
2004                                vec![],
2005                                ConnectionType::NotConnected,
2006                            ),
2007                            KademliaPeer::new(
2008                                *iter.next().unwrap().1,
2009                                vec![],
2010                                ConnectionType::NotConnected,
2011                            ),
2012                        ],
2013                    },
2014                );
2015            }
2016            _ => panic!("invalid event received"),
2017        }
2018
2019        // send empty response for the last three nodes
2020        for _ in 0..3 {
2021            match engine.next_action() {
2022                Some(QueryAction::SendMessage { query, peer, .. }) => {
2023                    println!("next send message to {peer:?}");
2024                    engine.register_response(
2025                        query,
2026                        peer,
2027                        KademliaMessage::FindNode {
2028                            target: Vec::new(),
2029                            peers: vec![],
2030                        },
2031                    );
2032                }
2033                _ => panic!("invalid event received"),
2034            }
2035        }
2036
2037        let peers = match engine.next_action() {
2038            Some(QueryAction::AddProviderToFoundNodes {
2039                query,
2040                provided_key,
2041                provider,
2042                peers,
2043                quorum,
2044            }) => {
2045                assert_eq!(query, add_query_id);
2046                assert_eq!(provided_key, original_provided_key);
2047                assert_eq!(provider, local_content_provider);
2048                assert_eq!(peers.len(), 4);
2049                assert!(matches!(quorum, Quorum::One));
2050
2051                peers
2052            }
2053            _ => panic!("invalid event received"),
2054        };
2055
2056        engine.start_add_provider_to_found_nodes_requests_tracking(
2057            add_query_id,
2058            original_provided_key.clone(),
2059            peers.iter().map(|p| p.peer).collect(),
2060            Quorum::One,
2061        );
2062
2063        // all but one peer fail
2064        assert!(peers.len() > 1);
2065        engine.register_send_success(add_query_id, peers.first().unwrap().peer);
2066        for peer in peers.iter().skip(1) {
2067            engine.register_send_failure(add_query_id, peer.peer);
2068        }
2069
2070        match engine.next_action() {
2071            Some(QueryAction::AddProviderQuerySucceeded {
2072                query,
2073                provided_key,
2074            }) => {
2075                assert_eq!(query, add_query_id);
2076                assert_eq!(provided_key, original_provided_key);
2077            }
2078            _ => panic!("invalid event received"),
2079        }
2080
2081        assert!(engine.next_action().is_none());
2082
2083        // get providers from those peers.
2084        let get_query_id = QueryId(1341);
2085        let _query = engine.start_get_providers(
2086            get_query_id,
2087            original_provided_key.clone(),
2088            vec![
2089                KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
2090                KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
2091                KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
2092                KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
2093            ]
2094            .into(),
2095            vec![],
2096        );
2097
2098        // first peer responds with the provider
2099        match engine.next_action() {
2100            Some(QueryAction::SendMessage { query, peer, .. }) => {
2101                assert_eq!(query, get_query_id);
2102                engine.register_response(
2103                    query,
2104                    peer,
2105                    KademliaMessage::GetProviders {
2106                        key: Some(original_provided_key.clone()),
2107                        peers: vec![],
2108                        providers: vec![local_content_provider.clone().into()],
2109                    },
2110                );
2111            }
2112            event => panic!("invalid event received {:?}", event),
2113        }
2114
2115        // other peers respond with no providers
2116        for _ in 1..4 {
2117            match engine.next_action() {
2118                Some(QueryAction::SendMessage { query, peer, .. }) => {
2119                    assert_eq!(query, get_query_id);
2120                    engine.register_response(
2121                        query,
2122                        peer,
2123                        KademliaMessage::GetProviders {
2124                            key: Some(original_provided_key.clone()),
2125                            peers: vec![],
2126                            providers: vec![],
2127                        },
2128                    );
2129                }
2130                event => panic!("invalid event received {:?}", event),
2131            }
2132        }
2133
2134        match engine.next_action() {
2135            Some(QueryAction::GetProvidersQueryDone {
2136                query_id,
2137                provided_key,
2138                providers,
2139            }) => {
2140                assert_eq!(query_id, get_query_id);
2141                assert_eq!(provided_key, original_provided_key);
2142                assert_eq!(providers, vec![local_content_provider]);
2143            }
2144            event => panic!("invalid event received {:?}", event),
2145        }
2146    }
2147}