1use crate::{
22 protocol::libp2p::kademlia::{
23 message::KademliaMessage,
24 query::{
25 find_node::{FindNodeConfig, FindNodeContext},
26 get_providers::{GetProvidersConfig, GetProvidersContext},
27 get_record::{GetRecordConfig, GetRecordContext},
28 },
29 record::{ContentProvider, Key as RecordKey, Record},
30 types::{KademliaPeer, Key},
31 PeerRecord, Quorum,
32 },
33 PeerId,
34};
35
36use bytes::Bytes;
37
38use std::collections::{HashMap, VecDeque};
39
40use self::{find_many_nodes::FindManyNodesContext, target_peers::PutToTargetPeersContext};
41
42mod find_many_nodes;
43mod find_node;
44mod get_providers;
45mod get_record;
46mod target_peers;
47
48const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
50
51#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
53#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
54pub struct QueryId(pub usize);
55
56#[derive(Debug)]
58enum QueryType {
59 FindNode {
61 context: FindNodeContext<PeerId>,
63 },
64
65 PutRecord {
67 record: Record,
69
70 quorum: Quorum,
72
73 context: FindNodeContext<RecordKey>,
75 },
76
77 PutRecordToPeers {
79 record: Record,
81
82 quorum: Quorum,
84
85 context: FindManyNodesContext,
87 },
88
89 PutRecordToFoundNodes {
91 context: PutToTargetPeersContext,
93 },
94
95 GetRecord {
97 context: GetRecordContext,
99 },
100
101 AddProvider {
103 provided_key: RecordKey,
105
106 provider: ContentProvider,
108
109 quorum: Quorum,
111
112 context: FindNodeContext<RecordKey>,
114 },
115
116 AddProviderToFoundNodes {
118 context: PutToTargetPeersContext,
120 },
121
122 GetProviders {
124 context: GetProvidersContext,
126 },
127}
128
129#[derive(Debug)]
131pub enum QueryAction {
132 SendMessage {
134 query: QueryId,
136
137 peer: PeerId,
139
140 message: Bytes,
142 },
143
144 FindNodeQuerySucceeded {
146 query: QueryId,
148
149 target: PeerId,
151
152 peers: Vec<KademliaPeer>,
154 },
155
156 PutRecordToFoundNodes {
158 query: QueryId,
160
161 record: Record,
163
164 peers: Vec<KademliaPeer>,
166
167 quorum: Quorum,
169 },
170
171 PutRecordQuerySucceeded {
173 query: QueryId,
175
176 key: RecordKey,
178 },
179
180 AddProviderToFoundNodes {
182 query: QueryId,
184
185 provided_key: RecordKey,
187
188 provider: ContentProvider,
190
191 peers: Vec<KademliaPeer>,
193
194 quorum: Quorum,
196 },
197
198 AddProviderQuerySucceeded {
200 query: QueryId,
202
203 provided_key: RecordKey,
205 },
206
207 GetRecordQueryDone {
209 query_id: QueryId,
211 },
212
213 GetRecordPartialResult {
217 query_id: QueryId,
219
220 record: PeerRecord,
222 },
223
224 GetProvidersQueryDone {
226 query_id: QueryId,
228
229 provided_key: RecordKey,
231
232 providers: Vec<ContentProvider>,
234 },
235
236 QuerySucceeded {
238 query: QueryId,
240 },
241
242 QueryFailed {
244 query: QueryId,
246 },
247}
248
249pub struct QueryEngine {
251 local_peer_id: PeerId,
253
254 replication_factor: usize,
256
257 parallelism_factor: usize,
259
260 queries: HashMap<QueryId, QueryType>,
262}
263
264impl QueryEngine {
265 pub fn new(
267 local_peer_id: PeerId,
268 replication_factor: usize,
269 parallelism_factor: usize,
270 ) -> Self {
271 Self {
272 local_peer_id,
273 replication_factor,
274 parallelism_factor,
275 queries: HashMap::new(),
276 }
277 }
278
279 pub fn start_find_node(
281 &mut self,
282 query_id: QueryId,
283 target: PeerId,
284 candidates: VecDeque<KademliaPeer>,
285 ) -> QueryId {
286 tracing::debug!(
287 target: LOG_TARGET,
288 ?query_id,
289 ?target,
290 num_peers = ?candidates.len(),
291 "start `FIND_NODE` query"
292 );
293
294 let target = Key::from(target);
295 let config = FindNodeConfig {
296 local_peer_id: self.local_peer_id,
297 replication_factor: self.replication_factor,
298 parallelism_factor: self.parallelism_factor,
299 query: query_id,
300 target,
301 };
302
303 self.queries.insert(
304 query_id,
305 QueryType::FindNode {
306 context: FindNodeContext::new(config, candidates),
307 },
308 );
309
310 query_id
311 }
312
313 pub fn start_put_record(
315 &mut self,
316 query_id: QueryId,
317 record: Record,
318 candidates: VecDeque<KademliaPeer>,
319 quorum: Quorum,
320 ) -> QueryId {
321 tracing::debug!(
322 target: LOG_TARGET,
323 ?query_id,
324 target = ?record.key,
325 num_peers = ?candidates.len(),
326 "start `PUT_VALUE` query"
327 );
328
329 let target = Key::new(record.key.clone());
330 let config = FindNodeConfig {
331 local_peer_id: self.local_peer_id,
332 replication_factor: self.replication_factor,
333 parallelism_factor: self.parallelism_factor,
334 query: query_id,
335 target,
336 };
337
338 self.queries.insert(
339 query_id,
340 QueryType::PutRecord {
341 record,
342 quorum,
343 context: FindNodeContext::new(config, candidates),
344 },
345 );
346
347 query_id
348 }
349
350 pub fn start_put_record_to_peers(
352 &mut self,
353 query_id: QueryId,
354 record: Record,
355 peers_to_report: Vec<KademliaPeer>,
356 quorum: Quorum,
357 ) -> QueryId {
358 tracing::debug!(
359 target: LOG_TARGET,
360 ?query_id,
361 target = ?record.key,
362 num_peers = ?peers_to_report.len(),
363 "start `PUT_VALUE` query to peers"
364 );
365
366 self.queries.insert(
367 query_id,
368 QueryType::PutRecordToPeers {
369 record,
370 quorum,
371 context: FindManyNodesContext::new(query_id, peers_to_report),
372 },
373 );
374
375 query_id
376 }
377
378 pub fn start_get_record(
380 &mut self,
381 query_id: QueryId,
382 target: RecordKey,
383 candidates: VecDeque<KademliaPeer>,
384 quorum: Quorum,
385 local_record: bool,
386 ) -> QueryId {
387 tracing::debug!(
388 target: LOG_TARGET,
389 ?query_id,
390 ?target,
391 num_peers = ?candidates.len(),
392 "start `GET_VALUE` query"
393 );
394
395 let target = Key::new(target);
396 let config = GetRecordConfig {
397 local_peer_id: self.local_peer_id,
398 known_records: if local_record { 1 } else { 0 },
399 quorum,
400 replication_factor: self.replication_factor,
401 parallelism_factor: self.parallelism_factor,
402 query: query_id,
403 target,
404 };
405
406 self.queries.insert(
407 query_id,
408 QueryType::GetRecord {
409 context: GetRecordContext::new(config, candidates, local_record),
410 },
411 );
412
413 query_id
414 }
415
416 pub fn start_add_provider(
418 &mut self,
419 query_id: QueryId,
420 provided_key: RecordKey,
421 provider: ContentProvider,
422 candidates: VecDeque<KademliaPeer>,
423 quorum: Quorum,
424 ) -> QueryId {
425 tracing::debug!(
426 target: LOG_TARGET,
427 ?query_id,
428 ?provider,
429 num_peers = ?candidates.len(),
430 "start `ADD_PROVIDER` query",
431 );
432
433 let config = FindNodeConfig {
434 local_peer_id: self.local_peer_id,
435 replication_factor: self.replication_factor,
436 parallelism_factor: self.parallelism_factor,
437 query: query_id,
438 target: Key::new(provided_key.clone()),
439 };
440
441 self.queries.insert(
442 query_id,
443 QueryType::AddProvider {
444 provided_key,
445 provider,
446 quorum,
447 context: FindNodeContext::new(config, candidates),
448 },
449 );
450
451 query_id
452 }
453
454 pub fn start_get_providers(
456 &mut self,
457 query_id: QueryId,
458 key: RecordKey,
459 candidates: VecDeque<KademliaPeer>,
460 known_providers: Vec<ContentProvider>,
461 ) -> QueryId {
462 tracing::debug!(
463 target: LOG_TARGET,
464 ?query_id,
465 ?key,
466 num_peers = ?candidates.len(),
467 "start `GET_PROVIDERS` query",
468 );
469
470 let target = Key::new(key);
471 let config = GetProvidersConfig {
472 local_peer_id: self.local_peer_id,
473 parallelism_factor: self.parallelism_factor,
474 query: query_id,
475 target,
476 known_providers: known_providers.into_iter().map(Into::into).collect(),
477 };
478
479 self.queries.insert(
480 query_id,
481 QueryType::GetProviders {
482 context: GetProvidersContext::new(config, candidates),
483 },
484 );
485
486 query_id
487 }
488
489 pub fn start_put_record_to_found_nodes_requests_tracking(
491 &mut self,
492 query_id: QueryId,
493 key: RecordKey,
494 peers: Vec<PeerId>,
495 quorum: Quorum,
496 ) {
497 tracing::debug!(
498 target: LOG_TARGET,
499 ?query_id,
500 num_peers = ?peers.len(),
501 "start `PUT_VALUE` responses tracking"
502 );
503
504 self.queries.insert(
505 query_id,
506 QueryType::PutRecordToFoundNodes {
507 context: PutToTargetPeersContext::new(query_id, key, peers, quorum),
508 },
509 );
510 }
511
512 pub fn start_add_provider_to_found_nodes_requests_tracking(
514 &mut self,
515 query_id: QueryId,
516 provided_key: RecordKey,
517 peers: Vec<PeerId>,
518 quorum: Quorum,
519 ) {
520 tracing::debug!(
521 target: LOG_TARGET,
522 ?query_id,
523 num_peers = ?peers.len(),
524 "start `ADD_PROVIDER` progress tracking"
525 );
526
527 self.queries.insert(
528 query_id,
529 QueryType::AddProviderToFoundNodes {
530 context: PutToTargetPeersContext::new(query_id, provided_key, peers, quorum),
531 },
532 );
533 }
534
535 pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) {
537 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure");
538
539 match self.queries.get_mut(&query) {
540 None => {
541 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
542 }
543 Some(QueryType::FindNode { context }) => {
544 context.register_response_failure(peer);
545 }
546 Some(QueryType::PutRecord { context, .. }) => {
547 context.register_response_failure(peer);
548 }
549 Some(QueryType::PutRecordToPeers { context, .. }) => {
550 context.register_response_failure(peer);
551 }
552 Some(QueryType::PutRecordToFoundNodes { context }) => {
553 context.register_response_failure(peer);
554 }
555 Some(QueryType::GetRecord { context }) => {
556 context.register_response_failure(peer);
557 }
558 Some(QueryType::AddProvider { context, .. }) => {
559 context.register_response_failure(peer);
560 }
561 Some(QueryType::AddProviderToFoundNodes { context }) => {
562 context.register_response_failure(peer);
563 }
564 Some(QueryType::GetProviders { context }) => {
565 context.register_response_failure(peer);
566 }
567 }
568 }
569
570 pub fn register_response(&mut self, query: QueryId, peer: PeerId, message: KademliaMessage) {
572 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response");
573
574 match self.queries.get_mut(&query) {
575 None => {
576 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response for a stale query");
577 }
578 Some(QueryType::FindNode { context }) => match message {
579 KademliaMessage::FindNode { peers, .. } => {
580 context.register_response(peer, peers);
581 }
582 message => {
583 tracing::debug!(
584 target: LOG_TARGET,
585 ?query,
586 ?peer,
587 "unexpected response to `FIND_NODE`: {message}",
588 );
589 context.register_response_failure(peer);
590 }
591 },
592 Some(QueryType::PutRecord { context, .. }) => match message {
593 KademliaMessage::FindNode { peers, .. } => {
594 context.register_response(peer, peers);
595 }
596 message => {
597 tracing::debug!(
598 target: LOG_TARGET,
599 ?query,
600 ?peer,
601 "unexpected response to `FIND_NODE` during `PUT_VALUE` query: {message}",
602 );
603 context.register_response_failure(peer);
604 }
605 },
606 Some(QueryType::PutRecordToPeers { context, .. }) => match message {
607 KademliaMessage::FindNode { peers, .. } => {
608 context.register_response(peer, peers);
609 }
610 message => {
611 tracing::debug!(
612 target: LOG_TARGET,
613 ?query,
614 ?peer,
615 "unexpected response to `FIND_NODE` during `PUT_VALUE` (to peers): {message}",
616 );
617 context.register_response_failure(peer);
618 }
619 },
620 Some(QueryType::PutRecordToFoundNodes { context }) => match message {
621 KademliaMessage::PutValue { .. } => {
622 context.register_response(peer);
623 }
624 message => {
625 tracing::debug!(
626 target: LOG_TARGET,
627 ?query,
628 ?peer,
629 "unexpected response to `PUT_VALUE`: {message}",
630 );
631 context.register_response_failure(peer);
632 }
633 },
634 Some(QueryType::GetRecord { context }) => match message {
635 KademliaMessage::GetRecord { record, peers, .. } =>
636 context.register_response(peer, record, peers),
637 message => {
638 tracing::debug!(
639 target: LOG_TARGET,
640 ?query,
641 ?peer,
642 "unexpected response to `GET_VALUE`: {message}",
643 );
644 context.register_response_failure(peer);
645 }
646 },
647 Some(QueryType::AddProvider { context, .. }) => match message {
648 KademliaMessage::FindNode { peers, .. } => {
649 context.register_response(peer, peers);
650 }
651 message => {
652 tracing::debug!(
653 target: LOG_TARGET,
654 ?query,
655 ?peer,
656 "unexpected response to `FIND_NODE` during `ADD_PROVIDER` query: {message}",
657 );
658 context.register_response_failure(peer);
659 }
660 },
661 Some(QueryType::AddProviderToFoundNodes { context, .. }) => match message {
662 KademliaMessage::AddProvider { .. } => {
663 context.register_response(peer);
664 }
665 message => {
666 tracing::debug!(
667 target: LOG_TARGET,
668 ?query,
669 ?peer,
670 "unexpected response to `ADD_PROVIDER`: {message}",
671 );
672 context.register_response_failure(peer);
673 }
674 },
675 Some(QueryType::GetProviders { context }) => match message {
676 KademliaMessage::GetProviders {
677 key: _,
678 providers,
679 peers,
680 } => {
681 context.register_response(peer, providers, peers);
682 }
683 message => {
684 tracing::debug!(
685 target: LOG_TARGET,
686 ?query,
687 ?peer,
688 "unexpected response to `GET_PROVIDERS`: {message}",
689 );
690 context.register_response_failure(peer);
691 }
692 },
693 }
694 }
695
696 pub fn register_send_failure(&mut self, query: QueryId, peer: PeerId) {
697 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send failure");
698
699 match self.queries.get_mut(&query) {
700 None => {
701 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send failure for a stale query");
702 }
703 Some(QueryType::FindNode { context }) => {
704 context.register_send_failure(peer);
705 }
706 Some(QueryType::PutRecord { context, .. }) => {
707 context.register_send_failure(peer);
708 }
709 Some(QueryType::PutRecordToPeers { context, .. }) => {
710 context.register_send_failure(peer);
711 }
712 Some(QueryType::PutRecordToFoundNodes { context }) => {
713 context.register_send_failure(peer);
714 }
715 Some(QueryType::GetRecord { context }) => {
716 context.register_send_failure(peer);
717 }
718 Some(QueryType::AddProvider { context, .. }) => {
719 context.register_send_failure(peer);
720 }
721 Some(QueryType::AddProviderToFoundNodes { context }) => {
722 context.register_send_failure(peer);
723 }
724 Some(QueryType::GetProviders { context }) => {
725 context.register_send_failure(peer);
726 }
727 }
728 }
729
730 pub fn register_send_success(&mut self, query: QueryId, peer: PeerId) {
731 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send success");
732
733 match self.queries.get_mut(&query) {
734 None => {
735 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send success for a stale query");
736 }
737 Some(QueryType::FindNode { context }) => {
738 context.register_send_success(peer);
739 }
740 Some(QueryType::PutRecord { context, .. }) => {
741 context.register_send_success(peer);
742 }
743 Some(QueryType::PutRecordToPeers { context, .. }) => {
744 context.register_send_success(peer);
745 }
746 Some(QueryType::PutRecordToFoundNodes { context, .. }) => {
747 context.register_send_success(peer);
748 }
749 Some(QueryType::GetRecord { context }) => {
750 context.register_send_success(peer);
751 }
752 Some(QueryType::AddProvider { context, .. }) => {
753 context.register_send_success(peer);
754 }
755 Some(QueryType::AddProviderToFoundNodes { context, .. }) => {
756 context.register_send_success(peer);
757 }
758 Some(QueryType::GetProviders { context }) => {
759 context.register_send_success(peer);
760 }
761 }
762 }
763
764 pub fn register_peer_failure(&mut self, query: QueryId, peer: PeerId) {
767 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register peer failure");
768
769 self.register_send_failure(query, peer);
774 self.register_response_failure(query, peer);
775 }
776
777 pub fn next_peer_action(&mut self, query: &QueryId, peer: &PeerId) -> Option<QueryAction> {
779 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "get next peer action");
780
781 match self.queries.get_mut(query) {
782 None => {
783 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
784 None
785 }
786 Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
787 Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
788 Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
789 Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
790 Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer),
791 Some(QueryType::GetProviders { context }) => context.next_peer_action(peer),
792 Some(QueryType::PutRecordToFoundNodes { .. }) => {
793 None
795 }
796 Some(QueryType::AddProviderToFoundNodes { .. }) => {
797 None
799 }
800 }
801 }
802
803 fn on_query_succeeded(&mut self, query: QueryId) -> QueryAction {
806 match self.queries.remove(&query).expect("query to exist") {
807 QueryType::FindNode { context } => QueryAction::FindNodeQuerySucceeded {
808 query,
809 target: context.config.target.into_preimage(),
810 peers: context.responses.into_values().collect::<Vec<_>>(),
811 },
812 QueryType::PutRecord {
813 record,
814 quorum,
815 context,
816 } => QueryAction::PutRecordToFoundNodes {
817 query: context.config.query,
818 record,
819 peers: context.responses.into_values().collect::<Vec<_>>(),
820 quorum,
821 },
822 QueryType::PutRecordToPeers {
823 record,
824 quorum,
825 context,
826 } => QueryAction::PutRecordToFoundNodes {
827 query: context.query,
828 record,
829 peers: context.peers_to_report,
830 quorum,
831 },
832 QueryType::PutRecordToFoundNodes { context } => QueryAction::PutRecordQuerySucceeded {
833 query: context.query,
834 key: context.key,
835 },
836 QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
837 query_id: context.config.query,
838 },
839 QueryType::AddProvider {
840 provided_key,
841 provider,
842 quorum,
843 context,
844 } => QueryAction::AddProviderToFoundNodes {
845 query: context.config.query,
846 provided_key,
847 provider,
848 peers: context.responses.into_values().collect::<Vec<_>>(),
849 quorum,
850 },
851 QueryType::AddProviderToFoundNodes { context } =>
852 QueryAction::AddProviderQuerySucceeded {
853 query: context.query,
854 provided_key: context.key,
855 },
856 QueryType::GetProviders { context } => QueryAction::GetProvidersQueryDone {
857 query_id: context.config.query,
858 provided_key: context.config.target.clone().into_preimage(),
859 providers: context.found_providers(),
860 },
861 }
862 }
863
864 fn on_query_failed(&mut self, query: QueryId) -> QueryAction {
867 let _ = self.queries.remove(&query).expect("query to exist");
868
869 QueryAction::QueryFailed { query }
870 }
871
872 pub fn next_action(&mut self) -> Option<QueryAction> {
874 for (_, state) in self.queries.iter_mut() {
875 let action = match state {
876 QueryType::FindNode { context } => context.next_action(),
877 QueryType::PutRecord { context, .. } => context.next_action(),
878 QueryType::PutRecordToPeers { context, .. } => context.next_action(),
879 QueryType::GetRecord { context } => context.next_action(),
880 QueryType::AddProvider { context, .. } => context.next_action(),
881 QueryType::GetProviders { context } => context.next_action(),
882 QueryType::PutRecordToFoundNodes { context, .. } => context.next_action(),
883 QueryType::AddProviderToFoundNodes { context, .. } => context.next_action(),
884 };
885
886 match action {
887 Some(QueryAction::QuerySucceeded { query }) => {
888 return Some(self.on_query_succeeded(query));
889 }
890 Some(QueryAction::QueryFailed { query }) =>
891 return Some(self.on_query_failed(query)),
892 Some(_) => return action,
893 _ => continue,
894 }
895 }
896
897 None
898 }
899}
900
901#[cfg(test)]
902mod tests {
903 use multihash::Multihash;
904
905 use super::*;
906 use crate::{
907 peer_id::MULTIHASH_IDENTITY_CODE, protocol::libp2p::kademlia::types::ConnectionType,
908 };
909
910 fn make_peer_id(first: u8, second: u8) -> PeerId {
912 let mut peer_id = vec![0u8; 32];
913 peer_id[0] = first;
914 peer_id[1] = second;
915
916 PeerId::from_bytes(
917 &Multihash::<64>::wrap(MULTIHASH_IDENTITY_CODE, &peer_id)
918 .expect("The digest size is never too large")
919 .to_bytes(),
920 )
921 .unwrap()
922 }
923
924 #[test]
925 fn find_node_query_fails() {
926 let _ = tracing_subscriber::fmt()
927 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
928 .try_init();
929
930 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
931 let target_peer = PeerId::random();
932 let _target_key = Key::from(target_peer);
933
934 let query = engine.start_find_node(
935 QueryId(1337),
936 target_peer,
937 vec![
938 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
939 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
940 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
941 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
942 ]
943 .into(),
944 );
945
946 for _ in 0..4 {
947 if let Some(QueryAction::SendMessage { query, peer, .. }) = engine.next_action() {
948 engine.register_response_failure(query, peer);
949 }
950 }
951
952 if let Some(QueryAction::QueryFailed { query: failed }) = engine.next_action() {
953 assert_eq!(failed, query);
954 }
955
956 assert!(engine.next_action().is_none());
957 }
958
959 #[test]
960 fn find_node_lookup_paused() {
961 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
962 let target_peer = PeerId::random();
963 let _target_key = Key::from(target_peer);
964
965 let _ = engine.start_find_node(
966 QueryId(1338),
967 target_peer,
968 vec![
969 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
970 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
971 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
972 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
973 ]
974 .into(),
975 );
976
977 for _ in 0..3 {
978 let _ = engine.next_action();
979 }
980
981 assert!(engine.next_action().is_none());
982 }
983
984 #[test]
985 fn find_node_query_succeeds() {
986 let _ = tracing_subscriber::fmt()
987 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
988 .try_init();
989
990 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
991 let target_peer = make_peer_id(0, 0);
992 let target_key = Key::from(target_peer);
993
994 let distances = {
995 let mut distances = std::collections::BTreeMap::new();
996
997 for i in 1..64 {
998 let peer = make_peer_id(i, 0);
999 let key = Key::from(peer);
1000
1001 distances.insert(target_key.distance(&key), peer);
1002 }
1003
1004 distances
1005 };
1006 let mut iter = distances.iter();
1007
1008 let _query = engine.start_find_node(
1010 QueryId(1339),
1011 target_peer,
1012 vec![KademliaPeer::new(
1013 *iter.next().unwrap().1,
1014 vec![],
1015 ConnectionType::NotConnected,
1016 )]
1017 .into(),
1018 );
1019
1020 let action = engine.next_action();
1021 assert!(engine.next_action().is_none());
1022
1023 match action {
1025 Some(QueryAction::SendMessage { query, peer, .. }) => {
1026 engine.register_response(
1027 query,
1028 peer,
1029 KademliaMessage::FindNode {
1030 target: Vec::new(),
1031 peers: vec![
1032 KademliaPeer::new(
1033 *iter.next().unwrap().1,
1034 vec![],
1035 ConnectionType::NotConnected,
1036 ),
1037 KademliaPeer::new(
1038 *iter.next().unwrap().1,
1039 vec![],
1040 ConnectionType::NotConnected,
1041 ),
1042 KademliaPeer::new(
1043 *iter.next().unwrap().1,
1044 vec![],
1045 ConnectionType::NotConnected,
1046 ),
1047 ],
1048 },
1049 );
1050 }
1051 _ => panic!("invalid event received"),
1052 }
1053
1054 for _ in 0..3 {
1056 match engine.next_action() {
1057 Some(QueryAction::SendMessage { query, peer, .. }) => {
1058 println!("next send message to {peer:?}");
1059 engine.register_response(
1060 query,
1061 peer,
1062 KademliaMessage::FindNode {
1063 target: Vec::new(),
1064 peers: vec![],
1065 },
1066 );
1067 }
1068 _ => panic!("invalid event received"),
1069 }
1070 }
1071
1072 match engine.next_action() {
1073 Some(QueryAction::FindNodeQuerySucceeded { peers, .. }) => {
1074 assert_eq!(peers.len(), 4);
1075 }
1076 _ => panic!("invalid event received"),
1077 }
1078
1079 assert!(engine.next_action().is_none());
1080 }
1081
1082 #[test]
1083 fn put_record_fails() {
1084 let _ = tracing_subscriber::fmt()
1085 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1086 .try_init();
1087
1088 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1089 let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1090 let target_key = Key::new(record_key.clone());
1091 let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1092
1093 let distances = {
1094 let mut distances = std::collections::BTreeMap::new();
1095
1096 for i in 1..64 {
1097 let peer = make_peer_id(i, 0);
1098 let key = Key::from(peer);
1099
1100 distances.insert(target_key.distance(&key), peer);
1101 }
1102
1103 distances
1104 };
1105 let mut iter = distances.iter();
1106
1107 let original_query_id = QueryId(1340);
1109 let _query = engine.start_put_record(
1110 original_query_id,
1111 original_record.clone(),
1112 vec![KademliaPeer::new(
1113 *iter.next().unwrap().1,
1114 vec![],
1115 ConnectionType::NotConnected,
1116 )]
1117 .into(),
1118 Quorum::All,
1119 );
1120
1121 let action = engine.next_action();
1122 assert!(engine.next_action().is_none());
1123
1124 match action {
1126 Some(QueryAction::SendMessage { query, peer, .. }) => {
1127 engine.register_response(
1128 query,
1129 peer,
1130 KademliaMessage::FindNode {
1131 target: Vec::new(),
1132 peers: vec![
1133 KademliaPeer::new(
1134 *iter.next().unwrap().1,
1135 vec![],
1136 ConnectionType::NotConnected,
1137 ),
1138 KademliaPeer::new(
1139 *iter.next().unwrap().1,
1140 vec![],
1141 ConnectionType::NotConnected,
1142 ),
1143 KademliaPeer::new(
1144 *iter.next().unwrap().1,
1145 vec![],
1146 ConnectionType::NotConnected,
1147 ),
1148 ],
1149 },
1150 );
1151 }
1152 _ => panic!("invalid event received"),
1153 }
1154
1155 for _ in 0..3 {
1157 match engine.next_action() {
1158 Some(QueryAction::SendMessage { query, peer, .. }) => {
1159 println!("next send message to {peer:?}");
1160 engine.register_response(
1161 query,
1162 peer,
1163 KademliaMessage::FindNode {
1164 target: Vec::new(),
1165 peers: vec![],
1166 },
1167 );
1168 }
1169 _ => panic!("invalid event received"),
1170 }
1171 }
1172
1173 let mut peers = match engine.next_action() {
1174 Some(QueryAction::PutRecordToFoundNodes {
1175 query,
1176 peers,
1177 record,
1178 quorum,
1179 }) => {
1180 assert_eq!(query, original_query_id);
1181 assert_eq!(peers.len(), 4);
1182 assert_eq!(record.key, original_record.key);
1183 assert_eq!(record.value, original_record.value);
1184 assert!(matches!(quorum, Quorum::All));
1185
1186 peers
1187 }
1188 _ => panic!("invalid event received"),
1189 };
1190
1191 engine.start_put_record_to_found_nodes_requests_tracking(
1192 original_query_id,
1193 record_key.clone(),
1194 peers.iter().map(|p| p.peer).collect(),
1195 Quorum::All,
1196 );
1197
1198 let last_peer = peers.pop().unwrap();
1200 for peer in peers {
1201 engine.register_send_success(original_query_id, peer.peer);
1202 }
1203 engine.register_send_failure(original_query_id, last_peer.peer);
1204
1205 match engine.next_action() {
1206 Some(QueryAction::QueryFailed { query }) => {
1207 assert_eq!(query, original_query_id);
1208 }
1209 _ => panic!("invalid event received"),
1210 }
1211
1212 assert!(engine.next_action().is_none());
1213 }
1214
1215 #[test]
1216 fn put_record_succeeds() {
1217 let _ = tracing_subscriber::fmt()
1218 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1219 .try_init();
1220
1221 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1222 let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1223 let target_key = Key::new(record_key.clone());
1224 let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1225
1226 let distances = {
1227 let mut distances = std::collections::BTreeMap::new();
1228
1229 for i in 1..64 {
1230 let peer = make_peer_id(i, 0);
1231 let key = Key::from(peer);
1232
1233 distances.insert(target_key.distance(&key), peer);
1234 }
1235
1236 distances
1237 };
1238 let mut iter = distances.iter();
1239
1240 let original_query_id = QueryId(1340);
1242 let _query = engine.start_put_record(
1243 original_query_id,
1244 original_record.clone(),
1245 vec![KademliaPeer::new(
1246 *iter.next().unwrap().1,
1247 vec![],
1248 ConnectionType::NotConnected,
1249 )]
1250 .into(),
1251 Quorum::All,
1252 );
1253
1254 let action = engine.next_action();
1255 assert!(engine.next_action().is_none());
1256
1257 match action {
1259 Some(QueryAction::SendMessage { query, peer, .. }) => {
1260 engine.register_response(
1261 query,
1262 peer,
1263 KademliaMessage::FindNode {
1264 target: Vec::new(),
1265 peers: vec![
1266 KademliaPeer::new(
1267 *iter.next().unwrap().1,
1268 vec![],
1269 ConnectionType::NotConnected,
1270 ),
1271 KademliaPeer::new(
1272 *iter.next().unwrap().1,
1273 vec![],
1274 ConnectionType::NotConnected,
1275 ),
1276 KademliaPeer::new(
1277 *iter.next().unwrap().1,
1278 vec![],
1279 ConnectionType::NotConnected,
1280 ),
1281 ],
1282 },
1283 );
1284 }
1285 _ => panic!("invalid event received"),
1286 }
1287
1288 for _ in 0..3 {
1290 match engine.next_action() {
1291 Some(QueryAction::SendMessage { query, peer, .. }) => {
1292 println!("next send message to {peer:?}");
1293 engine.register_response(
1294 query,
1295 peer,
1296 KademliaMessage::FindNode {
1297 target: Vec::new(),
1298 peers: vec![],
1299 },
1300 );
1301 }
1302 _ => panic!("invalid event received"),
1303 }
1304 }
1305
1306 let peers = match engine.next_action() {
1307 Some(QueryAction::PutRecordToFoundNodes {
1308 query,
1309 peers,
1310 record,
1311 quorum,
1312 }) => {
1313 assert_eq!(query, original_query_id);
1314 assert_eq!(peers.len(), 4);
1315 assert_eq!(record.key, original_record.key);
1316 assert_eq!(record.value, original_record.value);
1317 assert!(matches!(quorum, Quorum::All));
1318
1319 peers
1320 }
1321 _ => panic!("invalid event received"),
1322 };
1323
1324 engine.start_put_record_to_found_nodes_requests_tracking(
1325 original_query_id,
1326 record_key.clone(),
1327 peers.iter().map(|p| p.peer).collect(),
1328 Quorum::All,
1329 );
1330
1331 for peer in &peers {
1333 engine.register_send_success(original_query_id, peer.peer);
1334 }
1335
1336 match engine.next_action() {
1337 Some(QueryAction::PutRecordQuerySucceeded { query, key }) => {
1338 assert_eq!(query, original_query_id);
1339 assert_eq!(key, record_key);
1340 }
1341 _ => panic!("invalid event received"),
1342 }
1343
1344 assert!(engine.next_action().is_none());
1345
1346 let _query = engine.start_get_record(
1348 QueryId(1341),
1349 record_key.clone(),
1350 vec![
1351 KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1352 KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1353 KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1354 KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1355 ]
1356 .into(),
1357 Quorum::All,
1358 false,
1359 );
1360
1361 let mut records = Vec::new();
1362 for _ in 0..4 {
1363 match engine.next_action() {
1364 Some(QueryAction::SendMessage { query, peer, .. }) => {
1365 assert_eq!(query, QueryId(1341));
1366 engine.register_response(
1367 query,
1368 peer,
1369 KademliaMessage::GetRecord {
1370 record: Some(original_record.clone()),
1371 peers: vec![],
1372 key: Some(record_key.clone()),
1373 },
1374 );
1375 }
1376 event => panic!("invalid event received {:?}", event),
1377 }
1378
1379 match engine.next_action() {
1382 Some(QueryAction::GetRecordPartialResult { query_id, record }) => {
1383 println!("Partial result {:?}", record);
1384 assert_eq!(query_id, QueryId(1341));
1385 records.push(record);
1386 }
1387 event => panic!("invalid event received {:?}", event),
1388 }
1389 }
1390
1391 let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
1392 match engine.next_action() {
1393 Some(QueryAction::GetRecordQueryDone { .. }) => {
1394 println!("Records {:?}", records);
1395 let query_peers = records
1396 .iter()
1397 .map(|peer_record| peer_record.peer)
1398 .collect::<std::collections::HashSet<_>>();
1399 assert_eq!(peers, query_peers);
1400
1401 let records: std::collections::HashSet<_> =
1402 records.into_iter().map(|peer_record| peer_record.record).collect();
1403 assert_eq!(records.len(), 1);
1405 let record = records.into_iter().next().unwrap();
1406
1407 assert_eq!(record.key, original_record.key);
1408 assert_eq!(record.value, original_record.value);
1409 }
1410 event => panic!("invalid event received {:?}", event),
1411 }
1412 }
1413
1414 #[test]
1415 fn put_record_succeeds_with_quorum_one() {
1416 let _ = tracing_subscriber::fmt()
1417 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1418 .try_init();
1419
1420 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1421 let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1422 let target_key = Key::new(record_key.clone());
1423 let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1424
1425 let distances = {
1426 let mut distances = std::collections::BTreeMap::new();
1427
1428 for i in 1..64 {
1429 let peer = make_peer_id(i, 0);
1430 let key = Key::from(peer);
1431
1432 distances.insert(target_key.distance(&key), peer);
1433 }
1434
1435 distances
1436 };
1437 let mut iter = distances.iter();
1438
1439 let original_query_id = QueryId(1340);
1441 let _query = engine.start_put_record(
1442 original_query_id,
1443 original_record.clone(),
1444 vec![KademliaPeer::new(
1445 *iter.next().unwrap().1,
1446 vec![],
1447 ConnectionType::NotConnected,
1448 )]
1449 .into(),
1450 Quorum::One,
1451 );
1452
1453 let action = engine.next_action();
1454 assert!(engine.next_action().is_none());
1455
1456 match action {
1458 Some(QueryAction::SendMessage { query, peer, .. }) => {
1459 engine.register_response(
1460 query,
1461 peer,
1462 KademliaMessage::FindNode {
1463 target: Vec::new(),
1464 peers: vec![
1465 KademliaPeer::new(
1466 *iter.next().unwrap().1,
1467 vec![],
1468 ConnectionType::NotConnected,
1469 ),
1470 KademliaPeer::new(
1471 *iter.next().unwrap().1,
1472 vec![],
1473 ConnectionType::NotConnected,
1474 ),
1475 KademliaPeer::new(
1476 *iter.next().unwrap().1,
1477 vec![],
1478 ConnectionType::NotConnected,
1479 ),
1480 ],
1481 },
1482 );
1483 }
1484 _ => panic!("invalid event received"),
1485 }
1486
1487 for _ in 0..3 {
1489 match engine.next_action() {
1490 Some(QueryAction::SendMessage { query, peer, .. }) => {
1491 println!("next send message to {peer:?}");
1492 engine.register_response(
1493 query,
1494 peer,
1495 KademliaMessage::FindNode {
1496 target: Vec::new(),
1497 peers: vec![],
1498 },
1499 );
1500 }
1501 _ => panic!("invalid event received"),
1502 }
1503 }
1504
1505 let peers = match engine.next_action() {
1506 Some(QueryAction::PutRecordToFoundNodes {
1507 query,
1508 peers,
1509 record,
1510 quorum,
1511 }) => {
1512 assert_eq!(query, original_query_id);
1513 assert_eq!(peers.len(), 4);
1514 assert_eq!(record.key, original_record.key);
1515 assert_eq!(record.value, original_record.value);
1516 assert!(matches!(quorum, Quorum::One));
1517
1518 peers
1519 }
1520 _ => panic!("invalid event received"),
1521 };
1522
1523 engine.start_put_record_to_found_nodes_requests_tracking(
1524 original_query_id,
1525 record_key.clone(),
1526 peers.iter().map(|p| p.peer).collect(),
1527 Quorum::One,
1528 );
1529
1530 assert!(peers.len() > 1);
1532 for peer in peers.iter().take(peers.len() - 1) {
1533 engine.register_send_failure(original_query_id, peer.peer);
1534 }
1535 engine.register_send_success(original_query_id, peers.last().unwrap().peer);
1536
1537 match engine.next_action() {
1538 Some(QueryAction::PutRecordQuerySucceeded { query, key }) => {
1539 assert_eq!(query, original_query_id);
1540 assert_eq!(key, record_key);
1541 }
1542 _ => panic!("invalid event received"),
1543 }
1544
1545 assert!(engine.next_action().is_none());
1546
1547 let _query = engine.start_get_record(
1549 QueryId(1341),
1550 record_key.clone(),
1551 vec![
1552 KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1553 KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1554 KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1555 KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1556 ]
1557 .into(),
1558 Quorum::All,
1559 false,
1560 );
1561
1562 let mut records = Vec::new();
1563 for _ in 0..4 {
1564 match engine.next_action() {
1565 Some(QueryAction::SendMessage { query, peer, .. }) => {
1566 assert_eq!(query, QueryId(1341));
1567 engine.register_response(
1568 query,
1569 peer,
1570 KademliaMessage::GetRecord {
1571 record: Some(original_record.clone()),
1572 peers: vec![],
1573 key: Some(record_key.clone()),
1574 },
1575 );
1576 }
1577 event => panic!("invalid event received {:?}", event),
1578 }
1579
1580 match engine.next_action() {
1583 Some(QueryAction::GetRecordPartialResult { query_id, record }) => {
1584 println!("Partial result {:?}", record);
1585 assert_eq!(query_id, QueryId(1341));
1586 records.push(record);
1587 }
1588 event => panic!("invalid event received {:?}", event),
1589 }
1590 }
1591
1592 let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
1593 match engine.next_action() {
1594 Some(QueryAction::GetRecordQueryDone { .. }) => {
1595 println!("Records {:?}", records);
1596 let query_peers = records
1597 .iter()
1598 .map(|peer_record| peer_record.peer)
1599 .collect::<std::collections::HashSet<_>>();
1600 assert_eq!(peers, query_peers);
1601
1602 let records: std::collections::HashSet<_> =
1603 records.into_iter().map(|peer_record| peer_record.record).collect();
1604 assert_eq!(records.len(), 1);
1606 let record = records.into_iter().next().unwrap();
1607
1608 assert_eq!(record.key, original_record.key);
1609 assert_eq!(record.value, original_record.value);
1610 }
1611 event => panic!("invalid event received {:?}", event),
1612 }
1613 }
1614
1615 #[test]
1616 fn add_provider_fails() {
1617 let _ = tracing_subscriber::fmt()
1618 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1619 .try_init();
1620
1621 let local_peer_id = PeerId::random();
1622 let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1623 let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1624 let local_content_provider = ContentProvider {
1625 peer: local_peer_id,
1626 addresses: vec![],
1627 };
1628 let target_key = Key::new(original_provided_key.clone());
1629
1630 let distances = {
1631 let mut distances = std::collections::BTreeMap::new();
1632
1633 for i in 1..64 {
1634 let peer = make_peer_id(i, 0);
1635 let key = Key::from(peer);
1636
1637 distances.insert(target_key.distance(&key), peer);
1638 }
1639
1640 distances
1641 };
1642 let mut iter = distances.iter();
1643
1644 let original_query_id = QueryId(1340);
1646 let _query = engine.start_add_provider(
1647 original_query_id,
1648 original_provided_key.clone(),
1649 local_content_provider.clone(),
1650 vec![KademliaPeer::new(
1651 *iter.next().unwrap().1,
1652 vec![],
1653 ConnectionType::NotConnected,
1654 )]
1655 .into(),
1656 Quorum::All,
1657 );
1658
1659 let action = engine.next_action();
1660 assert!(engine.next_action().is_none());
1661
1662 match action {
1664 Some(QueryAction::SendMessage { query, peer, .. }) => {
1665 engine.register_response(
1666 query,
1667 peer,
1668 KademliaMessage::FindNode {
1669 target: Vec::new(),
1670 peers: vec![
1671 KademliaPeer::new(
1672 *iter.next().unwrap().1,
1673 vec![],
1674 ConnectionType::NotConnected,
1675 ),
1676 KademliaPeer::new(
1677 *iter.next().unwrap().1,
1678 vec![],
1679 ConnectionType::NotConnected,
1680 ),
1681 KademliaPeer::new(
1682 *iter.next().unwrap().1,
1683 vec![],
1684 ConnectionType::NotConnected,
1685 ),
1686 ],
1687 },
1688 );
1689 }
1690 _ => panic!("invalid event received"),
1691 }
1692
1693 for _ in 0..3 {
1695 match engine.next_action() {
1696 Some(QueryAction::SendMessage { query, peer, .. }) => {
1697 println!("next send message to {peer:?}");
1698 engine.register_response(
1699 query,
1700 peer,
1701 KademliaMessage::FindNode {
1702 target: Vec::new(),
1703 peers: vec![],
1704 },
1705 );
1706 }
1707 _ => panic!("invalid event received"),
1708 }
1709 }
1710
1711 let mut peers = match engine.next_action() {
1712 Some(QueryAction::AddProviderToFoundNodes {
1713 query,
1714 provided_key,
1715 provider,
1716 peers,
1717 quorum,
1718 }) => {
1719 assert_eq!(query, original_query_id);
1720 assert_eq!(provided_key, original_provided_key);
1721 assert_eq!(provider, local_content_provider);
1722 assert_eq!(peers.len(), 4);
1723 assert!(matches!(quorum, Quorum::All));
1724
1725 peers
1726 }
1727 _ => panic!("invalid event received"),
1728 };
1729
1730 engine.start_add_provider_to_found_nodes_requests_tracking(
1731 original_query_id,
1732 original_provided_key.clone(),
1733 peers.iter().map(|p| p.peer).collect(),
1734 Quorum::All,
1735 );
1736
1737 let last_peer = peers.pop().unwrap();
1739 for peer in peers {
1740 engine.register_send_success(original_query_id, peer.peer);
1741 }
1742 engine.register_send_failure(original_query_id, last_peer.peer);
1743
1744 match engine.next_action() {
1745 Some(QueryAction::QueryFailed { query }) => {
1746 assert_eq!(query, original_query_id);
1747 }
1748 _ => panic!("invalid event received"),
1749 }
1750
1751 assert!(engine.next_action().is_none());
1752 }
1753
1754 #[test]
1755 fn add_provider_succeeds() {
1756 let _ = tracing_subscriber::fmt()
1757 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1758 .try_init();
1759
1760 let local_peer_id = PeerId::random();
1761 let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1762 let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1763 let local_content_provider = ContentProvider {
1764 peer: local_peer_id,
1765 addresses: vec![],
1766 };
1767
1768 let target_key = Key::new(original_provided_key.clone());
1769 let distances = {
1770 let mut distances = std::collections::BTreeMap::new();
1771
1772 for i in 1..64 {
1773 let peer = make_peer_id(i, 0);
1774 let key = Key::from(peer);
1775
1776 distances.insert(target_key.distance(&key), peer);
1777 }
1778
1779 distances
1780 };
1781 let mut iter = distances.iter();
1782
1783 let add_query_id = QueryId(1340);
1785 let _query = engine.start_add_provider(
1786 add_query_id,
1787 original_provided_key.clone(),
1788 local_content_provider.clone(),
1789 vec![KademliaPeer::new(
1790 *iter.next().unwrap().1,
1791 vec![],
1792 ConnectionType::NotConnected,
1793 )]
1794 .into(),
1795 Quorum::All,
1796 );
1797
1798 let action = engine.next_action();
1799 assert!(engine.next_action().is_none());
1800
1801 match action {
1803 Some(QueryAction::SendMessage { query, peer, .. }) => {
1804 engine.register_response(
1805 query,
1806 peer,
1807 KademliaMessage::FindNode {
1808 target: Vec::new(),
1809 peers: vec![
1810 KademliaPeer::new(
1811 *iter.next().unwrap().1,
1812 vec![],
1813 ConnectionType::NotConnected,
1814 ),
1815 KademliaPeer::new(
1816 *iter.next().unwrap().1,
1817 vec![],
1818 ConnectionType::NotConnected,
1819 ),
1820 KademliaPeer::new(
1821 *iter.next().unwrap().1,
1822 vec![],
1823 ConnectionType::NotConnected,
1824 ),
1825 ],
1826 },
1827 );
1828 }
1829 _ => panic!("invalid event received"),
1830 }
1831
1832 for _ in 0..3 {
1834 match engine.next_action() {
1835 Some(QueryAction::SendMessage { query, peer, .. }) => {
1836 println!("next send message to {peer:?}");
1837 engine.register_response(
1838 query,
1839 peer,
1840 KademliaMessage::FindNode {
1841 target: Vec::new(),
1842 peers: vec![],
1843 },
1844 );
1845 }
1846 _ => panic!("invalid event received"),
1847 }
1848 }
1849
1850 let peers = match engine.next_action() {
1851 Some(QueryAction::AddProviderToFoundNodes {
1852 query,
1853 provided_key,
1854 provider,
1855 peers,
1856 quorum,
1857 }) => {
1858 assert_eq!(query, add_query_id);
1859 assert_eq!(provided_key, original_provided_key);
1860 assert_eq!(provider, local_content_provider);
1861 assert_eq!(peers.len(), 4);
1862 assert!(matches!(quorum, Quorum::All));
1863
1864 peers
1865 }
1866 _ => panic!("invalid event received"),
1867 };
1868
1869 engine.start_add_provider_to_found_nodes_requests_tracking(
1870 add_query_id,
1871 original_provided_key.clone(),
1872 peers.iter().map(|p| p.peer).collect(),
1873 Quorum::All,
1874 );
1875
1876 for peer in &peers {
1878 engine.register_send_success(add_query_id, peer.peer);
1879 }
1880
1881 match engine.next_action() {
1882 Some(QueryAction::AddProviderQuerySucceeded {
1883 query,
1884 provided_key,
1885 }) => {
1886 assert_eq!(query, add_query_id);
1887 assert_eq!(provided_key, original_provided_key);
1888 }
1889 _ => panic!("invalid event received"),
1890 }
1891
1892 assert!(engine.next_action().is_none());
1893
1894 let get_query_id = QueryId(1341);
1896 let _query = engine.start_get_providers(
1897 get_query_id,
1898 original_provided_key.clone(),
1899 vec![
1900 KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1901 KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1902 KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1903 KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1904 ]
1905 .into(),
1906 vec![],
1907 );
1908
1909 for _ in 0..4 {
1910 match engine.next_action() {
1911 Some(QueryAction::SendMessage { query, peer, .. }) => {
1912 assert_eq!(query, get_query_id);
1913 engine.register_response(
1914 query,
1915 peer,
1916 KademliaMessage::GetProviders {
1917 key: Some(original_provided_key.clone()),
1918 peers: vec![],
1919 providers: vec![local_content_provider.clone().into()],
1920 },
1921 );
1922 }
1923 event => panic!("invalid event received {:?}", event),
1924 }
1925 }
1926
1927 match engine.next_action() {
1928 Some(QueryAction::GetProvidersQueryDone {
1929 query_id,
1930 provided_key,
1931 providers,
1932 }) => {
1933 assert_eq!(query_id, get_query_id);
1934 assert_eq!(provided_key, original_provided_key);
1935 assert_eq!(providers, vec![local_content_provider]);
1936 }
1937 event => panic!("invalid event received {:?}", event),
1938 }
1939 }
1940
1941 #[test]
1942 fn add_provider_succeeds_with_quorum_one() {
1943 let _ = tracing_subscriber::fmt()
1944 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1945 .try_init();
1946
1947 let local_peer_id = PeerId::random();
1948 let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1949 let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1950 let local_content_provider = ContentProvider {
1951 peer: local_peer_id,
1952 addresses: vec![],
1953 };
1954
1955 let target_key = Key::new(original_provided_key.clone());
1956 let distances = {
1957 let mut distances = std::collections::BTreeMap::new();
1958
1959 for i in 1..64 {
1960 let peer = make_peer_id(i, 0);
1961 let key = Key::from(peer);
1962
1963 distances.insert(target_key.distance(&key), peer);
1964 }
1965
1966 distances
1967 };
1968 let mut iter = distances.iter();
1969
1970 let add_query_id = QueryId(1340);
1972 let _query = engine.start_add_provider(
1973 add_query_id,
1974 original_provided_key.clone(),
1975 local_content_provider.clone(),
1976 vec![KademliaPeer::new(
1977 *iter.next().unwrap().1,
1978 vec![],
1979 ConnectionType::NotConnected,
1980 )]
1981 .into(),
1982 Quorum::One,
1983 );
1984
1985 let action = engine.next_action();
1986 assert!(engine.next_action().is_none());
1987
1988 match action {
1990 Some(QueryAction::SendMessage { query, peer, .. }) => {
1991 engine.register_response(
1992 query,
1993 peer,
1994 KademliaMessage::FindNode {
1995 target: Vec::new(),
1996 peers: vec![
1997 KademliaPeer::new(
1998 *iter.next().unwrap().1,
1999 vec![],
2000 ConnectionType::NotConnected,
2001 ),
2002 KademliaPeer::new(
2003 *iter.next().unwrap().1,
2004 vec![],
2005 ConnectionType::NotConnected,
2006 ),
2007 KademliaPeer::new(
2008 *iter.next().unwrap().1,
2009 vec![],
2010 ConnectionType::NotConnected,
2011 ),
2012 ],
2013 },
2014 );
2015 }
2016 _ => panic!("invalid event received"),
2017 }
2018
2019 for _ in 0..3 {
2021 match engine.next_action() {
2022 Some(QueryAction::SendMessage { query, peer, .. }) => {
2023 println!("next send message to {peer:?}");
2024 engine.register_response(
2025 query,
2026 peer,
2027 KademliaMessage::FindNode {
2028 target: Vec::new(),
2029 peers: vec![],
2030 },
2031 );
2032 }
2033 _ => panic!("invalid event received"),
2034 }
2035 }
2036
2037 let peers = match engine.next_action() {
2038 Some(QueryAction::AddProviderToFoundNodes {
2039 query,
2040 provided_key,
2041 provider,
2042 peers,
2043 quorum,
2044 }) => {
2045 assert_eq!(query, add_query_id);
2046 assert_eq!(provided_key, original_provided_key);
2047 assert_eq!(provider, local_content_provider);
2048 assert_eq!(peers.len(), 4);
2049 assert!(matches!(quorum, Quorum::One));
2050
2051 peers
2052 }
2053 _ => panic!("invalid event received"),
2054 };
2055
2056 engine.start_add_provider_to_found_nodes_requests_tracking(
2057 add_query_id,
2058 original_provided_key.clone(),
2059 peers.iter().map(|p| p.peer).collect(),
2060 Quorum::One,
2061 );
2062
2063 assert!(peers.len() > 1);
2065 engine.register_send_success(add_query_id, peers.first().unwrap().peer);
2066 for peer in peers.iter().skip(1) {
2067 engine.register_send_failure(add_query_id, peer.peer);
2068 }
2069
2070 match engine.next_action() {
2071 Some(QueryAction::AddProviderQuerySucceeded {
2072 query,
2073 provided_key,
2074 }) => {
2075 assert_eq!(query, add_query_id);
2076 assert_eq!(provided_key, original_provided_key);
2077 }
2078 _ => panic!("invalid event received"),
2079 }
2080
2081 assert!(engine.next_action().is_none());
2082
2083 let get_query_id = QueryId(1341);
2085 let _query = engine.start_get_providers(
2086 get_query_id,
2087 original_provided_key.clone(),
2088 vec![
2089 KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
2090 KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
2091 KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
2092 KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
2093 ]
2094 .into(),
2095 vec![],
2096 );
2097
2098 match engine.next_action() {
2100 Some(QueryAction::SendMessage { query, peer, .. }) => {
2101 assert_eq!(query, get_query_id);
2102 engine.register_response(
2103 query,
2104 peer,
2105 KademliaMessage::GetProviders {
2106 key: Some(original_provided_key.clone()),
2107 peers: vec![],
2108 providers: vec![local_content_provider.clone().into()],
2109 },
2110 );
2111 }
2112 event => panic!("invalid event received {:?}", event),
2113 }
2114
2115 for _ in 1..4 {
2117 match engine.next_action() {
2118 Some(QueryAction::SendMessage { query, peer, .. }) => {
2119 assert_eq!(query, get_query_id);
2120 engine.register_response(
2121 query,
2122 peer,
2123 KademliaMessage::GetProviders {
2124 key: Some(original_provided_key.clone()),
2125 peers: vec![],
2126 providers: vec![],
2127 },
2128 );
2129 }
2130 event => panic!("invalid event received {:?}", event),
2131 }
2132 }
2133
2134 match engine.next_action() {
2135 Some(QueryAction::GetProvidersQueryDone {
2136 query_id,
2137 provided_key,
2138 providers,
2139 }) => {
2140 assert_eq!(query_id, get_query_id);
2141 assert_eq!(provided_key, original_provided_key);
2142 assert_eq!(providers, vec![local_content_provider]);
2143 }
2144 event => panic!("invalid event received {:?}", event),
2145 }
2146 }
2147}