1use crate::{
24 error::{Error, ImmediateDialError, SubstreamError},
25 protocol::{
26 libp2p::kademlia::{
27 bucket::KBucketEntry,
28 executor::{QueryContext, QueryExecutor, QueryResult},
29 message::KademliaMessage,
30 query::{QueryAction, QueryEngine},
31 routing_table::RoutingTable,
32 store::{MemoryStore, MemoryStoreAction},
33 types::{ConnectionType, KademliaPeer, Key},
34 },
35 Direction, TransportEvent, TransportService,
36 },
37 substream::Substream,
38 transport::Endpoint,
39 types::SubstreamId,
40 PeerId,
41};
42
43use bytes::{Bytes, BytesMut};
44use futures::StreamExt;
45use multiaddr::Multiaddr;
46use tokio::sync::mpsc::{Receiver, Sender};
47
48use std::{
49 collections::{hash_map::Entry, HashMap},
50 sync::{
51 atomic::{AtomicUsize, Ordering},
52 Arc,
53 },
54 time::{Duration, Instant},
55};
56
57pub use config::{Config, ConfigBuilder};
58pub use handle::{
59 IncomingRecordValidationMode, KademliaCommand, KademliaEvent, KademliaHandle, Quorum,
60 RoutingTableUpdateMode,
61};
62pub use query::QueryId;
63pub use record::{ContentProvider, Key as RecordKey, PeerRecord, Record};
64
65const LOG_TARGET: &str = "litep2p::ipfs::kademlia";
67
68const PARALLELISM_FACTOR: usize = 3;
70
71mod bucket;
72mod config;
73mod executor;
74mod handle;
75mod message;
76mod query;
77mod record;
78mod routing_table;
79mod store;
80mod types;
81
82mod schema {
83 pub(super) mod kademlia {
84 include!(concat!(env!("OUT_DIR"), "/kademlia.rs"));
85 }
86}
87
88#[derive(Debug, Clone)]
90#[allow(clippy::enum_variant_names)]
91enum PeerAction {
92 SendFindNode(QueryId),
95
96 SendPutValue(QueryId, Bytes),
98
99 SendAddProvider(QueryId, Bytes),
101}
102
103impl PeerAction {
104 fn query_id(&self) -> QueryId {
105 match self {
106 PeerAction::SendFindNode(query_id) => *query_id,
107 PeerAction::SendPutValue(query_id, _) => *query_id,
108 PeerAction::SendAddProvider(query_id, _) => *query_id,
109 }
110 }
111}
112
113#[derive(Default)]
115struct PeerContext {
116 pending_actions: HashMap<SubstreamId, PeerAction>,
118}
119
120impl PeerContext {
121 pub fn new() -> Self {
123 Self {
124 pending_actions: HashMap::new(),
125 }
126 }
127
128 pub fn add_pending_action(&mut self, substream_id: SubstreamId, action: PeerAction) {
130 self.pending_actions.insert(substream_id, action);
131 }
132}
133
134pub(crate) struct Kademlia {
136 service: TransportService,
138
139 local_key: Key<PeerId>,
141
142 peers: HashMap<PeerId, PeerContext>,
144
145 event_tx: Sender<KademliaEvent>,
147
148 cmd_rx: Receiver<KademliaCommand>,
150
151 next_query_id: Arc<AtomicUsize>,
153
154 routing_table: RoutingTable,
156
157 replication_factor: usize,
159
160 store: MemoryStore,
162
163 pending_substreams: HashMap<SubstreamId, PeerId>,
165
166 pending_dials: HashMap<PeerId, Vec<PeerAction>>,
168
169 update_mode: RoutingTableUpdateMode,
171
172 validation_mode: IncomingRecordValidationMode,
174
175 record_ttl: Duration,
177
178 engine: QueryEngine,
180
181 executor: QueryExecutor,
183}
184
185impl Kademlia {
186 pub(crate) fn new(mut service: TransportService, config: Config) -> Self {
188 let local_peer_id = service.local_peer_id();
189 let local_key = Key::from(service.local_peer_id());
190 let mut routing_table = RoutingTable::new(local_key.clone());
191
192 for (peer, addresses) in config.known_peers {
193 tracing::trace!(target: LOG_TARGET, ?peer, ?addresses, "add bootstrap peer");
194
195 routing_table.add_known_peer(peer, addresses.clone(), ConnectionType::NotConnected);
196 service.add_known_address(&peer, addresses.into_iter());
197 }
198
199 let store = MemoryStore::with_config(local_peer_id, config.memory_store_config);
200
201 Self {
202 service,
203 routing_table,
204 peers: HashMap::new(),
205 cmd_rx: config.cmd_rx,
206 next_query_id: config.next_query_id,
207 store,
208 event_tx: config.event_tx,
209 local_key,
210 pending_dials: HashMap::new(),
211 executor: QueryExecutor::new(),
212 pending_substreams: HashMap::new(),
213 update_mode: config.update_mode,
214 validation_mode: config.validation_mode,
215 record_ttl: config.record_ttl,
216 replication_factor: config.replication_factor,
217 engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
218 }
219 }
220
221 fn next_query_id(&mut self) -> QueryId {
223 let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed);
224
225 QueryId(query_id)
226 }
227
228 fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> {
230 tracing::trace!(target: LOG_TARGET, ?peer, "connection established");
231
232 match self.peers.entry(peer) {
233 Entry::Vacant(entry) => {
234 self.routing_table.on_connection_established(Key::from(peer), endpoint);
241
242 let Some(actions) = self.pending_dials.remove(&peer) else {
243 return Ok(());
247 };
248
249 let mut context = PeerContext::new();
252
253 for action in actions {
254 match self.service.open_substream(peer) {
255 Ok(substream_id) => {
256 context.add_pending_action(substream_id, action);
257 }
258 Err(error) => {
259 tracing::debug!(
260 target: LOG_TARGET,
261 ?peer,
262 ?action,
263 ?error,
264 "connection established to peer but failed to open substream",
265 );
266
267 if let PeerAction::SendFindNode(query_id) = action {
268 self.engine.register_send_failure(query_id, peer);
269 self.engine.register_response_failure(query_id, peer);
270 }
271 }
272 }
273 }
274
275 entry.insert(context);
276 Ok(())
277 }
278 Entry::Occupied(_) => {
279 tracing::warn!(
280 target: LOG_TARGET,
281 ?peer,
282 ?endpoint,
283 "connection already exists, discarding opening substreams, this is unexpected"
284 );
285
286 self.routing_table.on_connection_established(Key::from(peer), endpoint);
290
291 Err(Error::PeerAlreadyExists(peer))
292 }
293 }
294 }
295
296 async fn disconnect_peer(&mut self, peer: PeerId, query: Option<QueryId>) {
305 tracing::trace!(target: LOG_TARGET, ?peer, ?query, "disconnect peer");
306
307 if let Some(query) = query {
308 self.engine.register_peer_failure(query, peer);
309 }
310
311 if let Some(PeerContext { pending_actions }) = self.peers.remove(&peer) {
314 pending_actions.into_iter().for_each(|(_, action)| {
315 let query_id = action.query_id();
319 if Some(query_id) != query {
320 self.engine.register_peer_failure(query_id, peer);
321 }
322 });
323 }
324
325 if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
326 entry.connection = ConnectionType::NotConnected;
327 }
328 }
329
330 async fn on_outbound_substream(
332 &mut self,
333 peer: PeerId,
334 substream_id: SubstreamId,
335 substream: Substream,
336 ) -> crate::Result<()> {
337 tracing::trace!(
338 target: LOG_TARGET,
339 ?peer,
340 ?substream_id,
341 "outbound substream opened",
342 );
343 let _ = self.pending_substreams.remove(&substream_id);
344
345 let pending_action = &mut self
346 .peers
347 .get_mut(&peer)
348 .ok_or(Error::PeerDoesntExist(peer))?
350 .pending_actions
351 .remove(&substream_id);
352
353 match pending_action.take() {
354 None => {
355 tracing::trace!(
356 target: LOG_TARGET,
357 ?peer,
358 ?substream_id,
359 "pending action doesn't exist for peer, closing substream",
360 );
361
362 let _ = substream.close().await;
363 return Ok(());
364 }
365 Some(PeerAction::SendFindNode(query)) => {
366 match self.engine.next_peer_action(&query, &peer) {
367 Some(QueryAction::SendMessage {
368 query,
369 peer,
370 message,
371 }) => {
372 tracing::trace!(target: LOG_TARGET, ?peer, ?query, "start sending message to peer");
373
374 self.executor.send_request_read_response(
375 peer,
376 Some(query),
377 message,
378 substream,
379 );
380 }
381 None => {
383 let _ = substream.close().await;
384 }
385 action => {
386 tracing::warn!(target: LOG_TARGET, ?query, ?peer, ?action, "unexpected action for `FIND_NODE`");
387 let _ = substream.close().await;
388 debug_assert!(false);
389 }
390 }
391 }
392 Some(PeerAction::SendPutValue(query, message)) => {
393 tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message");
394
395 self.executor.send_request_eat_response_failure(
396 peer,
397 Some(query),
398 message,
399 substream,
400 );
401 }
404 Some(PeerAction::SendAddProvider(query, message)) => {
405 tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message");
406
407 self.executor.send_message(peer, Some(query), message, substream);
408 }
409 }
410
411 Ok(())
412 }
413
414 async fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
416 tracing::trace!(target: LOG_TARGET, ?peer, "inbound substream opened");
417
418 self.peers.entry(peer).or_default();
421
422 self.executor.read_message(peer, None, substream);
423 }
424
425 async fn update_routing_table(&mut self, peers: &[KademliaPeer]) {
430 let peers: Vec<_> =
431 peers.iter().filter(|peer| peer.peer != self.service.local_peer_id()).collect();
432
433 let _ = self
436 .event_tx
437 .send(KademliaEvent::RoutingTableUpdate {
438 peers: peers.iter().map(|peer| peer.peer).collect::<Vec<PeerId>>(),
439 })
440 .await;
441
442 for info in peers {
443 let addresses = info.addresses();
444 self.service.add_known_address(&info.peer, addresses.clone().into_iter());
445
446 if std::matches!(self.update_mode, RoutingTableUpdateMode::Automatic) {
447 self.routing_table.add_known_peer(
448 info.peer,
449 addresses,
450 self.peers
451 .get(&info.peer)
452 .map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected),
453 );
454 }
455 }
456 }
457
458 async fn on_message_received(
460 &mut self,
461 peer: PeerId,
462 query_id: Option<QueryId>,
463 message: BytesMut,
464 substream: Substream,
465 ) -> crate::Result<()> {
466 tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "handle message from peer");
467
468 match KademliaMessage::from_bytes(message, self.replication_factor)
469 .ok_or(Error::InvalidData)?
470 {
471 KademliaMessage::FindNode { target, peers } => {
472 match query_id {
473 Some(query_id) => {
474 tracing::trace!(
475 target: LOG_TARGET,
476 ?peer,
477 ?target,
478 query = ?query_id,
479 "handle `FIND_NODE` response",
480 );
481
482 self.update_routing_table(&peers).await;
484 self.engine.register_response(
485 query_id,
486 peer,
487 KademliaMessage::FindNode { target, peers },
488 );
489 substream.close().await;
490 }
491 None => {
492 tracing::trace!(
493 target: LOG_TARGET,
494 ?peer,
495 ?target,
496 "handle `FIND_NODE` request",
497 );
498
499 let message = KademliaMessage::find_node_response(
500 &target,
501 self.routing_table
502 .closest(&Key::new(target.as_ref()), self.replication_factor),
503 );
504 self.executor.send_message(peer, None, message.into(), substream);
505 }
506 }
507 }
508 KademliaMessage::PutValue { record } => match query_id {
509 Some(query_id) => {
510 tracing::trace!(
511 target: LOG_TARGET,
512 ?peer,
513 query = ?query_id,
514 record_key = ?record.key,
515 "handle `PUT_VALUE` response",
516 );
517
518 self.engine.register_response(
519 query_id,
520 peer,
521 KademliaMessage::PutValue { record },
522 );
523 substream.close().await;
524 }
525 None => {
526 tracing::trace!(
527 target: LOG_TARGET,
528 ?peer,
529 record_key = ?record.key,
530 "handle `PUT_VALUE` request",
531 );
532
533 if let IncomingRecordValidationMode::Automatic = self.validation_mode {
534 self.store.put(record.clone());
535 }
536
537 let message = KademliaMessage::put_value_response(
540 record.key.clone(),
541 record.value.clone(),
542 );
543 self.executor.send_message_eat_failure(peer, None, message, substream);
544 let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
548 }
549 },
550 KademliaMessage::GetRecord { key, record, peers } => {
551 match (query_id, key) {
552 (Some(query_id), key) => {
553 tracing::trace!(
554 target: LOG_TARGET,
555 ?peer,
556 query = ?query_id,
557 ?peers,
558 ?record,
559 "handle `GET_VALUE` response",
560 );
561
562 self.update_routing_table(&peers).await;
564
565 self.engine.register_response(
566 query_id,
567 peer,
568 KademliaMessage::GetRecord { key, record, peers },
569 );
570
571 substream.close().await;
572 }
573 (None, Some(key)) => {
574 tracing::trace!(
575 target: LOG_TARGET,
576 ?peer,
577 ?key,
578 "handle `GET_VALUE` request",
579 );
580
581 let value = self.store.get(&key).cloned();
582 let closest_peers = self
583 .routing_table
584 .closest(&Key::new(key.as_ref()), self.replication_factor);
585
586 let message =
587 KademliaMessage::get_value_response(key, closest_peers, value);
588 self.executor.send_message(peer, None, message.into(), substream);
589 }
590 (None, None) => tracing::debug!(
591 target: LOG_TARGET,
592 ?peer,
593 ?record,
594 ?peers,
595 "unable to handle `GET_RECORD` request with empty key",
596 ),
597 }
598 }
599 KademliaMessage::AddProvider { key, mut providers } => {
600 tracing::trace!(
601 target: LOG_TARGET,
602 ?peer,
603 ?key,
604 ?providers,
605 "handle `ADD_PROVIDER` message",
606 );
607
608 match (providers.len(), providers.pop()) {
609 (1, Some(provider)) => {
610 let addresses = provider.addresses();
611
612 if provider.peer == peer {
613 self.store.put_provider(
614 key.clone(),
615 ContentProvider {
616 peer,
617 addresses: addresses.clone(),
618 },
619 );
620
621 let _ = self
622 .event_tx
623 .send(KademliaEvent::IncomingProvider {
624 provided_key: key,
625 provider: ContentProvider {
626 peer: provider.peer,
627 addresses,
628 },
629 })
630 .await;
631 } else {
632 tracing::trace!(
633 target: LOG_TARGET,
634 publisher = ?peer,
635 provider = ?provider.peer,
636 "ignoring `ADD_PROVIDER` message with `publisher` != `provider`"
637 )
638 }
639 }
640 (n, _) => {
641 tracing::trace!(
642 target: LOG_TARGET,
643 publisher = ?peer,
644 ?n,
645 "ignoring `ADD_PROVIDER` message with `n` != 1 providers"
646 )
647 }
648 }
649 }
650 KademliaMessage::GetProviders {
651 key,
652 peers,
653 providers,
654 } => {
655 match (query_id, key) {
656 (Some(query_id), key) => {
657 tracing::trace!(
659 target: LOG_TARGET,
660 ?peer,
661 query = ?query_id,
662 ?key,
663 ?peers,
664 ?providers,
665 "handle `GET_PROVIDERS` response",
666 );
667
668 self.update_routing_table(&peers).await;
670
671 self.engine.register_response(
672 query_id,
673 peer,
674 KademliaMessage::GetProviders {
675 key,
676 peers,
677 providers,
678 },
679 );
680
681 substream.close().await;
682 }
683 (None, Some(key)) => {
684 tracing::trace!(
685 target: LOG_TARGET,
686 ?peer,
687 ?key,
688 "handle `GET_PROVIDERS` request",
689 );
690
691 let mut providers = self.store.get_providers(&key);
692
693 let local_peer_id = self.local_key.clone().into_preimage();
695 if let Some(p) =
696 providers.iter_mut().find(|p| p.peer == local_peer_id).as_mut()
697 {
698 p.addresses = self.service.public_addresses().get_addresses();
699 }
700
701 let closer_peers = self
702 .routing_table
703 .closest(&Key::new(key.as_ref()), self.replication_factor);
704
705 let message =
706 KademliaMessage::get_providers_response(providers, &closer_peers);
707 self.executor.send_message(peer, None, message.into(), substream);
708 }
709 (None, None) => tracing::debug!(
710 target: LOG_TARGET,
711 ?peer,
712 ?peers,
713 ?providers,
714 "unable to handle `GET_PROVIDERS` request with empty key",
715 ),
716 }
717 }
718 }
719
720 Ok(())
721 }
722
723 async fn on_substream_open_failure(
725 &mut self,
726 substream_id: SubstreamId,
727 error: SubstreamError,
728 ) {
729 tracing::trace!(
730 target: LOG_TARGET,
731 ?substream_id,
732 ?error,
733 "failed to open substream"
734 );
735
736 let Some(peer) = self.pending_substreams.remove(&substream_id) else {
737 tracing::debug!(
738 target: LOG_TARGET,
739 ?substream_id,
740 "outbound substream failed for non-existent peer"
741 );
742 return;
743 };
744
745 if let Some(context) = self.peers.get_mut(&peer) {
746 let query =
747 context.pending_actions.remove(&substream_id).as_ref().map(PeerAction::query_id);
748
749 self.disconnect_peer(peer, query).await;
750 }
751 }
752
753 fn on_dial_failure(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
755 tracing::trace!(target: LOG_TARGET, ?peer, ?addresses, "failed to dial peer");
756
757 self.routing_table.on_dial_failure(Key::from(peer), &addresses);
758
759 let Some(actions) = self.pending_dials.remove(&peer) else {
760 return;
761 };
762
763 for action in actions {
764 let query = action.query_id();
765
766 tracing::trace!(
767 target: LOG_TARGET,
768 ?peer,
769 ?query,
770 ?addresses,
771 "report failure for pending query",
772 );
773
774 self.engine.register_send_failure(query, peer);
776 self.engine.register_response_failure(query, peer);
777 }
778 }
779
780 fn open_substream_or_dial(
782 &mut self,
783 peer: PeerId,
784 action: PeerAction,
785 query: Option<QueryId>,
786 ) -> Result<(), Error> {
787 match self.service.open_substream(peer) {
788 Ok(substream_id) => {
789 self.pending_substreams.insert(substream_id, peer);
790 self.peers.entry(peer).or_default().pending_actions.insert(substream_id, action);
791
792 Ok(())
793 }
794 Err(err) => {
795 tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream. Dialing peer");
796
797 match self.service.dial(&peer) {
798 Ok(()) => {
799 self.pending_dials.entry(peer).or_default().push(action);
800 Ok(())
801 }
802
803 Err(ImmediateDialError::AlreadyConnected) => {
805 match self.service.open_substream(peer) {
807 Ok(substream_id) => {
808 self.pending_substreams.insert(substream_id, peer);
809 self.peers
810 .entry(peer)
811 .or_default()
812 .pending_actions
813 .insert(substream_id, action);
814 Ok(())
815 }
816 Err(err) => {
817 tracing::debug!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time");
818 Err(err.into())
819 }
820 }
821 }
822
823 Err(error) => {
824 tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "Failed to dial peer");
825 Err(error.into())
826 }
827 }
828 }
829 }
830 }
831
832 async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
834 match action {
835 QueryAction::SendMessage { query, peer, .. } => {
836 if self
838 .open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query))
839 .is_err()
840 {
841 self.engine.register_send_failure(query, peer);
843 self.engine.register_response_failure(query, peer);
844 }
845 Ok(())
846 }
847 QueryAction::FindNodeQuerySucceeded {
848 target,
849 peers,
850 query,
851 } => {
852 tracing::debug!(
853 target: LOG_TARGET,
854 ?query,
855 peer = ?target,
856 num_peers = ?peers.len(),
857 "`FIND_NODE` succeeded",
858 );
859
860 let _ = self
861 .event_tx
862 .send(KademliaEvent::FindNodeSuccess {
863 target,
864 query_id: query,
865 peers: peers
866 .into_iter()
867 .map(|info| (info.peer, info.addresses()))
868 .collect(),
869 })
870 .await;
871 Ok(())
872 }
873 QueryAction::PutRecordToFoundNodes {
874 query,
875 record,
876 peers,
877 quorum,
878 } => {
879 tracing::trace!(
880 target: LOG_TARGET,
881 ?query,
882 record_key = ?record.key,
883 num_peers = ?peers.len(),
884 "store record to found peers",
885 );
886 let key = record.key.clone();
887 let message: Bytes = KademliaMessage::put_value(record);
888
889 for peer in &peers {
890 if let Err(error) = self.open_substream_or_dial(
891 peer.peer,
892 PeerAction::SendPutValue(query, message.clone()),
894 None,
895 ) {
896 tracing::debug!(
897 target: LOG_TARGET,
898 ?peer,
899 ?key,
900 ?error,
901 "failed to put record to peer",
902 );
903 }
904 }
905
906 self.engine.start_put_record_to_found_nodes_requests_tracking(
907 query,
908 key,
909 peers.into_iter().map(|peer| peer.peer).collect(),
910 quorum,
911 );
912
913 Ok(())
914 }
915 QueryAction::PutRecordQuerySucceeded { query, key } => {
916 tracing::debug!(target: LOG_TARGET, ?query, "`PUT_VALUE` query succeeded");
917
918 let _ = self
919 .event_tx
920 .send(KademliaEvent::PutRecordSuccess {
921 query_id: query,
922 key,
923 })
924 .await;
925 Ok(())
926 }
927 QueryAction::AddProviderToFoundNodes {
928 query,
929 provided_key,
930 provider,
931 peers,
932 quorum,
933 } => {
934 tracing::trace!(
935 target: LOG_TARGET,
936 ?provided_key,
937 num_peers = ?peers.len(),
938 "add provider record to found peers",
939 );
940
941 let message = KademliaMessage::add_provider(provided_key.clone(), provider);
942
943 for peer in &peers {
944 if let Err(error) = self.open_substream_or_dial(
945 peer.peer,
946 PeerAction::SendAddProvider(query, message.clone()),
947 None,
948 ) {
949 tracing::debug!(
950 target: LOG_TARGET,
951 ?peer,
952 ?provided_key,
953 ?error,
954 "failed to add provider record to peer",
955 )
956 }
957 }
958
959 self.engine.start_add_provider_to_found_nodes_requests_tracking(
960 query,
961 provided_key,
962 peers.into_iter().map(|peer| peer.peer).collect(),
963 quorum,
964 );
965
966 Ok(())
967 }
968 QueryAction::AddProviderQuerySucceeded {
969 query,
970 provided_key,
971 } => {
972 tracing::debug!(target: LOG_TARGET, ?query, "`ADD_PROVIDER` query succeeded");
973
974 let _ = self
975 .event_tx
976 .send(KademliaEvent::AddProviderSuccess {
977 query_id: query,
978 provided_key,
979 })
980 .await;
981 Ok(())
982 }
983 QueryAction::GetRecordQueryDone { query_id } => {
984 let _ = self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id }).await;
985 Ok(())
986 }
987 QueryAction::GetProvidersQueryDone {
988 query_id,
989 provided_key,
990 providers,
991 } => {
992 let _ = self
993 .event_tx
994 .send(KademliaEvent::GetProvidersSuccess {
995 query_id,
996 provided_key,
997 providers,
998 })
999 .await;
1000 Ok(())
1001 }
1002 QueryAction::QueryFailed { query } => {
1003 tracing::debug!(target: LOG_TARGET, ?query, "query failed");
1004
1005 let _ = self.event_tx.send(KademliaEvent::QueryFailed { query_id: query }).await;
1006 Ok(())
1007 }
1008 QueryAction::GetRecordPartialResult { query_id, record } => {
1009 let _ = self
1010 .event_tx
1011 .send(KademliaEvent::GetRecordPartialResult { query_id, record })
1012 .await;
1013 Ok(())
1014 }
1015 QueryAction::QuerySucceeded { .. } => Ok(()),
1016 }
1017 }
1018
1019 pub async fn run(mut self) -> crate::Result<()> {
1021 tracing::debug!(target: LOG_TARGET, "starting kademlia event loop");
1022
1023 loop {
1024 while let Some(action) = self.engine.next_action() {
1026 if let Err((query, peer)) = self.on_query_action(action).await {
1027 self.disconnect_peer(peer, Some(query)).await;
1028 }
1029 }
1030
1031 tokio::select! {
1032 event = self.service.next() => match event {
1033 Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
1034 if let Err(error) = self.on_connection_established(peer, endpoint) {
1035 tracing::debug!(
1036 target: LOG_TARGET,
1037 ?error,
1038 "failed to handle established connection",
1039 );
1040 }
1041 }
1042 Some(TransportEvent::ConnectionClosed { peer }) => {
1043 self.disconnect_peer(peer, None).await;
1044 }
1045 Some(TransportEvent::SubstreamOpened { peer, direction, substream, .. }) => {
1046 match direction {
1047 Direction::Inbound => self.on_inbound_substream(peer, substream).await,
1048 Direction::Outbound(substream_id) => {
1049 if let Err(error) = self
1050 .on_outbound_substream(peer, substream_id, substream)
1051 .await
1052 {
1053 tracing::debug!(
1054 target: LOG_TARGET,
1055 ?peer,
1056 ?substream_id,
1057 ?error,
1058 "failed to handle outbound substream",
1059 );
1060 }
1061 }
1062 }
1063 },
1064 Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
1065 self.on_substream_open_failure(substream, error).await;
1066 }
1067 Some(TransportEvent::DialFailure { peer, addresses }) =>
1068 self.on_dial_failure(peer, addresses),
1069 None => return Err(Error::EssentialTaskClosed),
1070 },
1071 context = self.executor.next() => {
1072 let QueryContext { peer, query_id, result } = context.unwrap();
1073
1074 match result {
1075 QueryResult::SendSuccess { substream } => {
1076 tracing::trace!(
1077 target: LOG_TARGET,
1078 ?peer,
1079 query = ?query_id,
1080 "message sent to peer",
1081 );
1082 let _ = substream.close().await;
1083
1084 if let Some(query_id) = query_id {
1085 self.engine.register_send_success(query_id, peer);
1086 }
1087 }
1088 QueryResult::AssumeSendSuccess => {
1093 tracing::trace!(
1094 target: LOG_TARGET,
1095 ?peer,
1096 query = ?query_id,
1097 "treating message as sent to peer",
1098 );
1099
1100 if let Some(query_id) = query_id {
1101 self.engine.register_send_success(query_id, peer);
1102 }
1103 }
1104 QueryResult::SendFailure { reason } => {
1105 tracing::debug!(
1106 target: LOG_TARGET,
1107 ?peer,
1108 query = ?query_id,
1109 ?reason,
1110 "failed to send message to peer",
1111 );
1112
1113 self.disconnect_peer(peer, query_id).await;
1114 }
1115 QueryResult::ReadSuccess { substream, message } => {
1116 tracing::trace!(
1117 target: LOG_TARGET,
1118 ?peer,
1119 query = ?query_id,
1120 "message read from peer",
1121 );
1122
1123 if let Some(query_id) = query_id {
1124 self.engine.register_send_success(query_id, peer);
1127 }
1128
1129 if let Err(error) = self.on_message_received(
1130 peer,
1131 query_id,
1132 message,
1133 substream
1134 ).await {
1135 tracing::debug!(
1136 target: LOG_TARGET,
1137 ?peer,
1138 ?error,
1139 "failed to process message",
1140 );
1141 }
1142 }
1143 QueryResult::ReadFailure { reason } => {
1144 tracing::debug!(
1145 target: LOG_TARGET,
1146 ?peer,
1147 query = ?query_id,
1148 ?reason,
1149 "failed to read message from substream",
1150 );
1151
1152 self.disconnect_peer(peer, query_id).await;
1153 }
1154 }
1155 },
1156 command = self.cmd_rx.recv() => {
1157 match command {
1158 Some(KademliaCommand::FindNode { peer, query_id }) => {
1159 tracing::debug!(
1160 target: LOG_TARGET,
1161 ?peer,
1162 query = ?query_id,
1163 "starting `FIND_NODE` query",
1164 );
1165
1166 self.engine.start_find_node(
1167 query_id,
1168 peer,
1169 self.routing_table
1170 .closest(&Key::from(peer), self.replication_factor)
1171 .into()
1172 );
1173 }
1174 Some(KademliaCommand::PutRecord { mut record, quorum, query_id }) => {
1175 tracing::debug!(
1176 target: LOG_TARGET,
1177 query = ?query_id,
1178 key = ?record.key,
1179 "store record to DHT",
1180 );
1181
1182 record.publisher = Some(self.local_key.clone().into_preimage());
1185
1186 record.expires = record
1188 .expires
1189 .or_else(|| Some(Instant::now() + self.record_ttl));
1190
1191 let key = Key::new(record.key.clone());
1192
1193 self.store.put(record.clone());
1194
1195 self.engine.start_put_record(
1196 query_id,
1197 record,
1198 self.routing_table.closest(&key, self.replication_factor).into(),
1199 quorum,
1200 );
1201 }
1202 Some(KademliaCommand::PutRecordToPeers {
1203 mut record,
1204 query_id,
1205 peers,
1206 update_local_store,
1207 quorum,
1208 }) => {
1209 tracing::debug!(
1210 target: LOG_TARGET,
1211 query = ?query_id,
1212 key = ?record.key,
1213 "store record to DHT to specified peers",
1214 );
1215
1216 record.expires = record
1218 .expires
1219 .or_else(|| Some(Instant::now() + self.record_ttl));
1220
1221 if update_local_store {
1222 self.store.put(record.clone());
1223 }
1224
1225 let peers = peers.into_iter().filter_map(|peer| {
1227 if peer == self.service.local_peer_id() {
1228 return None;
1229 }
1230
1231 match self.routing_table.entry(Key::from(peer)) {
1232 KBucketEntry::Occupied(entry) => Some(entry.clone()),
1233 KBucketEntry::Vacant(entry) if !entry.address_store.is_empty() =>
1234 Some(entry.clone()),
1235 _ => None,
1236 }
1237 }).collect();
1238
1239 self.engine.start_put_record_to_peers(
1240 query_id,
1241 record,
1242 peers,
1243 quorum,
1244 );
1245 }
1246 Some(KademliaCommand::StartProviding {
1247 key,
1248 quorum,
1249 query_id
1250 }) => {
1251 tracing::debug!(
1252 target: LOG_TARGET,
1253 query = ?query_id,
1254 ?key,
1255 "register as a content provider",
1256 );
1257
1258 let addresses = self.service.public_addresses().get_addresses();
1259 let provider = ContentProvider {
1260 peer: self.service.local_peer_id(),
1261 addresses,
1262 };
1263
1264 self.store.put_local_provider(key.clone(), quorum);
1265
1266 self.engine.start_add_provider(
1267 query_id,
1268 key.clone(),
1269 provider,
1270 self.routing_table
1271 .closest(&Key::new(key), self.replication_factor)
1272 .into(),
1273 quorum,
1274 );
1275 }
1276 Some(KademliaCommand::StopProviding {
1277 key,
1278 }) => {
1279 tracing::debug!(
1280 target: LOG_TARGET,
1281 ?key,
1282 "stop providing",
1283 );
1284
1285 self.store.remove_local_provider(key);
1286 }
1287 Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
1288 tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");
1289
1290 match (self.store.get(&key), quorum) {
1291 (Some(record), Quorum::One) => {
1292 let _ = self
1293 .event_tx
1294 .send(KademliaEvent::GetRecordPartialResult { query_id, record: PeerRecord {
1295 peer: self.service.local_peer_id(),
1296 record: record.clone(),
1297 } })
1298 .await;
1299
1300 let _ = self
1301 .event_tx
1302 .send(KademliaEvent::GetRecordSuccess {
1303 query_id,
1304 })
1305 .await;
1306 }
1307 (record, _) => {
1308 let local_record = record.is_some();
1309 if let Some(record) = record {
1310 let _ = self
1311 .event_tx
1312 .send(KademliaEvent::GetRecordPartialResult { query_id, record: PeerRecord {
1313 peer: self.service.local_peer_id(),
1314 record: record.clone(),
1315 } })
1316 .await;
1317 }
1318
1319 self.engine.start_get_record(
1320 query_id,
1321 key.clone(),
1322 self.routing_table
1323 .closest(&Key::new(key), self.replication_factor)
1324 .into(),
1325 quorum,
1326 local_record,
1327 );
1328 }
1329 }
1330
1331 }
1332 Some(KademliaCommand::GetProviders { key, query_id }) => {
1333 tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT");
1334
1335 let known_providers = self.store.get_providers(&key);
1336
1337 self.engine.start_get_providers(
1338 query_id,
1339 key.clone(),
1340 self.routing_table
1341 .closest(&Key::new(key), self.replication_factor)
1342 .into(),
1343 known_providers,
1344 );
1345 }
1346 Some(KademliaCommand::AddKnownPeer { peer, addresses }) => {
1347 tracing::trace!(
1348 target: LOG_TARGET,
1349 ?peer,
1350 ?addresses,
1351 "add known peer",
1352 );
1353
1354 self.routing_table.add_known_peer(
1355 peer,
1356 addresses.clone(),
1357 self.peers
1358 .get(&peer)
1359 .map_or(
1360 ConnectionType::NotConnected,
1361 |_| ConnectionType::Connected,
1362 ),
1363 );
1364 self.service.add_known_address(&peer, addresses.into_iter());
1365
1366 }
1367 Some(KademliaCommand::StoreRecord { mut record }) => {
1368 tracing::debug!(
1369 target: LOG_TARGET,
1370 key = ?record.key,
1371 "store record in local store",
1372 );
1373
1374 record.expires =
1376 record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
1377
1378 self.store.put(record);
1379 }
1380 None => return Err(Error::EssentialTaskClosed),
1381 }
1382 },
1383 action = self.store.next_action() => match action {
1384 Some(MemoryStoreAction::RefreshProvider { provided_key, provider, quorum }) => {
1385 tracing::trace!(
1386 target: LOG_TARGET,
1387 ?provided_key,
1388 "republishing local provider",
1389 );
1390
1391 self.store.put_local_provider(provided_key.clone(), quorum);
1392
1393 let query_id = self.next_query_id();
1397 self.engine.start_add_provider(
1398 query_id,
1399 provided_key.clone(),
1400 provider,
1401 self.routing_table
1402 .closest(&Key::new(provided_key), self.replication_factor)
1403 .into(),
1404 quorum,
1405 );
1406 }
1407 None => {}
1408 }
1409 }
1410 }
1411 }
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416 use super::*;
1417 use crate::{
1418 codec::ProtocolCodec,
1419 transport::{
1420 manager::{TransportManager, TransportManagerBuilder},
1421 KEEP_ALIVE_TIMEOUT,
1422 },
1423 types::protocol::ProtocolName,
1424 ConnectionId,
1425 };
1426 use multiaddr::Protocol;
1427 use multihash::Multihash;
1428 use std::str::FromStr;
1429 use tokio::sync::mpsc::channel;
1430
1431 #[allow(unused)]
1432 struct Context {
1433 _cmd_tx: Sender<KademliaCommand>,
1434 event_rx: Receiver<KademliaEvent>,
1435 }
1436
1437 fn make_kademlia() -> (Kademlia, Context, TransportManager) {
1438 let manager = TransportManagerBuilder::new().build();
1439
1440 let peer = PeerId::random();
1441 let (transport_service, _tx) = TransportService::new(
1442 peer,
1443 ProtocolName::from("/kad/1"),
1444 Vec::new(),
1445 Default::default(),
1446 manager.transport_manager_handle(),
1447 KEEP_ALIVE_TIMEOUT,
1448 );
1449 let (event_tx, event_rx) = channel(64);
1450 let (_cmd_tx, cmd_rx) = channel(64);
1451 let next_query_id = Arc::new(AtomicUsize::new(0usize));
1452
1453 let config = Config {
1454 protocol_names: vec![ProtocolName::from("/kad/1")],
1455 known_peers: HashMap::new(),
1456 codec: ProtocolCodec::UnsignedVarint(Some(70 * 1024)),
1457 replication_factor: 20usize,
1458 update_mode: RoutingTableUpdateMode::Automatic,
1459 validation_mode: IncomingRecordValidationMode::Automatic,
1460 record_ttl: Duration::from_secs(36 * 60 * 60),
1461 memory_store_config: Default::default(),
1462 event_tx,
1463 cmd_rx,
1464 next_query_id,
1465 };
1466
1467 (
1468 Kademlia::new(transport_service, config),
1469 Context { _cmd_tx, event_rx },
1470 manager,
1471 )
1472 }
1473
1474 #[tokio::test]
1475 async fn check_get_records_update() {
1476 let (mut kademlia, _context, _manager) = make_kademlia();
1477
1478 let key = RecordKey::from(vec![1, 2, 3]);
1479 let records = vec![
1480 PeerRecord {
1482 peer: PeerId::random(),
1483 record: Record::new(key.clone(), vec![0x1]),
1484 },
1485 PeerRecord {
1486 peer: PeerId::random(),
1487 record: Record::new(key.clone(), vec![0x1]),
1488 },
1489 PeerRecord {
1491 peer: PeerId::random(),
1492 record: Record::new(key.clone(), vec![0x2]),
1493 },
1494 ];
1495
1496 for record in records {
1497 let action = QueryAction::GetRecordPartialResult {
1498 query_id: QueryId(1),
1499 record,
1500 };
1501 assert!(kademlia.on_query_action(action).await.is_ok());
1502 }
1503
1504 let query_id = QueryId(1);
1505 let action = QueryAction::GetRecordQueryDone { query_id };
1506 assert!(kademlia.on_query_action(action).await.is_ok());
1507
1508 assert!(kademlia.store.get(&key).is_none());
1510 }
1511
1512 #[tokio::test]
1513 async fn check_get_records_update_with_expired_records() {
1514 let (mut kademlia, _context, _manager) = make_kademlia();
1515
1516 let key = RecordKey::from(vec![1, 2, 3]);
1517 let expired = std::time::Instant::now() - std::time::Duration::from_secs(10);
1518 let records = vec![
1519 PeerRecord {
1521 peer: PeerId::random(),
1522 record: Record {
1523 key: key.clone(),
1524 value: vec![0x1],
1525 publisher: None,
1526 expires: Some(expired),
1527 },
1528 },
1529 PeerRecord {
1530 peer: PeerId::random(),
1531 record: Record::new(key.clone(), vec![0x1]),
1532 },
1533 PeerRecord {
1535 peer: PeerId::random(),
1536 record: Record::new(key.clone(), vec![0x2]),
1537 },
1538 PeerRecord {
1539 peer: PeerId::random(),
1540 record: Record::new(key.clone(), vec![0x2]),
1541 },
1542 ];
1543
1544 for record in records {
1545 let action = QueryAction::GetRecordPartialResult {
1546 query_id: QueryId(1),
1547 record,
1548 };
1549 assert!(kademlia.on_query_action(action).await.is_ok());
1550 }
1551
1552 kademlia
1553 .on_query_action(QueryAction::GetRecordQueryDone {
1554 query_id: QueryId(1),
1555 })
1556 .await
1557 .unwrap();
1558
1559 assert!(kademlia.store.get(&key).is_none());
1561 }
1562
1563 #[tokio::test]
1564 async fn check_address_store_routing_table_updates() {
1565 let (mut kademlia, _context, _manager) = make_kademlia();
1566
1567 let peer = PeerId::random();
1568 let address_a = Multiaddr::from_str("/dns/domain1.com/tcp/30333").unwrap().with(
1569 Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).unwrap()),
1570 );
1571 let address_b = Multiaddr::from_str("/dns/domain1.com/tcp/30334").unwrap().with(
1572 Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).unwrap()),
1573 );
1574 let address_c = Multiaddr::from_str("/dns/domain1.com/tcp/30339").unwrap().with(
1575 Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).unwrap()),
1576 );
1577
1578 kademlia.routing_table.add_known_peer(
1580 peer,
1581 vec![address_a.clone()],
1582 ConnectionType::NotConnected,
1583 );
1584
1585 match kademlia.routing_table.entry(Key::from(peer)) {
1587 KBucketEntry::Occupied(entry) => {
1588 assert_eq!(entry.addresses(), vec![address_a.clone()]);
1589 }
1590 _ => panic!("Peer not found in routing table"),
1591 };
1592
1593 let _ = kademlia.on_connection_established(
1595 peer,
1596 Endpoint::Dialer {
1597 address: address_b.clone(),
1598 connection_id: ConnectionId::from(0),
1599 },
1600 );
1601
1602 match kademlia.routing_table.entry(Key::from(peer)) {
1605 KBucketEntry::Occupied(entry) => {
1606 assert_eq!(
1607 entry.addresses(),
1608 vec![address_b.clone(), address_a.clone()]
1609 );
1610 }
1611 _ => panic!("Peer not found in routing table"),
1612 };
1613
1614 let _ = kademlia.on_connection_established(
1616 peer,
1617 Endpoint::Listener {
1618 address: address_c.clone(),
1619 connection_id: ConnectionId::from(0),
1620 },
1621 );
1622 match kademlia.routing_table.entry(Key::from(peer)) {
1624 KBucketEntry::Occupied(entry) => {
1625 assert_eq!(
1626 entry.addresses(),
1627 vec![address_b.clone(), address_a.clone()]
1628 );
1629 }
1630 _ => panic!("Peer not found in routing table"),
1631 };
1632
1633 kademlia.on_dial_failure(peer, vec![address_b.clone(), address_b.clone()]);
1636
1637 match kademlia.routing_table.entry(Key::from(peer)) {
1638 KBucketEntry::Occupied(entry) => {
1639 assert_eq!(
1640 entry.addresses(),
1641 vec![address_a.clone(), address_b.clone()]
1642 );
1643 }
1644 _ => panic!("Peer not found in routing table"),
1645 };
1646 }
1647}