1use crate::{
22 protocol::libp2p::kademlia::{
23 message::KademliaMessage,
24 query::{
25 find_node::{FindNodeConfig, FindNodeContext},
26 get_providers::{GetProvidersConfig, GetProvidersContext},
27 get_record::{GetRecordConfig, GetRecordContext},
28 },
29 record::{ContentProvider, Key as RecordKey, Record},
30 types::{KademliaPeer, Key},
31 PeerRecord, Quorum,
32 },
33 PeerId,
34};
35
36use bytes::Bytes;
37
38use std::collections::{HashMap, VecDeque};
39
40use self::{find_many_nodes::FindManyNodesContext, target_peers::PutToTargetPeersContext};
41
42mod find_many_nodes;
43mod find_node;
44mod get_providers;
45mod get_record;
46mod target_peers;
47
48const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
50
51#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
53#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
54pub struct QueryId(pub usize);
55
56#[derive(Debug)]
58enum QueryType {
59 FindNode {
61 context: FindNodeContext<PeerId>,
63 },
64
65 PutRecord {
67 record: Record,
69
70 quorum: Quorum,
72
73 context: FindNodeContext<RecordKey>,
75 },
76
77 PutRecordToPeers {
79 record: Record,
81
82 quorum: Quorum,
84
85 context: FindManyNodesContext,
87 },
88
89 PutRecordToFoundNodes {
91 context: PutToTargetPeersContext,
93 },
94
95 GetRecord {
97 context: GetRecordContext,
99 },
100
101 AddProvider {
103 provided_key: RecordKey,
105
106 provider: ContentProvider,
108
109 quorum: Quorum,
111
112 context: FindNodeContext<RecordKey>,
114 },
115
116 AddProviderToFoundNodes {
118 context: PutToTargetPeersContext,
120 },
121
122 GetProviders {
124 context: GetProvidersContext,
126 },
127}
128
129#[derive(Debug)]
131pub enum QueryAction {
132 SendMessage {
134 query: QueryId,
136
137 peer: PeerId,
139
140 message: Bytes,
142 },
143
144 FindNodeQuerySucceeded {
146 query: QueryId,
148
149 target: PeerId,
151
152 peers: Vec<KademliaPeer>,
154 },
155
156 PutRecordToFoundNodes {
158 query: QueryId,
160
161 record: Record,
163
164 peers: Vec<KademliaPeer>,
166
167 quorum: Quorum,
169 },
170
171 PutRecordQuerySucceeded {
173 query: QueryId,
175
176 key: RecordKey,
178 },
179
180 AddProviderToFoundNodes {
182 query: QueryId,
184
185 provided_key: RecordKey,
187
188 provider: ContentProvider,
190
191 peers: Vec<KademliaPeer>,
193
194 quorum: Quorum,
196 },
197
198 AddProviderQuerySucceeded {
200 query: QueryId,
202
203 provided_key: RecordKey,
205 },
206
207 GetRecordQueryDone {
209 query_id: QueryId,
211 },
212
213 GetRecordPartialResult {
217 query_id: QueryId,
219
220 record: PeerRecord,
222 },
223
224 GetProvidersQueryDone {
226 query_id: QueryId,
228
229 provided_key: RecordKey,
231
232 providers: Vec<ContentProvider>,
234 },
235
236 QuerySucceeded {
238 query: QueryId,
240 },
241
242 QueryFailed {
244 query: QueryId,
246 },
247}
248
249pub struct QueryEngine {
251 local_peer_id: PeerId,
253
254 replication_factor: usize,
256
257 parallelism_factor: usize,
259
260 queries: HashMap<QueryId, QueryType>,
262}
263
264impl QueryEngine {
265 pub fn new(
267 local_peer_id: PeerId,
268 replication_factor: usize,
269 parallelism_factor: usize,
270 ) -> Self {
271 Self {
272 local_peer_id,
273 replication_factor,
274 parallelism_factor,
275 queries: HashMap::new(),
276 }
277 }
278
279 pub fn start_find_node(
281 &mut self,
282 query_id: QueryId,
283 target: PeerId,
284 candidates: VecDeque<KademliaPeer>,
285 ) -> QueryId {
286 tracing::debug!(
287 target: LOG_TARGET,
288 ?query_id,
289 ?target,
290 num_peers = ?candidates.len(),
291 "start `FIND_NODE` query"
292 );
293
294 let target = Key::from(target);
295 let config = FindNodeConfig {
296 local_peer_id: self.local_peer_id,
297 replication_factor: self.replication_factor,
298 parallelism_factor: self.parallelism_factor,
299 query: query_id,
300 target,
301 };
302
303 self.queries.insert(
304 query_id,
305 QueryType::FindNode {
306 context: FindNodeContext::new(config, candidates),
307 },
308 );
309
310 query_id
311 }
312
313 pub fn start_put_record(
315 &mut self,
316 query_id: QueryId,
317 record: Record,
318 candidates: VecDeque<KademliaPeer>,
319 quorum: Quorum,
320 ) -> QueryId {
321 tracing::debug!(
322 target: LOG_TARGET,
323 ?query_id,
324 target = ?record.key,
325 num_peers = ?candidates.len(),
326 "start `PUT_VALUE` query"
327 );
328
329 let target = Key::new(record.key.clone());
330 let config = FindNodeConfig {
331 local_peer_id: self.local_peer_id,
332 replication_factor: self.replication_factor,
333 parallelism_factor: self.parallelism_factor,
334 query: query_id,
335 target,
336 };
337
338 self.queries.insert(
339 query_id,
340 QueryType::PutRecord {
341 record,
342 quorum,
343 context: FindNodeContext::new(config, candidates),
344 },
345 );
346
347 query_id
348 }
349
350 pub fn start_put_record_to_peers(
352 &mut self,
353 query_id: QueryId,
354 record: Record,
355 peers_to_report: Vec<KademliaPeer>,
356 quorum: Quorum,
357 ) -> QueryId {
358 tracing::debug!(
359 target: LOG_TARGET,
360 ?query_id,
361 target = ?record.key,
362 num_peers = ?peers_to_report.len(),
363 "start `PUT_VALUE` query to peers"
364 );
365
366 self.queries.insert(
367 query_id,
368 QueryType::PutRecordToPeers {
369 record,
370 quorum,
371 context: FindManyNodesContext::new(query_id, peers_to_report),
372 },
373 );
374
375 query_id
376 }
377
378 pub fn start_get_record(
380 &mut self,
381 query_id: QueryId,
382 target: RecordKey,
383 candidates: VecDeque<KademliaPeer>,
384 quorum: Quorum,
385 local_record: bool,
386 ) -> QueryId {
387 tracing::debug!(
388 target: LOG_TARGET,
389 ?query_id,
390 ?target,
391 num_peers = ?candidates.len(),
392 "start `GET_VALUE` query"
393 );
394
395 let target = Key::new(target);
396 let config = GetRecordConfig {
397 local_peer_id: self.local_peer_id,
398 known_records: if local_record { 1 } else { 0 },
399 quorum,
400 replication_factor: self.replication_factor,
401 parallelism_factor: self.parallelism_factor,
402 query: query_id,
403 target,
404 };
405
406 self.queries.insert(
407 query_id,
408 QueryType::GetRecord {
409 context: GetRecordContext::new(config, candidates, local_record),
410 },
411 );
412
413 query_id
414 }
415
416 pub fn start_add_provider(
418 &mut self,
419 query_id: QueryId,
420 provided_key: RecordKey,
421 provider: ContentProvider,
422 candidates: VecDeque<KademliaPeer>,
423 quorum: Quorum,
424 ) -> QueryId {
425 tracing::debug!(
426 target: LOG_TARGET,
427 ?query_id,
428 ?provider,
429 num_peers = ?candidates.len(),
430 "start `ADD_PROVIDER` query",
431 );
432
433 let config = FindNodeConfig {
434 local_peer_id: self.local_peer_id,
435 replication_factor: self.replication_factor,
436 parallelism_factor: self.parallelism_factor,
437 query: query_id,
438 target: Key::new(provided_key.clone()),
439 };
440
441 self.queries.insert(
442 query_id,
443 QueryType::AddProvider {
444 provided_key,
445 provider,
446 quorum,
447 context: FindNodeContext::new(config, candidates),
448 },
449 );
450
451 query_id
452 }
453
454 pub fn start_get_providers(
456 &mut self,
457 query_id: QueryId,
458 key: RecordKey,
459 candidates: VecDeque<KademliaPeer>,
460 known_providers: Vec<ContentProvider>,
461 ) -> QueryId {
462 tracing::debug!(
463 target: LOG_TARGET,
464 ?query_id,
465 ?key,
466 num_peers = ?candidates.len(),
467 "start `GET_PROVIDERS` query",
468 );
469
470 let target = Key::new(key);
471 let config = GetProvidersConfig {
472 local_peer_id: self.local_peer_id,
473 parallelism_factor: self.parallelism_factor,
474 query: query_id,
475 target,
476 known_providers: known_providers.into_iter().map(Into::into).collect(),
477 };
478
479 self.queries.insert(
480 query_id,
481 QueryType::GetProviders {
482 context: GetProvidersContext::new(config, candidates),
483 },
484 );
485
486 query_id
487 }
488
489 pub fn start_put_record_to_found_nodes_requests_tracking(
491 &mut self,
492 query_id: QueryId,
493 key: RecordKey,
494 peers: Vec<PeerId>,
495 quorum: Quorum,
496 ) {
497 tracing::debug!(
498 target: LOG_TARGET,
499 ?query_id,
500 num_peers = ?peers.len(),
501 "start `PUT_VALUE` responses tracking"
502 );
503
504 self.queries.insert(
505 query_id,
506 QueryType::PutRecordToFoundNodes {
507 context: PutToTargetPeersContext::new(query_id, key, peers, quorum),
508 },
509 );
510 }
511
512 pub fn start_add_provider_to_found_nodes_requests_tracking(
514 &mut self,
515 query_id: QueryId,
516 provided_key: RecordKey,
517 peers: Vec<PeerId>,
518 quorum: Quorum,
519 ) {
520 tracing::debug!(
521 target: LOG_TARGET,
522 ?query_id,
523 num_peers = ?peers.len(),
524 "start `ADD_PROVIDER` progress tracking"
525 );
526
527 self.queries.insert(
528 query_id,
529 QueryType::AddProviderToFoundNodes {
530 context: PutToTargetPeersContext::new(query_id, provided_key, peers, quorum),
531 },
532 );
533 }
534
535 pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) {
537 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure");
538
539 match self.queries.get_mut(&query) {
540 None => {
541 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
542 }
543 Some(QueryType::FindNode { context }) => {
544 context.register_response_failure(peer);
545 }
546 Some(QueryType::PutRecord { context, .. }) => {
547 context.register_response_failure(peer);
548 }
549 Some(QueryType::PutRecordToPeers { context, .. }) => {
550 context.register_response_failure(peer);
551 }
552 Some(QueryType::PutRecordToFoundNodes { context }) => {
553 context.register_response_failure(peer);
554 }
555 Some(QueryType::GetRecord { context }) => {
556 context.register_response_failure(peer);
557 }
558 Some(QueryType::AddProvider { context, .. }) => {
559 context.register_response_failure(peer);
560 }
561 Some(QueryType::AddProviderToFoundNodes { context }) => {
562 context.register_response_failure(peer);
563 }
564 Some(QueryType::GetProviders { context }) => {
565 context.register_response_failure(peer);
566 }
567 }
568 }
569
570 pub fn register_response(&mut self, query: QueryId, peer: PeerId, message: KademliaMessage) {
572 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response");
573
574 match self.queries.get_mut(&query) {
575 None => {
576 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response for a stale query");
577 }
578 Some(QueryType::FindNode { context }) => match message {
579 KademliaMessage::FindNode { peers, .. } => {
580 context.register_response(peer, peers);
581 }
582 message => {
583 tracing::debug!(
584 target: LOG_TARGET,
585 ?query,
586 ?peer,
587 "unexpected response to `FIND_NODE`: {message}",
588 );
589 context.register_response_failure(peer);
590 }
591 },
592 Some(QueryType::PutRecord { context, .. }) => match message {
593 KademliaMessage::FindNode { peers, .. } => {
594 context.register_response(peer, peers);
595 }
596 message => {
597 tracing::debug!(
598 target: LOG_TARGET,
599 ?query,
600 ?peer,
601 "unexpected response to `FIND_NODE` during `PUT_VALUE` query: {message}",
602 );
603 context.register_response_failure(peer);
604 }
605 },
606 Some(QueryType::PutRecordToPeers { context, .. }) => match message {
607 KademliaMessage::FindNode { peers, .. } => {
608 context.register_response(peer, peers);
609 }
610 message => {
611 tracing::debug!(
612 target: LOG_TARGET,
613 ?query,
614 ?peer,
615 "unexpected response to `FIND_NODE` during `PUT_VALUE` (to peers): {message}",
616 );
617 context.register_response_failure(peer);
618 }
619 },
620 Some(QueryType::PutRecordToFoundNodes { context }) => match message {
621 KademliaMessage::PutValue { .. } => {
622 context.register_response(peer);
623 }
624 message => {
625 tracing::debug!(
626 target: LOG_TARGET,
627 ?query,
628 ?peer,
629 "unexpected response to `PUT_VALUE`: {message}",
630 );
631 context.register_response_failure(peer);
632 }
633 },
634 Some(QueryType::GetRecord { context }) => match message {
635 KademliaMessage::GetRecord { record, peers, .. } =>
636 context.register_response(peer, record, peers),
637 message => {
638 tracing::debug!(
639 target: LOG_TARGET,
640 ?query,
641 ?peer,
642 "unexpected response to `GET_VALUE`: {message}",
643 );
644 context.register_response_failure(peer);
645 }
646 },
647 Some(QueryType::AddProvider { context, .. }) => match message {
648 KademliaMessage::FindNode { peers, .. } => {
649 context.register_response(peer, peers);
650 }
651 message => {
652 tracing::debug!(
653 target: LOG_TARGET,
654 ?query,
655 ?peer,
656 "unexpected response to `FIND_NODE` during `ADD_PROVIDER` query: {message}",
657 );
658 context.register_response_failure(peer);
659 }
660 },
661 Some(QueryType::AddProviderToFoundNodes { context, .. }) => match message {
662 KademliaMessage::AddProvider { .. } => {
663 context.register_response(peer);
664 }
665 message => {
666 tracing::debug!(
667 target: LOG_TARGET,
668 ?query,
669 ?peer,
670 "unexpected response to `ADD_PROVIDER`: {message}",
671 );
672 context.register_response_failure(peer);
673 }
674 },
675 Some(QueryType::GetProviders { context }) => match message {
676 KademliaMessage::GetProviders {
677 key: _,
678 providers,
679 peers,
680 } => {
681 context.register_response(peer, providers, peers);
682 }
683 message => {
684 tracing::debug!(
685 target: LOG_TARGET,
686 ?query,
687 ?peer,
688 "unexpected response to `GET_PROVIDERS`: {message}",
689 );
690 context.register_response_failure(peer);
691 }
692 },
693 }
694 }
695
696 pub fn register_send_failure(&mut self, query: QueryId, peer: PeerId) {
697 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send failure");
698
699 match self.queries.get_mut(&query) {
700 None => {
701 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send failure for a stale query");
702 }
703 Some(QueryType::FindNode { context }) => {
704 context.register_send_failure(peer);
705 }
706 Some(QueryType::PutRecord { context, .. }) => {
707 context.register_send_failure(peer);
708 }
709 Some(QueryType::PutRecordToPeers { context, .. }) => {
710 context.register_send_failure(peer);
711 }
712 Some(QueryType::PutRecordToFoundNodes { context }) => {
713 context.register_send_failure(peer);
714 }
715 Some(QueryType::GetRecord { context }) => {
716 context.register_send_failure(peer);
717 }
718 Some(QueryType::AddProvider { context, .. }) => {
719 context.register_send_failure(peer);
720 }
721 Some(QueryType::AddProviderToFoundNodes { context }) => {
722 context.register_send_failure(peer);
723 }
724 Some(QueryType::GetProviders { context }) => {
725 context.register_send_failure(peer);
726 }
727 }
728 }
729
730 pub fn register_send_success(&mut self, query: QueryId, peer: PeerId) {
731 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send success");
732
733 match self.queries.get_mut(&query) {
734 None => {
735 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send success for a stale query");
736 }
737 Some(QueryType::FindNode { context }) => {
738 context.register_send_success(peer);
739 }
740 Some(QueryType::PutRecord { context, .. }) => {
741 context.register_send_success(peer);
742 }
743 Some(QueryType::PutRecordToPeers { context, .. }) => {
744 context.register_send_success(peer);
745 }
746 Some(QueryType::PutRecordToFoundNodes { context, .. }) => {
747 context.register_send_success(peer);
748 }
749 Some(QueryType::GetRecord { context }) => {
750 context.register_send_success(peer);
751 }
752 Some(QueryType::AddProvider { context, .. }) => {
753 context.register_send_success(peer);
754 }
755 Some(QueryType::AddProviderToFoundNodes { context, .. }) => {
756 context.register_send_success(peer);
757 }
758 Some(QueryType::GetProviders { context }) => {
759 context.register_send_success(peer);
760 }
761 }
762 }
763
764 pub fn register_peer_failure(&mut self, query: QueryId, peer: PeerId) {
767 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register peer failure");
768
769 self.register_send_failure(query, peer);
774 self.register_response_failure(query, peer);
775 }
776
777 pub fn next_peer_action(&mut self, query: &QueryId, peer: &PeerId) -> Option<QueryAction> {
779 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "get next peer action");
780
781 match self.queries.get_mut(query) {
782 None => {
783 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
784 None
785 }
786 Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
787 Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
788 Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
789 Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
790 Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer),
791 Some(QueryType::GetProviders { context }) => context.next_peer_action(peer),
792 Some(QueryType::PutRecordToFoundNodes { .. }) => {
793 None
795 }
796 Some(QueryType::AddProviderToFoundNodes { .. }) => {
797 None
799 }
800 }
801 }
802
803 fn on_query_succeeded(&mut self, query: QueryId) -> QueryAction {
806 match self.queries.remove(&query).expect("query to exist") {
807 QueryType::FindNode { context } => QueryAction::FindNodeQuerySucceeded {
808 query,
809 target: context.config.target.into_preimage(),
810 peers: context.responses.into_values().collect::<Vec<_>>(),
811 },
812 QueryType::PutRecord {
813 record,
814 quorum,
815 context,
816 } => QueryAction::PutRecordToFoundNodes {
817 query: context.config.query,
818 record,
819 peers: context.responses.into_values().collect::<Vec<_>>(),
820 quorum,
821 },
822 QueryType::PutRecordToPeers {
823 record,
824 quorum,
825 context,
826 } => QueryAction::PutRecordToFoundNodes {
827 query: context.query,
828 record,
829 peers: context.peers_to_report,
830 quorum,
831 },
832 QueryType::PutRecordToFoundNodes { context } => QueryAction::PutRecordQuerySucceeded {
833 query: context.query,
834 key: context.key,
835 },
836 QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
837 query_id: context.config.query,
838 },
839 QueryType::AddProvider {
840 provided_key,
841 provider,
842 quorum,
843 context,
844 } => QueryAction::AddProviderToFoundNodes {
845 query: context.config.query,
846 provided_key,
847 provider,
848 peers: context.responses.into_values().collect::<Vec<_>>(),
849 quorum,
850 },
851 QueryType::AddProviderToFoundNodes { context } =>
852 QueryAction::AddProviderQuerySucceeded {
853 query: context.query,
854 provided_key: context.key,
855 },
856 QueryType::GetProviders { context } => QueryAction::GetProvidersQueryDone {
857 query_id: context.config.query,
858 provided_key: context.config.target.clone().into_preimage(),
859 providers: context.found_providers(),
860 },
861 }
862 }
863
864 fn on_query_failed(&mut self, query: QueryId) -> QueryAction {
867 let _ = self.queries.remove(&query).expect("query to exist");
868
869 QueryAction::QueryFailed { query }
870 }
871
872 pub fn next_action(&mut self) -> Option<QueryAction> {
874 for (_, state) in self.queries.iter_mut() {
875 let action = match state {
876 QueryType::FindNode { context } => context.next_action(),
877 QueryType::PutRecord { context, .. } => context.next_action(),
878 QueryType::PutRecordToPeers { context, .. } => context.next_action(),
879 QueryType::GetRecord { context } => context.next_action(),
880 QueryType::AddProvider { context, .. } => context.next_action(),
881 QueryType::GetProviders { context } => context.next_action(),
882 QueryType::PutRecordToFoundNodes { context, .. } => context.next_action(),
883 QueryType::AddProviderToFoundNodes { context, .. } => context.next_action(),
884 };
885
886 match action {
887 Some(QueryAction::QuerySucceeded { query }) => {
888 return Some(self.on_query_succeeded(query));
889 }
890 Some(QueryAction::QueryFailed { query }) =>
891 return Some(self.on_query_failed(query)),
892 Some(_) => return action,
893 _ => continue,
894 }
895 }
896
897 None
898 }
899}
900
901#[cfg(test)]
902mod tests {
903 use multihash::{Code, Multihash};
904
905 use super::*;
906 use crate::protocol::libp2p::kademlia::types::ConnectionType;
907
908 fn make_peer_id(first: u8, second: u8) -> PeerId {
910 let mut peer_id = vec![0u8; 32];
911 peer_id[0] = first;
912 peer_id[1] = second;
913
914 PeerId::from_bytes(
915 &Multihash::wrap(Code::Identity.into(), &peer_id)
916 .expect("The digest size is never too large")
917 .to_bytes(),
918 )
919 .unwrap()
920 }
921
922 #[test]
923 fn find_node_query_fails() {
924 let _ = tracing_subscriber::fmt()
925 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
926 .try_init();
927
928 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
929 let target_peer = PeerId::random();
930 let _target_key = Key::from(target_peer);
931
932 let query = engine.start_find_node(
933 QueryId(1337),
934 target_peer,
935 vec![
936 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
937 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
938 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
939 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
940 ]
941 .into(),
942 );
943
944 for _ in 0..4 {
945 if let Some(QueryAction::SendMessage { query, peer, .. }) = engine.next_action() {
946 engine.register_response_failure(query, peer);
947 }
948 }
949
950 if let Some(QueryAction::QueryFailed { query: failed }) = engine.next_action() {
951 assert_eq!(failed, query);
952 }
953
954 assert!(engine.next_action().is_none());
955 }
956
957 #[test]
958 fn find_node_lookup_paused() {
959 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
960 let target_peer = PeerId::random();
961 let _target_key = Key::from(target_peer);
962
963 let _ = engine.start_find_node(
964 QueryId(1338),
965 target_peer,
966 vec![
967 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
968 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
969 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
970 KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
971 ]
972 .into(),
973 );
974
975 for _ in 0..3 {
976 let _ = engine.next_action();
977 }
978
979 assert!(engine.next_action().is_none());
980 }
981
982 #[test]
983 fn find_node_query_succeeds() {
984 let _ = tracing_subscriber::fmt()
985 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
986 .try_init();
987
988 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
989 let target_peer = make_peer_id(0, 0);
990 let target_key = Key::from(target_peer);
991
992 let distances = {
993 let mut distances = std::collections::BTreeMap::new();
994
995 for i in 1..64 {
996 let peer = make_peer_id(i, 0);
997 let key = Key::from(peer);
998
999 distances.insert(target_key.distance(&key), peer);
1000 }
1001
1002 distances
1003 };
1004 let mut iter = distances.iter();
1005
1006 let _query = engine.start_find_node(
1008 QueryId(1339),
1009 target_peer,
1010 vec![KademliaPeer::new(
1011 *iter.next().unwrap().1,
1012 vec![],
1013 ConnectionType::NotConnected,
1014 )]
1015 .into(),
1016 );
1017
1018 let action = engine.next_action();
1019 assert!(engine.next_action().is_none());
1020
1021 match action {
1023 Some(QueryAction::SendMessage { query, peer, .. }) => {
1024 engine.register_response(
1025 query,
1026 peer,
1027 KademliaMessage::FindNode {
1028 target: Vec::new(),
1029 peers: vec![
1030 KademliaPeer::new(
1031 *iter.next().unwrap().1,
1032 vec![],
1033 ConnectionType::NotConnected,
1034 ),
1035 KademliaPeer::new(
1036 *iter.next().unwrap().1,
1037 vec![],
1038 ConnectionType::NotConnected,
1039 ),
1040 KademliaPeer::new(
1041 *iter.next().unwrap().1,
1042 vec![],
1043 ConnectionType::NotConnected,
1044 ),
1045 ],
1046 },
1047 );
1048 }
1049 _ => panic!("invalid event received"),
1050 }
1051
1052 for _ in 0..3 {
1054 match engine.next_action() {
1055 Some(QueryAction::SendMessage { query, peer, .. }) => {
1056 println!("next send message to {peer:?}");
1057 engine.register_response(
1058 query,
1059 peer,
1060 KademliaMessage::FindNode {
1061 target: Vec::new(),
1062 peers: vec![],
1063 },
1064 );
1065 }
1066 _ => panic!("invalid event received"),
1067 }
1068 }
1069
1070 match engine.next_action() {
1071 Some(QueryAction::FindNodeQuerySucceeded { peers, .. }) => {
1072 assert_eq!(peers.len(), 4);
1073 }
1074 _ => panic!("invalid event received"),
1075 }
1076
1077 assert!(engine.next_action().is_none());
1078 }
1079
1080 #[test]
1081 fn put_record_fails() {
1082 let _ = tracing_subscriber::fmt()
1083 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1084 .try_init();
1085
1086 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1087 let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1088 let target_key = Key::new(record_key.clone());
1089 let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1090
1091 let distances = {
1092 let mut distances = std::collections::BTreeMap::new();
1093
1094 for i in 1..64 {
1095 let peer = make_peer_id(i, 0);
1096 let key = Key::from(peer);
1097
1098 distances.insert(target_key.distance(&key), peer);
1099 }
1100
1101 distances
1102 };
1103 let mut iter = distances.iter();
1104
1105 let original_query_id = QueryId(1340);
1107 let _query = engine.start_put_record(
1108 original_query_id,
1109 original_record.clone(),
1110 vec![KademliaPeer::new(
1111 *iter.next().unwrap().1,
1112 vec![],
1113 ConnectionType::NotConnected,
1114 )]
1115 .into(),
1116 Quorum::All,
1117 );
1118
1119 let action = engine.next_action();
1120 assert!(engine.next_action().is_none());
1121
1122 match action {
1124 Some(QueryAction::SendMessage { query, peer, .. }) => {
1125 engine.register_response(
1126 query,
1127 peer,
1128 KademliaMessage::FindNode {
1129 target: Vec::new(),
1130 peers: vec![
1131 KademliaPeer::new(
1132 *iter.next().unwrap().1,
1133 vec![],
1134 ConnectionType::NotConnected,
1135 ),
1136 KademliaPeer::new(
1137 *iter.next().unwrap().1,
1138 vec![],
1139 ConnectionType::NotConnected,
1140 ),
1141 KademliaPeer::new(
1142 *iter.next().unwrap().1,
1143 vec![],
1144 ConnectionType::NotConnected,
1145 ),
1146 ],
1147 },
1148 );
1149 }
1150 _ => panic!("invalid event received"),
1151 }
1152
1153 for _ in 0..3 {
1155 match engine.next_action() {
1156 Some(QueryAction::SendMessage { query, peer, .. }) => {
1157 println!("next send message to {peer:?}");
1158 engine.register_response(
1159 query,
1160 peer,
1161 KademliaMessage::FindNode {
1162 target: Vec::new(),
1163 peers: vec![],
1164 },
1165 );
1166 }
1167 _ => panic!("invalid event received"),
1168 }
1169 }
1170
1171 let mut peers = match engine.next_action() {
1172 Some(QueryAction::PutRecordToFoundNodes {
1173 query,
1174 peers,
1175 record,
1176 quorum,
1177 }) => {
1178 assert_eq!(query, original_query_id);
1179 assert_eq!(peers.len(), 4);
1180 assert_eq!(record.key, original_record.key);
1181 assert_eq!(record.value, original_record.value);
1182 assert!(matches!(quorum, Quorum::All));
1183
1184 peers
1185 }
1186 _ => panic!("invalid event received"),
1187 };
1188
1189 engine.start_put_record_to_found_nodes_requests_tracking(
1190 original_query_id,
1191 record_key.clone(),
1192 peers.iter().map(|p| p.peer).collect(),
1193 Quorum::All,
1194 );
1195
1196 let last_peer = peers.pop().unwrap();
1198 for peer in peers {
1199 engine.register_send_success(original_query_id, peer.peer);
1200 }
1201 engine.register_send_failure(original_query_id, last_peer.peer);
1202
1203 match engine.next_action() {
1204 Some(QueryAction::QueryFailed { query }) => {
1205 assert_eq!(query, original_query_id);
1206 }
1207 _ => panic!("invalid event received"),
1208 }
1209
1210 assert!(engine.next_action().is_none());
1211 }
1212
1213 #[test]
1214 fn put_record_succeeds() {
1215 let _ = tracing_subscriber::fmt()
1216 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1217 .try_init();
1218
1219 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1220 let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1221 let target_key = Key::new(record_key.clone());
1222 let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1223
1224 let distances = {
1225 let mut distances = std::collections::BTreeMap::new();
1226
1227 for i in 1..64 {
1228 let peer = make_peer_id(i, 0);
1229 let key = Key::from(peer);
1230
1231 distances.insert(target_key.distance(&key), peer);
1232 }
1233
1234 distances
1235 };
1236 let mut iter = distances.iter();
1237
1238 let original_query_id = QueryId(1340);
1240 let _query = engine.start_put_record(
1241 original_query_id,
1242 original_record.clone(),
1243 vec![KademliaPeer::new(
1244 *iter.next().unwrap().1,
1245 vec![],
1246 ConnectionType::NotConnected,
1247 )]
1248 .into(),
1249 Quorum::All,
1250 );
1251
1252 let action = engine.next_action();
1253 assert!(engine.next_action().is_none());
1254
1255 match action {
1257 Some(QueryAction::SendMessage { query, peer, .. }) => {
1258 engine.register_response(
1259 query,
1260 peer,
1261 KademliaMessage::FindNode {
1262 target: Vec::new(),
1263 peers: vec![
1264 KademliaPeer::new(
1265 *iter.next().unwrap().1,
1266 vec![],
1267 ConnectionType::NotConnected,
1268 ),
1269 KademliaPeer::new(
1270 *iter.next().unwrap().1,
1271 vec![],
1272 ConnectionType::NotConnected,
1273 ),
1274 KademliaPeer::new(
1275 *iter.next().unwrap().1,
1276 vec![],
1277 ConnectionType::NotConnected,
1278 ),
1279 ],
1280 },
1281 );
1282 }
1283 _ => panic!("invalid event received"),
1284 }
1285
1286 for _ in 0..3 {
1288 match engine.next_action() {
1289 Some(QueryAction::SendMessage { query, peer, .. }) => {
1290 println!("next send message to {peer:?}");
1291 engine.register_response(
1292 query,
1293 peer,
1294 KademliaMessage::FindNode {
1295 target: Vec::new(),
1296 peers: vec![],
1297 },
1298 );
1299 }
1300 _ => panic!("invalid event received"),
1301 }
1302 }
1303
1304 let peers = match engine.next_action() {
1305 Some(QueryAction::PutRecordToFoundNodes {
1306 query,
1307 peers,
1308 record,
1309 quorum,
1310 }) => {
1311 assert_eq!(query, original_query_id);
1312 assert_eq!(peers.len(), 4);
1313 assert_eq!(record.key, original_record.key);
1314 assert_eq!(record.value, original_record.value);
1315 assert!(matches!(quorum, Quorum::All));
1316
1317 peers
1318 }
1319 _ => panic!("invalid event received"),
1320 };
1321
1322 engine.start_put_record_to_found_nodes_requests_tracking(
1323 original_query_id,
1324 record_key.clone(),
1325 peers.iter().map(|p| p.peer).collect(),
1326 Quorum::All,
1327 );
1328
1329 for peer in &peers {
1331 engine.register_send_success(original_query_id, peer.peer);
1332 }
1333
1334 match engine.next_action() {
1335 Some(QueryAction::PutRecordQuerySucceeded { query, key }) => {
1336 assert_eq!(query, original_query_id);
1337 assert_eq!(key, record_key);
1338 }
1339 _ => panic!("invalid event received"),
1340 }
1341
1342 assert!(engine.next_action().is_none());
1343
1344 let _query = engine.start_get_record(
1346 QueryId(1341),
1347 record_key.clone(),
1348 vec![
1349 KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1350 KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1351 KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1352 KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1353 ]
1354 .into(),
1355 Quorum::All,
1356 false,
1357 );
1358
1359 let mut records = Vec::new();
1360 for _ in 0..4 {
1361 match engine.next_action() {
1362 Some(QueryAction::SendMessage { query, peer, .. }) => {
1363 assert_eq!(query, QueryId(1341));
1364 engine.register_response(
1365 query,
1366 peer,
1367 KademliaMessage::GetRecord {
1368 record: Some(original_record.clone()),
1369 peers: vec![],
1370 key: Some(record_key.clone()),
1371 },
1372 );
1373 }
1374 event => panic!("invalid event received {:?}", event),
1375 }
1376
1377 match engine.next_action() {
1380 Some(QueryAction::GetRecordPartialResult { query_id, record }) => {
1381 println!("Partial result {:?}", record);
1382 assert_eq!(query_id, QueryId(1341));
1383 records.push(record);
1384 }
1385 event => panic!("invalid event received {:?}", event),
1386 }
1387 }
1388
1389 let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
1390 match engine.next_action() {
1391 Some(QueryAction::GetRecordQueryDone { .. }) => {
1392 println!("Records {:?}", records);
1393 let query_peers = records
1394 .iter()
1395 .map(|peer_record| peer_record.peer)
1396 .collect::<std::collections::HashSet<_>>();
1397 assert_eq!(peers, query_peers);
1398
1399 let records: std::collections::HashSet<_> =
1400 records.into_iter().map(|peer_record| peer_record.record).collect();
1401 assert_eq!(records.len(), 1);
1403 let record = records.into_iter().next().unwrap();
1404
1405 assert_eq!(record.key, original_record.key);
1406 assert_eq!(record.value, original_record.value);
1407 }
1408 event => panic!("invalid event received {:?}", event),
1409 }
1410 }
1411
1412 #[test]
1413 fn put_record_succeeds_with_quorum_one() {
1414 let _ = tracing_subscriber::fmt()
1415 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1416 .try_init();
1417
1418 let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
1419 let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
1420 let target_key = Key::new(record_key.clone());
1421 let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
1422
1423 let distances = {
1424 let mut distances = std::collections::BTreeMap::new();
1425
1426 for i in 1..64 {
1427 let peer = make_peer_id(i, 0);
1428 let key = Key::from(peer);
1429
1430 distances.insert(target_key.distance(&key), peer);
1431 }
1432
1433 distances
1434 };
1435 let mut iter = distances.iter();
1436
1437 let original_query_id = QueryId(1340);
1439 let _query = engine.start_put_record(
1440 original_query_id,
1441 original_record.clone(),
1442 vec![KademliaPeer::new(
1443 *iter.next().unwrap().1,
1444 vec![],
1445 ConnectionType::NotConnected,
1446 )]
1447 .into(),
1448 Quorum::One,
1449 );
1450
1451 let action = engine.next_action();
1452 assert!(engine.next_action().is_none());
1453
1454 match action {
1456 Some(QueryAction::SendMessage { query, peer, .. }) => {
1457 engine.register_response(
1458 query,
1459 peer,
1460 KademliaMessage::FindNode {
1461 target: Vec::new(),
1462 peers: vec![
1463 KademliaPeer::new(
1464 *iter.next().unwrap().1,
1465 vec![],
1466 ConnectionType::NotConnected,
1467 ),
1468 KademliaPeer::new(
1469 *iter.next().unwrap().1,
1470 vec![],
1471 ConnectionType::NotConnected,
1472 ),
1473 KademliaPeer::new(
1474 *iter.next().unwrap().1,
1475 vec![],
1476 ConnectionType::NotConnected,
1477 ),
1478 ],
1479 },
1480 );
1481 }
1482 _ => panic!("invalid event received"),
1483 }
1484
1485 for _ in 0..3 {
1487 match engine.next_action() {
1488 Some(QueryAction::SendMessage { query, peer, .. }) => {
1489 println!("next send message to {peer:?}");
1490 engine.register_response(
1491 query,
1492 peer,
1493 KademliaMessage::FindNode {
1494 target: Vec::new(),
1495 peers: vec![],
1496 },
1497 );
1498 }
1499 _ => panic!("invalid event received"),
1500 }
1501 }
1502
1503 let peers = match engine.next_action() {
1504 Some(QueryAction::PutRecordToFoundNodes {
1505 query,
1506 peers,
1507 record,
1508 quorum,
1509 }) => {
1510 assert_eq!(query, original_query_id);
1511 assert_eq!(peers.len(), 4);
1512 assert_eq!(record.key, original_record.key);
1513 assert_eq!(record.value, original_record.value);
1514 assert!(matches!(quorum, Quorum::One));
1515
1516 peers
1517 }
1518 _ => panic!("invalid event received"),
1519 };
1520
1521 engine.start_put_record_to_found_nodes_requests_tracking(
1522 original_query_id,
1523 record_key.clone(),
1524 peers.iter().map(|p| p.peer).collect(),
1525 Quorum::One,
1526 );
1527
1528 assert!(peers.len() > 1);
1530 for peer in peers.iter().take(peers.len() - 1) {
1531 engine.register_send_failure(original_query_id, peer.peer);
1532 }
1533 engine.register_send_success(original_query_id, peers.last().unwrap().peer);
1534
1535 match engine.next_action() {
1536 Some(QueryAction::PutRecordQuerySucceeded { query, key }) => {
1537 assert_eq!(query, original_query_id);
1538 assert_eq!(key, record_key);
1539 }
1540 _ => panic!("invalid event received"),
1541 }
1542
1543 assert!(engine.next_action().is_none());
1544
1545 let _query = engine.start_get_record(
1547 QueryId(1341),
1548 record_key.clone(),
1549 vec![
1550 KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1551 KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1552 KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1553 KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1554 ]
1555 .into(),
1556 Quorum::All,
1557 false,
1558 );
1559
1560 let mut records = Vec::new();
1561 for _ in 0..4 {
1562 match engine.next_action() {
1563 Some(QueryAction::SendMessage { query, peer, .. }) => {
1564 assert_eq!(query, QueryId(1341));
1565 engine.register_response(
1566 query,
1567 peer,
1568 KademliaMessage::GetRecord {
1569 record: Some(original_record.clone()),
1570 peers: vec![],
1571 key: Some(record_key.clone()),
1572 },
1573 );
1574 }
1575 event => panic!("invalid event received {:?}", event),
1576 }
1577
1578 match engine.next_action() {
1581 Some(QueryAction::GetRecordPartialResult { query_id, record }) => {
1582 println!("Partial result {:?}", record);
1583 assert_eq!(query_id, QueryId(1341));
1584 records.push(record);
1585 }
1586 event => panic!("invalid event received {:?}", event),
1587 }
1588 }
1589
1590 let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
1591 match engine.next_action() {
1592 Some(QueryAction::GetRecordQueryDone { .. }) => {
1593 println!("Records {:?}", records);
1594 let query_peers = records
1595 .iter()
1596 .map(|peer_record| peer_record.peer)
1597 .collect::<std::collections::HashSet<_>>();
1598 assert_eq!(peers, query_peers);
1599
1600 let records: std::collections::HashSet<_> =
1601 records.into_iter().map(|peer_record| peer_record.record).collect();
1602 assert_eq!(records.len(), 1);
1604 let record = records.into_iter().next().unwrap();
1605
1606 assert_eq!(record.key, original_record.key);
1607 assert_eq!(record.value, original_record.value);
1608 }
1609 event => panic!("invalid event received {:?}", event),
1610 }
1611 }
1612
1613 #[test]
1614 fn add_provider_fails() {
1615 let _ = tracing_subscriber::fmt()
1616 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1617 .try_init();
1618
1619 let local_peer_id = PeerId::random();
1620 let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1621 let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1622 let local_content_provider = ContentProvider {
1623 peer: local_peer_id,
1624 addresses: vec![],
1625 };
1626 let target_key = Key::new(original_provided_key.clone());
1627
1628 let distances = {
1629 let mut distances = std::collections::BTreeMap::new();
1630
1631 for i in 1..64 {
1632 let peer = make_peer_id(i, 0);
1633 let key = Key::from(peer);
1634
1635 distances.insert(target_key.distance(&key), peer);
1636 }
1637
1638 distances
1639 };
1640 let mut iter = distances.iter();
1641
1642 let original_query_id = QueryId(1340);
1644 let _query = engine.start_add_provider(
1645 original_query_id,
1646 original_provided_key.clone(),
1647 local_content_provider.clone(),
1648 vec![KademliaPeer::new(
1649 *iter.next().unwrap().1,
1650 vec![],
1651 ConnectionType::NotConnected,
1652 )]
1653 .into(),
1654 Quorum::All,
1655 );
1656
1657 let action = engine.next_action();
1658 assert!(engine.next_action().is_none());
1659
1660 match action {
1662 Some(QueryAction::SendMessage { query, peer, .. }) => {
1663 engine.register_response(
1664 query,
1665 peer,
1666 KademliaMessage::FindNode {
1667 target: Vec::new(),
1668 peers: vec![
1669 KademliaPeer::new(
1670 *iter.next().unwrap().1,
1671 vec![],
1672 ConnectionType::NotConnected,
1673 ),
1674 KademliaPeer::new(
1675 *iter.next().unwrap().1,
1676 vec![],
1677 ConnectionType::NotConnected,
1678 ),
1679 KademliaPeer::new(
1680 *iter.next().unwrap().1,
1681 vec![],
1682 ConnectionType::NotConnected,
1683 ),
1684 ],
1685 },
1686 );
1687 }
1688 _ => panic!("invalid event received"),
1689 }
1690
1691 for _ in 0..3 {
1693 match engine.next_action() {
1694 Some(QueryAction::SendMessage { query, peer, .. }) => {
1695 println!("next send message to {peer:?}");
1696 engine.register_response(
1697 query,
1698 peer,
1699 KademliaMessage::FindNode {
1700 target: Vec::new(),
1701 peers: vec![],
1702 },
1703 );
1704 }
1705 _ => panic!("invalid event received"),
1706 }
1707 }
1708
1709 let mut peers = match engine.next_action() {
1710 Some(QueryAction::AddProviderToFoundNodes {
1711 query,
1712 provided_key,
1713 provider,
1714 peers,
1715 quorum,
1716 }) => {
1717 assert_eq!(query, original_query_id);
1718 assert_eq!(provided_key, original_provided_key);
1719 assert_eq!(provider, local_content_provider);
1720 assert_eq!(peers.len(), 4);
1721 assert!(matches!(quorum, Quorum::All));
1722
1723 peers
1724 }
1725 _ => panic!("invalid event received"),
1726 };
1727
1728 engine.start_add_provider_to_found_nodes_requests_tracking(
1729 original_query_id,
1730 original_provided_key.clone(),
1731 peers.iter().map(|p| p.peer).collect(),
1732 Quorum::All,
1733 );
1734
1735 let last_peer = peers.pop().unwrap();
1737 for peer in peers {
1738 engine.register_send_success(original_query_id, peer.peer);
1739 }
1740 engine.register_send_failure(original_query_id, last_peer.peer);
1741
1742 match engine.next_action() {
1743 Some(QueryAction::QueryFailed { query }) => {
1744 assert_eq!(query, original_query_id);
1745 }
1746 _ => panic!("invalid event received"),
1747 }
1748
1749 assert!(engine.next_action().is_none());
1750 }
1751
1752 #[test]
1753 fn add_provider_succeeds() {
1754 let _ = tracing_subscriber::fmt()
1755 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1756 .try_init();
1757
1758 let local_peer_id = PeerId::random();
1759 let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1760 let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1761 let local_content_provider = ContentProvider {
1762 peer: local_peer_id,
1763 addresses: vec![],
1764 };
1765
1766 let target_key = Key::new(original_provided_key.clone());
1767 let distances = {
1768 let mut distances = std::collections::BTreeMap::new();
1769
1770 for i in 1..64 {
1771 let peer = make_peer_id(i, 0);
1772 let key = Key::from(peer);
1773
1774 distances.insert(target_key.distance(&key), peer);
1775 }
1776
1777 distances
1778 };
1779 let mut iter = distances.iter();
1780
1781 let add_query_id = QueryId(1340);
1783 let _query = engine.start_add_provider(
1784 add_query_id,
1785 original_provided_key.clone(),
1786 local_content_provider.clone(),
1787 vec![KademliaPeer::new(
1788 *iter.next().unwrap().1,
1789 vec![],
1790 ConnectionType::NotConnected,
1791 )]
1792 .into(),
1793 Quorum::All,
1794 );
1795
1796 let action = engine.next_action();
1797 assert!(engine.next_action().is_none());
1798
1799 match action {
1801 Some(QueryAction::SendMessage { query, peer, .. }) => {
1802 engine.register_response(
1803 query,
1804 peer,
1805 KademliaMessage::FindNode {
1806 target: Vec::new(),
1807 peers: vec![
1808 KademliaPeer::new(
1809 *iter.next().unwrap().1,
1810 vec![],
1811 ConnectionType::NotConnected,
1812 ),
1813 KademliaPeer::new(
1814 *iter.next().unwrap().1,
1815 vec![],
1816 ConnectionType::NotConnected,
1817 ),
1818 KademliaPeer::new(
1819 *iter.next().unwrap().1,
1820 vec![],
1821 ConnectionType::NotConnected,
1822 ),
1823 ],
1824 },
1825 );
1826 }
1827 _ => panic!("invalid event received"),
1828 }
1829
1830 for _ in 0..3 {
1832 match engine.next_action() {
1833 Some(QueryAction::SendMessage { query, peer, .. }) => {
1834 println!("next send message to {peer:?}");
1835 engine.register_response(
1836 query,
1837 peer,
1838 KademliaMessage::FindNode {
1839 target: Vec::new(),
1840 peers: vec![],
1841 },
1842 );
1843 }
1844 _ => panic!("invalid event received"),
1845 }
1846 }
1847
1848 let peers = match engine.next_action() {
1849 Some(QueryAction::AddProviderToFoundNodes {
1850 query,
1851 provided_key,
1852 provider,
1853 peers,
1854 quorum,
1855 }) => {
1856 assert_eq!(query, add_query_id);
1857 assert_eq!(provided_key, original_provided_key);
1858 assert_eq!(provider, local_content_provider);
1859 assert_eq!(peers.len(), 4);
1860 assert!(matches!(quorum, Quorum::All));
1861
1862 peers
1863 }
1864 _ => panic!("invalid event received"),
1865 };
1866
1867 engine.start_add_provider_to_found_nodes_requests_tracking(
1868 add_query_id,
1869 original_provided_key.clone(),
1870 peers.iter().map(|p| p.peer).collect(),
1871 Quorum::All,
1872 );
1873
1874 for peer in &peers {
1876 engine.register_send_success(add_query_id, peer.peer);
1877 }
1878
1879 match engine.next_action() {
1880 Some(QueryAction::AddProviderQuerySucceeded {
1881 query,
1882 provided_key,
1883 }) => {
1884 assert_eq!(query, add_query_id);
1885 assert_eq!(provided_key, original_provided_key);
1886 }
1887 _ => panic!("invalid event received"),
1888 }
1889
1890 assert!(engine.next_action().is_none());
1891
1892 let get_query_id = QueryId(1341);
1894 let _query = engine.start_get_providers(
1895 get_query_id,
1896 original_provided_key.clone(),
1897 vec![
1898 KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
1899 KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
1900 KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
1901 KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
1902 ]
1903 .into(),
1904 vec![],
1905 );
1906
1907 for _ in 0..4 {
1908 match engine.next_action() {
1909 Some(QueryAction::SendMessage { query, peer, .. }) => {
1910 assert_eq!(query, get_query_id);
1911 engine.register_response(
1912 query,
1913 peer,
1914 KademliaMessage::GetProviders {
1915 key: Some(original_provided_key.clone()),
1916 peers: vec![],
1917 providers: vec![local_content_provider.clone().into()],
1918 },
1919 );
1920 }
1921 event => panic!("invalid event received {:?}", event),
1922 }
1923 }
1924
1925 match engine.next_action() {
1926 Some(QueryAction::GetProvidersQueryDone {
1927 query_id,
1928 provided_key,
1929 providers,
1930 }) => {
1931 assert_eq!(query_id, get_query_id);
1932 assert_eq!(provided_key, original_provided_key);
1933 assert_eq!(providers, vec![local_content_provider]);
1934 }
1935 event => panic!("invalid event received {:?}", event),
1936 }
1937 }
1938
1939 #[test]
1940 fn add_provider_succeeds_with_quorum_one() {
1941 let _ = tracing_subscriber::fmt()
1942 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1943 .try_init();
1944
1945 let local_peer_id = PeerId::random();
1946 let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize);
1947 let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]);
1948 let local_content_provider = ContentProvider {
1949 peer: local_peer_id,
1950 addresses: vec![],
1951 };
1952
1953 let target_key = Key::new(original_provided_key.clone());
1954 let distances = {
1955 let mut distances = std::collections::BTreeMap::new();
1956
1957 for i in 1..64 {
1958 let peer = make_peer_id(i, 0);
1959 let key = Key::from(peer);
1960
1961 distances.insert(target_key.distance(&key), peer);
1962 }
1963
1964 distances
1965 };
1966 let mut iter = distances.iter();
1967
1968 let add_query_id = QueryId(1340);
1970 let _query = engine.start_add_provider(
1971 add_query_id,
1972 original_provided_key.clone(),
1973 local_content_provider.clone(),
1974 vec![KademliaPeer::new(
1975 *iter.next().unwrap().1,
1976 vec![],
1977 ConnectionType::NotConnected,
1978 )]
1979 .into(),
1980 Quorum::One,
1981 );
1982
1983 let action = engine.next_action();
1984 assert!(engine.next_action().is_none());
1985
1986 match action {
1988 Some(QueryAction::SendMessage { query, peer, .. }) => {
1989 engine.register_response(
1990 query,
1991 peer,
1992 KademliaMessage::FindNode {
1993 target: Vec::new(),
1994 peers: vec![
1995 KademliaPeer::new(
1996 *iter.next().unwrap().1,
1997 vec![],
1998 ConnectionType::NotConnected,
1999 ),
2000 KademliaPeer::new(
2001 *iter.next().unwrap().1,
2002 vec![],
2003 ConnectionType::NotConnected,
2004 ),
2005 KademliaPeer::new(
2006 *iter.next().unwrap().1,
2007 vec![],
2008 ConnectionType::NotConnected,
2009 ),
2010 ],
2011 },
2012 );
2013 }
2014 _ => panic!("invalid event received"),
2015 }
2016
2017 for _ in 0..3 {
2019 match engine.next_action() {
2020 Some(QueryAction::SendMessage { query, peer, .. }) => {
2021 println!("next send message to {peer:?}");
2022 engine.register_response(
2023 query,
2024 peer,
2025 KademliaMessage::FindNode {
2026 target: Vec::new(),
2027 peers: vec![],
2028 },
2029 );
2030 }
2031 _ => panic!("invalid event received"),
2032 }
2033 }
2034
2035 let peers = match engine.next_action() {
2036 Some(QueryAction::AddProviderToFoundNodes {
2037 query,
2038 provided_key,
2039 provider,
2040 peers,
2041 quorum,
2042 }) => {
2043 assert_eq!(query, add_query_id);
2044 assert_eq!(provided_key, original_provided_key);
2045 assert_eq!(provider, local_content_provider);
2046 assert_eq!(peers.len(), 4);
2047 assert!(matches!(quorum, Quorum::One));
2048
2049 peers
2050 }
2051 _ => panic!("invalid event received"),
2052 };
2053
2054 engine.start_add_provider_to_found_nodes_requests_tracking(
2055 add_query_id,
2056 original_provided_key.clone(),
2057 peers.iter().map(|p| p.peer).collect(),
2058 Quorum::One,
2059 );
2060
2061 assert!(peers.len() > 1);
2063 engine.register_send_success(add_query_id, peers.first().unwrap().peer);
2064 for peer in peers.iter().skip(1) {
2065 engine.register_send_failure(add_query_id, peer.peer);
2066 }
2067
2068 match engine.next_action() {
2069 Some(QueryAction::AddProviderQuerySucceeded {
2070 query,
2071 provided_key,
2072 }) => {
2073 assert_eq!(query, add_query_id);
2074 assert_eq!(provided_key, original_provided_key);
2075 }
2076 _ => panic!("invalid event received"),
2077 }
2078
2079 assert!(engine.next_action().is_none());
2080
2081 let get_query_id = QueryId(1341);
2083 let _query = engine.start_get_providers(
2084 get_query_id,
2085 original_provided_key.clone(),
2086 vec![
2087 KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
2088 KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
2089 KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
2090 KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
2091 ]
2092 .into(),
2093 vec![],
2094 );
2095
2096 match engine.next_action() {
2098 Some(QueryAction::SendMessage { query, peer, .. }) => {
2099 assert_eq!(query, get_query_id);
2100 engine.register_response(
2101 query,
2102 peer,
2103 KademliaMessage::GetProviders {
2104 key: Some(original_provided_key.clone()),
2105 peers: vec![],
2106 providers: vec![local_content_provider.clone().into()],
2107 },
2108 );
2109 }
2110 event => panic!("invalid event received {:?}", event),
2111 }
2112
2113 for _ in 1..4 {
2115 match engine.next_action() {
2116 Some(QueryAction::SendMessage { query, peer, .. }) => {
2117 assert_eq!(query, get_query_id);
2118 engine.register_response(
2119 query,
2120 peer,
2121 KademliaMessage::GetProviders {
2122 key: Some(original_provided_key.clone()),
2123 peers: vec![],
2124 providers: vec![],
2125 },
2126 );
2127 }
2128 event => panic!("invalid event received {:?}", event),
2129 }
2130 }
2131
2132 match engine.next_action() {
2133 Some(QueryAction::GetProvidersQueryDone {
2134 query_id,
2135 provided_key,
2136 providers,
2137 }) => {
2138 assert_eq!(query_id, get_query_id);
2139 assert_eq!(provided_key, original_provided_key);
2140 assert_eq!(providers, vec![local_content_provider]);
2141 }
2142 event => panic!("invalid event received {:?}", event),
2143 }
2144 }
2145}