1use crate::{
22 protocol::libp2p::kademlia::{
23 message::KademliaMessage,
24 query::{
25 find_node::{FindNodeConfig, FindNodeContext},
26 get_record::{GetRecordConfig, GetRecordContext},
27 },
28 record::{Key as RecordKey, Record},
29 types::{KademliaPeer, Key},
30 PeerRecord, Quorum,
31 },
32 PeerId,
33};
34
35use bytes::Bytes;
36
37use std::collections::{HashMap, VecDeque};
38
39use self::find_many_nodes::FindManyNodesContext;
40
41mod find_many_nodes;
42mod find_node;
43mod get_record;
44
/// Logging target used by every `tracing` call in this file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
47
/// Identifier of a query.
///
/// A thin wrapper around `usize`; it is the key under which `QueryEngine` stores the
/// state of each active query.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct QueryId(pub usize);
53
54#[derive(Debug)]
56enum QueryType {
57 FindNode {
59 context: FindNodeContext<PeerId>,
61 },
62
63 PutRecord {
65 record: Record,
67
68 context: FindNodeContext<RecordKey>,
70 },
71
72 PutRecordToPeers {
74 record: Record,
76
77 context: FindManyNodesContext,
79 },
80
81 GetRecord {
83 context: GetRecordContext,
85 },
86}
87
88#[derive(Debug, PartialEq, Eq)]
90pub enum QueryAction {
91 SendMessage {
93 query: QueryId,
95
96 peer: PeerId,
98
99 message: Bytes,
101 },
102
103 FindNodeQuerySucceeded {
105 query: QueryId,
107
108 target: PeerId,
110
111 peers: Vec<KademliaPeer>,
113 },
114
115 PutRecordToFoundNodes {
118 record: Record,
120
121 peers: Vec<KademliaPeer>,
123 },
124
125 GetRecordQueryDone {
127 query_id: QueryId,
129
130 records: Vec<PeerRecord>,
132 },
133
134 QuerySucceeded {
137 query: QueryId,
139 },
140
141 QueryFailed {
143 query: QueryId,
145 },
146}
147
148pub struct QueryEngine {
150 local_peer_id: PeerId,
152
153 replication_factor: usize,
155
156 parallelism_factor: usize,
158
159 queries: HashMap<QueryId, QueryType>,
161}
162
163impl QueryEngine {
164 pub fn new(
166 local_peer_id: PeerId,
167 replication_factor: usize,
168 parallelism_factor: usize,
169 ) -> Self {
170 Self {
171 local_peer_id,
172 replication_factor,
173 parallelism_factor,
174 queries: HashMap::new(),
175 }
176 }
177
178 pub fn start_find_node(
180 &mut self,
181 query_id: QueryId,
182 target: PeerId,
183 candidates: VecDeque<KademliaPeer>,
184 ) -> QueryId {
185 tracing::debug!(
186 target: LOG_TARGET,
187 ?query_id,
188 ?target,
189 num_peers = ?candidates.len(),
190 "start `FIND_NODE` query"
191 );
192
193 let target = Key::from(target);
194 let config = FindNodeConfig {
195 local_peer_id: self.local_peer_id,
196 replication_factor: self.replication_factor,
197 parallelism_factor: self.parallelism_factor,
198 query: query_id,
199 target,
200 };
201
202 self.queries.insert(
203 query_id,
204 QueryType::FindNode {
205 context: FindNodeContext::new(config, candidates),
206 },
207 );
208
209 query_id
210 }
211
212 pub fn start_put_record(
214 &mut self,
215 query_id: QueryId,
216 record: Record,
217 candidates: VecDeque<KademliaPeer>,
218 ) -> QueryId {
219 tracing::debug!(
220 target: LOG_TARGET,
221 ?query_id,
222 target = ?record.key,
223 num_peers = ?candidates.len(),
224 "start `PUT_VALUE` query"
225 );
226
227 let target = Key::new(record.key.clone());
228 let config = FindNodeConfig {
229 local_peer_id: self.local_peer_id,
230 replication_factor: self.replication_factor,
231 parallelism_factor: self.parallelism_factor,
232 query: query_id,
233 target,
234 };
235
236 self.queries.insert(
237 query_id,
238 QueryType::PutRecord {
239 record,
240 context: FindNodeContext::new(config, candidates),
241 },
242 );
243
244 query_id
245 }
246
247 pub fn start_put_record_to_peers(
249 &mut self,
250 query_id: QueryId,
251 record: Record,
252 peers_to_report: Vec<KademliaPeer>,
253 ) -> QueryId {
254 tracing::debug!(
255 target: LOG_TARGET,
256 ?query_id,
257 target = ?record.key,
258 num_peers = ?peers_to_report.len(),
259 "start `PUT_VALUE` query to peers"
260 );
261
262 self.queries.insert(
263 query_id,
264 QueryType::PutRecordToPeers {
265 record,
266 context: FindManyNodesContext::new(query_id, peers_to_report),
267 },
268 );
269
270 query_id
271 }
272
273 pub fn start_get_record(
275 &mut self,
276 query_id: QueryId,
277 target: RecordKey,
278 candidates: VecDeque<KademliaPeer>,
279 quorum: Quorum,
280 count: usize,
281 ) -> QueryId {
282 tracing::debug!(
283 target: LOG_TARGET,
284 ?query_id,
285 ?target,
286 num_peers = ?candidates.len(),
287 "start `GET_VALUE` query"
288 );
289
290 let target = Key::new(target);
291 let config = GetRecordConfig {
292 local_peer_id: self.local_peer_id,
293 known_records: count,
294 quorum,
295 replication_factor: self.replication_factor,
296 parallelism_factor: self.parallelism_factor,
297 query: query_id,
298 target,
299 };
300
301 self.queries.insert(
302 query_id,
303 QueryType::GetRecord {
304 context: GetRecordContext::new(config, candidates),
305 },
306 );
307
308 query_id
309 }
310
311 pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) {
313 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure");
314
315 match self.queries.get_mut(&query) {
316 None => {
317 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
318 }
319 Some(QueryType::FindNode { context }) => {
320 context.register_response_failure(peer);
321 }
322 Some(QueryType::PutRecord { context, .. }) => {
323 context.register_response_failure(peer);
324 }
325 Some(QueryType::PutRecordToPeers { context, .. }) => {
326 context.register_response_failure(peer);
327 }
328 Some(QueryType::GetRecord { context }) => {
329 context.register_response_failure(peer);
330 }
331 }
332 }
333
334 pub fn register_response(&mut self, query: QueryId, peer: PeerId, message: KademliaMessage) {
336 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response");
337
338 match self.queries.get_mut(&query) {
339 None => {
340 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
341 }
342 Some(QueryType::FindNode { context }) => match message {
343 KademliaMessage::FindNode { peers, .. } => {
344 context.register_response(peer, peers);
345 }
346 _ => unreachable!(),
347 },
348 Some(QueryType::PutRecord { context, .. }) => match message {
349 KademliaMessage::FindNode { peers, .. } => {
350 context.register_response(peer, peers);
351 }
352 _ => unreachable!(),
353 },
354 Some(QueryType::PutRecordToPeers { context, .. }) => match message {
355 KademliaMessage::FindNode { peers, .. } => {
356 context.register_response(peer, peers);
357 }
358 _ => unreachable!(),
359 },
360 Some(QueryType::GetRecord { context }) => match message {
361 KademliaMessage::GetRecord { record, peers, .. } => {
362 context.register_response(peer, record, peers);
363 }
364 _ => unreachable!(),
365 },
366 }
367 }
368
369 pub fn next_peer_action(&mut self, query: &QueryId, peer: &PeerId) -> Option<QueryAction> {
371 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "get next peer action");
372
373 match self.queries.get_mut(query) {
374 None => {
375 tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
376 None
377 }
378 Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
379 Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
380 Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
381 Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
382 }
383 }
384
385 fn on_query_succeeded(&mut self, query: QueryId) -> QueryAction {
388 match self.queries.remove(&query).expect("query to exist") {
389 QueryType::FindNode { context } => QueryAction::FindNodeQuerySucceeded {
390 query,
391 target: context.config.target.into_preimage(),
392 peers: context.responses.into_values().collect::<Vec<_>>(),
393 },
394 QueryType::PutRecord { record, context } => QueryAction::PutRecordToFoundNodes {
395 record,
396 peers: context.responses.into_values().collect::<Vec<_>>(),
397 },
398 QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes {
399 record,
400 peers: context.peers_to_report,
401 },
402 QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
403 query_id: context.config.query,
404 records: context.found_records(),
405 },
406 }
407 }
408
409 fn on_query_failed(&mut self, query: QueryId) -> QueryAction {
412 let _ = self.queries.remove(&query).expect("query to exist");
413
414 QueryAction::QueryFailed { query }
415 }
416
417 pub fn next_action(&mut self) -> Option<QueryAction> {
419 for (_, state) in self.queries.iter_mut() {
420 let action = match state {
421 QueryType::FindNode { context } => context.next_action(),
422 QueryType::PutRecord { context, .. } => context.next_action(),
423 QueryType::PutRecordToPeers { context, .. } => context.next_action(),
424 QueryType::GetRecord { context } => context.next_action(),
425 };
426
427 match action {
428 Some(QueryAction::QuerySucceeded { query }) => {
429 return Some(self.on_query_succeeded(query));
430 }
431 Some(QueryAction::QueryFailed { query }) =>
432 return Some(self.on_query_failed(query)),
433 Some(_) => return action,
434 _ => continue,
435 }
436 }
437
438 None
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use multihash::{Code, Multihash};
445
446 use super::*;
447 use crate::protocol::libp2p::kademlia::types::ConnectionType;
448
449 fn make_peer_id(first: u8, second: u8) -> PeerId {
451 let mut peer_id = vec![0u8; 32];
452 peer_id[0] = first;
453 peer_id[1] = second;
454
455 PeerId::from_bytes(
456 &Multihash::wrap(Code::Identity.into(), &peer_id)
457 .expect("The digest size is never too large")
458 .to_bytes(),
459 )
460 .unwrap()
461 }
462
/// Starts a `FIND_NODE` query with four unconnected candidates, reports a failure
/// for every request handed out during the first four polls and checks that the
/// engine ends up with nothing left to do.
#[test]
fn query_fails() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let target_peer = PeerId::random();
    let _target_key = Key::from(target_peer);

    let candidates = (0..4)
        .map(|_| KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected))
        .collect();
    let query = engine.start_find_node(QueryId(1337), target_peer, candidates);

    let mut polls_left = 4;
    while polls_left > 0 {
        polls_left -= 1;

        if let Some(QueryAction::SendMessage { query, peer, .. }) = engine.next_action() {
            engine.register_response_failure(query, peer);
        }
    }

    let outcome = engine.next_action();
    if let Some(QueryAction::QueryFailed { query: failed }) = outcome {
        assert_eq!(failed, query);
    }

    assert!(engine.next_action().is_none());
}
497
/// Starts a `FIND_NODE` query with four candidates, polls the engine three times
/// without answering anything and checks that the fourth poll yields no action
/// (presumably because the parallelism factor of 3 has been reached).
#[test]
fn lookup_paused() {
    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let target_peer = PeerId::random();
    let _target_key = Key::from(target_peer);

    let candidates = (0..4)
        .map(|_| KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected))
        .collect();
    let _ = engine.start_find_node(QueryId(1338), target_peer, candidates);

    // Whatever the first three polls return is deliberately ignored.
    (0..3).for_each(|_| drop(engine.next_action()));

    assert!(engine.next_action().is_none());
}
522
/// Drives a `FIND_NODE` query to completion: the single seed peer answers with
/// three further peers, those three answer with no peers, and the query is then
/// expected to succeed with four peers in total.
#[test]
fn find_node_query_succeeds() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let target_peer = make_peer_id(0, 0);
    let target_key = Key::from(target_peer);

    // Peers keyed by their distance to the target, so iteration is closest-first.
    let mut distances = std::collections::BTreeMap::new();
    distances.extend((1..64).map(|i| {
        let peer = make_peer_id(i, 0);
        let key = Key::from(peer);

        (target_key.distance(&key), peer)
    }));

    let mut closest = distances.values().copied();
    let mut next_peer =
        || KademliaPeer::new(closest.next().unwrap(), vec![], ConnectionType::NotConnected);

    let _query = engine.start_find_node(QueryId(1339), target_peer, vec![next_peer()].into());

    let action = engine.next_action();
    assert!(engine.next_action().is_none());

    // Answer the first request with the next three closest peers.
    let (query, peer) = match action {
        Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
        _ => panic!("invalid event received"),
    };
    engine.register_response(
        query,
        peer,
        KademliaMessage::FindNode {
            target: Vec::new(),
            peers: vec![next_peer(), next_peer(), next_peer()],
        },
    );

    // Each of the three newly learned peers answers without any further peers.
    for _ in 0..3 {
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            _ => panic!("invalid event received"),
        };

        println!("next send message to {peer:?}");
        engine.register_response(
            query,
            peer,
            KademliaMessage::FindNode {
                target: Vec::new(),
                peers: vec![],
            },
        );
    }

    let peers = match engine.next_action() {
        Some(QueryAction::FindNodeQuerySucceeded { peers, .. }) => peers,
        _ => panic!("invalid event received"),
    };
    assert_eq!(peers.len(), 4);

    assert!(engine.next_action().is_none());
}
620
/// Drives a `PUT_VALUE` query to completion and then fetches the record back with a
/// `GET_VALUE` query from the four peers the first query reported.
#[test]
fn put_record_succeeds() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
    let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
    let target_key = Key::new(record_key.clone());
    let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);

    // Peers keyed by their distance to the record key, so iteration is closest-first.
    let mut distances = std::collections::BTreeMap::new();
    distances.extend((1..64).map(|i| {
        let peer = make_peer_id(i, 0);
        let key = Key::from(peer);

        (target_key.distance(&key), peer)
    }));

    let mut closest = distances.values().copied();
    let mut next_peer =
        || KademliaPeer::new(closest.next().unwrap(), vec![], ConnectionType::NotConnected);

    let _query = engine.start_put_record(
        QueryId(1340),
        original_record.clone(),
        vec![next_peer()].into(),
    );

    let action = engine.next_action();
    assert!(engine.next_action().is_none());

    // Answer the first request with the next three closest peers.
    let (query, peer) = match action {
        Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
        _ => panic!("invalid event received"),
    };
    engine.register_response(
        query,
        peer,
        KademliaMessage::FindNode {
            target: Vec::new(),
            peers: vec![next_peer(), next_peer(), next_peer()],
        },
    );

    // Each of the three newly learned peers answers without any further peers.
    for _ in 0..3 {
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            _ => panic!("invalid event received"),
        };

        println!("next send message to {peer:?}");
        engine.register_response(
            query,
            peer,
            KademliaMessage::FindNode {
                target: Vec::new(),
                peers: vec![],
            },
        );
    }

    let (peers, record) = match engine.next_action() {
        Some(QueryAction::PutRecordToFoundNodes { peers, record }) => (peers, record),
        _ => panic!("invalid event received"),
    };
    assert_eq!(peers.len(), 4);
    assert_eq!(record.key, original_record.key);
    assert_eq!(record.value, original_record.value);

    assert!(engine.next_action().is_none());

    // Fetch the record back from exactly the four peers reported above.
    let candidates = peers
        .iter()
        .map(|found| KademliaPeer::new(found.peer, vec![], ConnectionType::NotConnected))
        .collect();
    let _query = engine.start_get_record(
        QueryId(1341),
        record_key.clone(),
        candidates,
        Quorum::All,
        3,
    );

    for _ in 0..4 {
        let (query, peer) = match engine.next_action() {
            Some(QueryAction::SendMessage { query, peer, .. }) => (query, peer),
            _ => panic!("invalid event received"),
        };

        engine.register_response(
            query,
            peer,
            KademliaMessage::GetRecord {
                record: Some(original_record.clone()),
                peers: vec![],
                key: Some(record_key.clone()),
            },
        );
    }

    let expected_peers: std::collections::HashSet<_> =
        peers.into_iter().map(|found| found.peer).collect();
    let records = match engine.next_action() {
        Some(QueryAction::GetRecordQueryDone { records, .. }) => records,
        _ => panic!("invalid event received"),
    };

    let query_peers = records
        .iter()
        .map(|peer_record| peer_record.peer)
        .collect::<std::collections::HashSet<_>>();
    assert_eq!(expected_peers, query_peers);

    // All four peers returned the same record, so deduplication leaves one entry.
    let unique_records: std::collections::HashSet<_> =
        records.into_iter().map(|peer_record| peer_record.record).collect();
    assert_eq!(unique_records.len(), 1);
    let record = unique_records.into_iter().next().unwrap();

    assert_eq!(record.key, original_record.key);
    assert_eq!(record.value, original_record.value);
}
775}