use crate::{
    error::{Error, ImmediateDialError, SubstreamError},
    protocol::{
        libp2p::kademlia::{
            bucket::KBucketEntry,
            executor::{QueryContext, QueryExecutor, QueryResult},
            handle::KademliaCommand,
            message::KademliaMessage,
            query::{QueryAction, QueryEngine},
            record::ProviderRecord,
            routing_table::RoutingTable,
            store::MemoryStore,
            types::{ConnectionType, KademliaPeer, Key},
        },
        Direction, TransportEvent, TransportService,
    },
    substream::Substream,
    types::SubstreamId,
    PeerId,
};

use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use multiaddr::Multiaddr;
use tokio::sync::mpsc::{Receiver, Sender};

use std::{
    collections::{hash_map::Entry, HashMap},
    time::{Duration, Instant},
};

pub use self::handle::RecordsType;
pub use config::{Config, ConfigBuilder};
pub use handle::{
    IncomingRecordValidationMode, KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode,
};
pub use query::QueryId;
pub use record::{Key as RecordKey, PeerRecord, Record};

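/// Logging target for the file.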
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";

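/// Parallelism factor (α), i.e. the number of parallel requests a query keeps in flight.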
const PARALLELISM_FACTOR: usize = 3;

mod bucket;
mod config;
mod executor;
mod handle;
mod message;
mod query;
mod record;
mod routing_table;
mod store;
mod types;

mod schema {
    pub(super) mod kademlia {
        include!(concat!(env!("OUT_DIR"), "/kademlia.rs"));
    }
}

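/// Pending action for a peer, executed once a substream to the peer has been opened.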
#[derive(Debug)]
enum PeerAction {
    /// Send a query message (such as `FIND_NODE`) to the peer as part of the given query.
    SendFindNode(QueryId),

    /// Send a serialized `PUT_VALUE` message to the peer.
    SendPutValue(Bytes),
}

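/// Context for a connected peer.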
#[derive(Default)]
struct PeerContext {
    /// Pending actions, keyed by substream ID.
    pending_actions: HashMap<SubstreamId, PeerAction>,
}

impl PeerContext {
    /// Create new [`PeerContext`].
    pub fn new() -> Self {
        Self {
            pending_actions: HashMap::new(),
        }
    }

    /// Register a pending action for a substream.
    pub fn add_pending_action(&mut self, substream_id: SubstreamId, action: PeerAction) {
        self.pending_actions.insert(substream_id, action);
    }
}

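/// Main Kademlia object.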
pub(crate) struct Kademlia {
    /// Transport service.
    service: TransportService,

    /// Local Kademlia key.
    local_key: Key<PeerId>,

    /// Connected peers.
    peers: HashMap<PeerId, PeerContext>,

    /// TX channel for sending events to the user.
    event_tx: Sender<KademliaEvent>,

    /// RX channel for receiving commands from the user.
    cmd_rx: Receiver<KademliaCommand>,

    /// Routing table.
    routing_table: RoutingTable,

    /// Replication factor.
    replication_factor: usize,

    /// Record store.
    store: MemoryStore,

    /// Pending outbound substreams and the peers they were opened for.
    pending_substreams: HashMap<SubstreamId, PeerId>,

    /// Pending dials and the actions to execute once the connection is established.
    pending_dials: HashMap<PeerId, Vec<PeerAction>>,

    /// Routing table update mode.
    update_mode: RoutingTableUpdateMode,

    /// Incoming record validation mode.
    validation_mode: IncomingRecordValidationMode,

    /// Default TTL for records.
    record_ttl: Duration,

    /// Default TTL for provider records.
    provider_ttl: Duration,

    /// Query engine.
    engine: QueryEngine,

    /// Query executor.
    executor: QueryExecutor,
}

impl Kademlia {
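    /// Create new [`Kademlia`].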
    pub(crate) fn new(mut service: TransportService, config: Config) -> Self {
        let local_peer_id = service.local_peer_id();
        let local_key = Key::from(service.local_peer_id());
        let mut routing_table = RoutingTable::new(local_key.clone());

        for (peer, addresses) in config.known_peers {
            tracing::trace!(target: LOG_TARGET, ?peer, ?addresses, "add bootstrap peer");

            routing_table.add_known_peer(peer, addresses.clone(), ConnectionType::NotConnected);
            service.add_known_address(&peer, addresses.into_iter());
        }

        Self {
            service,
            routing_table,
            peers: HashMap::new(),
            cmd_rx: config.cmd_rx,
            store: MemoryStore::new(),
            event_tx: config.event_tx,
            local_key,
            pending_dials: HashMap::new(),
            executor: QueryExecutor::new(),
            pending_substreams: HashMap::new(),
            update_mode: config.update_mode,
            validation_mode: config.validation_mode,
            record_ttl: config.record_ttl,
            provider_ttl: config.provider_ttl,
            replication_factor: config.replication_factor,
            engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
        }
    }

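    /// Connection established to remote peer.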
    fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
        tracing::trace!(target: LOG_TARGET, ?peer, "connection established");

        match self.peers.entry(peer) {
            Entry::Vacant(entry) => {
                if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
                    entry.connection = ConnectionType::Connected;
                }

                let Some(actions) = self.pending_dials.remove(&peer) else {
                    entry.insert(PeerContext::new());
                    return Ok(());
                };

                let mut context = PeerContext::new();

                // Open a substream for each pending action; if a substream cannot be opened,
                // report a failure to the query the action belongs to.
                for action in actions {
                    match self.service.open_substream(peer) {
                        Ok(substream_id) => {
                            context.add_pending_action(substream_id, action);
                        }
                        Err(error) => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?peer,
                                ?action,
                                ?error,
                                "connection established to peer but failed to open substream",
                            );

                            if let PeerAction::SendFindNode(query_id) = action {
                                self.engine.register_response_failure(query_id, peer);
                            }
                        }
                    }
                }

                entry.insert(context);
                Ok(())
            }
            Entry::Occupied(_) => Err(Error::PeerAlreadyExists(peer)),
        }
    }

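    /// Disconnect peer from the protocol, reporting a failure for any query still waiting on the peer.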
    async fn disconnect_peer(&mut self, peer: PeerId, query: Option<QueryId>) {
        tracing::trace!(target: LOG_TARGET, ?peer, ?query, "disconnect peer");

        if let Some(query) = query {
            self.engine.register_response_failure(query, peer);
        }

        if let Some(PeerContext { pending_actions }) = self.peers.remove(&peer) {
            pending_actions.into_iter().for_each(|(_, action)| {
                if let PeerAction::SendFindNode(query_id) = action {
                    self.engine.register_response_failure(query_id, peer);
                }
            });
        }

        if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
            entry.connection = ConnectionType::NotConnected;
        }
    }

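    /// Local node opened a substream to a remote node; execute the action that was pending for the substream.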
    async fn on_outbound_substream(
        &mut self,
        peer: PeerId,
        substream_id: SubstreamId,
        substream: Substream,
    ) -> crate::Result<()> {
        tracing::trace!(
            target: LOG_TARGET,
            ?peer,
            ?substream_id,
            "outbound substream opened",
        );
        let _ = self.pending_substreams.remove(&substream_id);

        let pending_action = &mut self
            .peers
            .get_mut(&peer)
            .ok_or(Error::PeerDoesntExist(peer))?
            .pending_actions
            .remove(&substream_id);

        match pending_action.take() {
            None => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    ?substream_id,
                    "pending action doesn't exist for peer, closing substream",
                );

                let _ = substream.close().await;
                return Ok(());
            }
            Some(PeerAction::SendFindNode(query)) => {
                match self.engine.next_peer_action(&query, &peer) {
                    Some(QueryAction::SendMessage {
                        query,
                        peer,
                        message,
                    }) => {
                        tracing::trace!(target: LOG_TARGET, ?peer, ?query, "start sending message to peer");

                        self.executor.send_request_read_response(
                            peer,
                            Some(query),
                            message,
                            substream,
                        );
                    }
                    None => {
                        // The query no longer needs this peer (e.g. it already finished),
                        // so just close the substream.
                        let _ = substream.close().await;
                    }
                    action => {
                        tracing::warn!(target: LOG_TARGET, ?query, ?peer, ?action, "unexpected action for `FIND_NODE`");
                        let _ = substream.close().await;
                        debug_assert!(false);
                    }
                }
            }
            Some(PeerAction::SendPutValue(message)) => {
                tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message");

                self.executor.send_message(peer, message, substream);
            }
        }

        Ok(())
    }

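    /// Remote node opened a substream to the local node.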
    async fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
        tracing::trace!(target: LOG_TARGET, ?peer, "inbound substream opened");

        self.executor.read_message(peer, None, substream);
    }

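    /// Notify the user about discovered peers and, if automatic updates are enabled, add them to the routing table.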
    async fn update_routing_table(&mut self, peers: &[KademliaPeer]) {
        let peers: Vec<_> =
            peers.iter().filter(|peer| peer.peer != self.service.local_peer_id()).collect();

        let _ = self
            .event_tx
            .send(KademliaEvent::RoutingTableUpdate {
                peers: peers.iter().map(|peer| peer.peer).collect::<Vec<PeerId>>(),
            })
            .await;

        for info in peers {
            self.service.add_known_address(&info.peer, info.addresses.iter().cloned());

            if std::matches!(self.update_mode, RoutingTableUpdateMode::Automatic) {
                self.routing_table.add_known_peer(
                    info.peer,
                    info.addresses.clone(),
                    self.peers
                        .get(&info.peer)
                        .map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected),
                );
            }
        }
    }

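    /// Handle a received Kademlia message.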
    async fn on_message_received(
        &mut self,
        peer: PeerId,
        query_id: Option<QueryId>,
        message: BytesMut,
        substream: Substream,
    ) -> crate::Result<()> {
        tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "handle message from peer");

        match KademliaMessage::from_bytes(message).ok_or(Error::InvalidData)? {
            ref message @ KademliaMessage::FindNode {
                ref target,
                ref peers,
            } => {
                match query_id {
                    Some(query_id) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?target,
                            query = ?query_id,
                            "handle `FIND_NODE` response",
                        );

                        self.update_routing_table(peers).await;
                        self.engine.register_response(query_id, peer, message.clone());
                    }
                    None => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?target,
                            "handle `FIND_NODE` request",
                        );

                        let message = KademliaMessage::find_node_response(
                            target,
                            self.routing_table
                                .closest(Key::from(target.clone()), self.replication_factor),
                        );
                        self.executor.send_message(peer, message.into(), substream);
                    }
                }
            }
            KademliaMessage::PutValue { record } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    record_key = ?record.key,
                    "handle `PUT_VALUE` message",
                );

                if let IncomingRecordValidationMode::Automatic = self.validation_mode {
                    self.store.put(record.clone());
                }

                let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
            }
            ref message @ KademliaMessage::GetRecord {
                ref key,
                ref record,
                ref peers,
            } => {
                match (query_id, key) {
                    (Some(query_id), _) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            query = ?query_id,
                            ?peers,
                            ?record,
                            "handle `GET_VALUE` response",
                        );

                        self.update_routing_table(peers).await;
                        self.engine.register_response(query_id, peer, message.clone());
                    }
                    (None, Some(key)) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?key,
                            "handle `GET_VALUE` request",
                        );

                        let value = self.store.get(key).cloned();
                        let closest_peers = self
                            .routing_table
                            .closest(Key::from(key.to_vec()), self.replication_factor);

                        let message = KademliaMessage::get_value_response(
                            (*key).clone(),
                            closest_peers,
                            value,
                        );
                        self.executor.send_message(peer, message.into(), substream);
                    }
                    (None, None) => tracing::debug!(
                        target: LOG_TARGET,
                        ?peer,
                        ?message,
                        "unable to handle `GET_RECORD` request with empty key",
                    ),
                }
            }
            KademliaMessage::AddProvider { key, providers } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    ?key,
                    ?providers,
                    "handle `ADD_PROVIDER` message",
                );

                match (providers.len(), providers.first()) {
                    (1, Some(provider)) =>
                        if provider.peer == peer {
                            self.store.put_provider(ProviderRecord {
                                key,
                                provider: peer,
                                addresses: provider.addresses.clone(),
                                expires: Instant::now() + self.provider_ttl,
                            });
                        } else {
                            tracing::trace!(
                                target: LOG_TARGET,
                                publisher = ?peer,
                                provider = ?provider.peer,
                                "ignoring `ADD_PROVIDER` message with `publisher` != `provider`"
                            )
                        },
                    (n, _) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            publisher = ?peer,
                            ?n,
                            "ignoring `ADD_PROVIDER` message with `n` != 1 providers"
                        )
                    }
                }
            }
            ref message @ KademliaMessage::GetProviders {
                ref key,
                ref peers,
                ref providers,
            } => {
                match (query_id, key) {
                    (Some(query_id), key) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            query = ?query_id,
                            ?key,
                            ?peers,
                            ?providers,
                            "handle `GET_PROVIDERS` response",
                        );

                        self.update_routing_table(peers).await;

                        self.engine.register_response(query_id, peer, message.clone());
                    }
                    (None, Some(key)) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?key,
                            "handle `GET_PROVIDERS` request",
                        );

                        let providers = self.store.get_providers(key);
                        let closer_peers = self
                            .routing_table
                            .closest(Key::from(key.to_vec()), self.replication_factor);

                        let message = KademliaMessage::get_providers_response(
                            key.clone(),
                            providers,
                            &closer_peers,
                        );
                        self.executor.send_message(peer, message.into(), substream);
                    }
                    (None, None) => tracing::debug!(
                        target: LOG_TARGET,
                        ?peer,
                        ?message,
                        "unable to handle `GET_PROVIDERS` request with empty key",
                    ),
                }
            }
        }

        Ok(())
    }

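    /// Failed to open a substream to a remote peer.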
    async fn on_substream_open_failure(
        &mut self,
        substream_id: SubstreamId,
        error: SubstreamError,
    ) {
        tracing::trace!(
            target: LOG_TARGET,
            ?substream_id,
            ?error,
            "failed to open substream"
        );

        let Some(peer) = self.pending_substreams.remove(&substream_id) else {
            tracing::debug!(
                target: LOG_TARGET,
                ?substream_id,
                "outbound substream failed for non-existent peer"
            );
            return;
        };

        if let Some(context) = self.peers.get_mut(&peer) {
            let query = match context.pending_actions.remove(&substream_id) {
                Some(PeerAction::SendFindNode(query)) => Some(query),
                _ => None,
            };

            self.disconnect_peer(peer, query).await;
        }
    }

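    /// Handle a failed dial attempt.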
    fn on_dial_failure(&mut self, peer: PeerId, address: Multiaddr) {
        tracing::trace!(target: LOG_TARGET, ?peer, ?address, "failed to dial peer");

        let Some(actions) = self.pending_dials.remove(&peer) else {
            return;
        };

        for action in actions {
            if let PeerAction::SendFindNode(query_id) = action {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    query = ?query_id,
                    ?address,
                    "report failure for pending query",
                );

                self.engine.register_response_failure(query_id, peer);
            }
        }
    }

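    /// Open a substream to the peer or, if no connection exists, dial the peer and defer the action until a connection has been established.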
    fn open_substream_or_dial(
        &mut self,
        peer: PeerId,
        action: PeerAction,
        query: Option<QueryId>,
    ) -> Result<(), Error> {
        match self.service.open_substream(peer) {
            Ok(substream_id) => {
                self.pending_substreams.insert(substream_id, peer);
                self.peers.entry(peer).or_default().pending_actions.insert(substream_id, action);

                Ok(())
            }
            Err(err) => {
                tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "failed to open substream, dialing peer");

                match self.service.dial(&peer) {
                    Ok(()) => {
                        self.pending_dials.entry(peer).or_default().push(action);
                        Ok(())
                    }

                    Err(ImmediateDialError::AlreadyConnected) => {
                        // The dial failed because a connection already exists; retry opening
                        // the substream on the existing connection.
                        match self.service.open_substream(peer) {
                            Ok(substream_id) => {
                                self.pending_substreams.insert(substream_id, peer);
                                self.peers
                                    .entry(peer)
                                    .or_default()
                                    .pending_actions
                                    .insert(substream_id, action);
                                Ok(())
                            }
                            Err(err) => {
                                tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "failed to open substream a second time");
                                Err(err.into())
                            }
                        }
                    }

                    Err(error) => {
                        tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "failed to dial peer");
                        Err(error.into())
                    }
                }
            }
        }
    }

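    /// Handle the next action emitted by the query engine.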
    async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
        match action {
            QueryAction::SendMessage { query, peer, .. } => {
                if self
                    .open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query))
                    .is_err()
                {
                    self.engine.register_response_failure(query, peer);
                }
                Ok(())
            }
            QueryAction::FindNodeQuerySucceeded {
                target,
                peers,
                query,
            } => {
                tracing::debug!(
                    target: LOG_TARGET,
                    ?query,
                    peer = ?target,
                    num_peers = ?peers.len(),
                    "`FIND_NODE` succeeded",
                );

                let _ = self
                    .event_tx
                    .send(KademliaEvent::FindNodeSuccess {
                        target,
                        query_id: query,
                        peers: peers.into_iter().map(|info| (info.peer, info.addresses)).collect(),
                    })
                    .await;
                Ok(())
            }
            QueryAction::PutRecordToFoundNodes { record, peers } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    record_key = ?record.key,
                    num_peers = ?peers.len(),
                    "store record to found peers",
                );
                let key = record.key.clone();
                let message = KademliaMessage::put_value(record);

                for peer in peers {
                    if let Err(error) = self.open_substream_or_dial(
                        peer.peer,
                        PeerAction::SendPutValue(message.clone()),
                        None,
                    ) {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            ?key,
                            ?error,
                            "failed to put record to peer",
                        );
                    }
                }

                Ok(())
            }
            QueryAction::GetRecordQueryDone { query_id, records } => {
                let _ = self
                    .event_tx
                    .send(KademliaEvent::GetRecordSuccess {
                        query_id,
                        records: RecordsType::Network(records),
                    })
                    .await;
                Ok(())
            }
            QueryAction::QueryFailed { query } => {
                tracing::debug!(target: LOG_TARGET, ?query, "query failed");

                let _ = self.event_tx.send(KademliaEvent::QueryFailed { query_id: query }).await;
                Ok(())
            }
            QueryAction::QuerySucceeded { .. } => unreachable!(),
        }
    }

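    /// Start the [`Kademlia`] event loop.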
    pub async fn run(mut self) -> crate::Result<()> {
        tracing::debug!(target: LOG_TARGET, "starting kademlia event loop");

        loop {
            // Poll the query engine for the next actions before waiting for new events.
            while let Some(action) = self.engine.next_action() {
                if let Err((query, peer)) = self.on_query_action(action).await {
                    self.disconnect_peer(peer, Some(query)).await;
                }
            }

            tokio::select! {
                event = self.service.next() => match event {
                    Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
                        if let Err(error) = self.on_connection_established(peer) {
                            tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection");
                        }
                    }
                    Some(TransportEvent::ConnectionClosed { peer }) => {
                        self.disconnect_peer(peer, None).await;
                    }
                    Some(TransportEvent::SubstreamOpened { peer, direction, substream, .. }) => {
                        match direction {
                            Direction::Inbound => self.on_inbound_substream(peer, substream).await,
                            Direction::Outbound(substream_id) => {
                                if let Err(error) = self.on_outbound_substream(peer, substream_id, substream).await {
                                    tracing::debug!(
                                        target: LOG_TARGET,
                                        ?peer,
                                        ?substream_id,
                                        ?error,
                                        "failed to handle outbound substream",
                                    );
                                }
                            }
                        }
                    },
                    Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
                        self.on_substream_open_failure(substream, error).await;
                    }
                    Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address),
                    None => return Err(Error::EssentialTaskClosed),
                },
                context = self.executor.next() => {
                    let QueryContext { peer, query_id, result } = context.unwrap();

                    match result {
                        QueryResult::SendSuccess { substream } => {
                            tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer");
                            let _ = substream.close().await;
                        }
                        QueryResult::ReadSuccess { substream, message } => {
                            tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer");

                            if let Err(error) = self.on_message_received(peer, query_id, message, substream).await {
                                tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message");
                            }
                        }
                        QueryResult::SubstreamClosed | QueryResult::Timeout => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?peer,
                                query = ?query_id,
                                ?result,
                                "failed to read message from substream",
                            );

                            self.disconnect_peer(peer, query_id).await;
                        }
                    }
                }
                command = self.cmd_rx.recv() => {
                    match command {
                        Some(KademliaCommand::FindNode { peer, query_id }) => {
                            tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query");

                            self.engine.start_find_node(
                                query_id,
                                peer,
                                self.routing_table.closest(Key::from(peer), self.replication_factor).into()
                            );
                        }
                        Some(KademliaCommand::PutRecord { mut record, query_id }) => {
                            tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT");

                            // The local node is the publisher of the record.
                            record.publisher = Some(self.local_key.clone().into_preimage());

                            record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

                            let key = Key::new(record.key.clone());

                            self.store.put(record.clone());

                            self.engine.start_put_record(
                                query_id,
                                record,
                                self.routing_table.closest(key, self.replication_factor).into(),
                            );
                        }
                        Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => {
                            tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers");

                            record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

                            if update_local_store {
                                self.store.put(record.clone());
                            }

                            let peers = peers.into_iter().filter_map(|peer| {
                                // Don't put the record to the local node.
                                if peer == self.service.local_peer_id() {
                                    return None;
                                }

                                match self.routing_table.entry(Key::from(peer)) {
                                    KBucketEntry::Occupied(entry) => Some(entry.clone()),
                                    KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()),
                                    _ => None,
                                }
                            }).collect();

                            self.engine.start_put_record_to_peers(
                                query_id,
                                record,
                                peers,
                            );
                        }
                        Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
                            tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");

                            match (self.store.get(&key), quorum) {
                                (Some(record), Quorum::One) => {
                                    let _ = self
                                        .event_tx
                                        .send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) })
                                        .await;
                                }
                                (record, _) => {
                                    self.engine.start_get_record(
                                        query_id,
                                        key.clone(),
                                        self.routing_table.closest(Key::new(key.clone()), self.replication_factor).into(),
                                        quorum,
                                        if record.is_some() { 1 } else { 0 },
                                    );
                                }
                            }
                        }
                        Some(KademliaCommand::AddKnownPeer { peer, addresses }) => {
                            tracing::trace!(
                                target: LOG_TARGET,
                                ?peer,
                                ?addresses,
                                "add known peer",
                            );

                            self.routing_table.add_known_peer(
                                peer,
                                addresses.clone(),
                                self.peers
                                    .get(&peer)
                                    .map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected),
                            );
                            self.service.add_known_address(&peer, addresses.into_iter());
                        }
                        Some(KademliaCommand::StoreRecord { mut record }) => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                key = ?record.key,
                                "store record in local store",
                            );

                            record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

                            self.store.put(record);
                        }
                        None => return Err(Error::EssentialTaskClosed),
                    }
                },
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::{
        codec::ProtocolCodec,
        crypto::ed25519::Keypair,
        transport::{
            manager::{limits::ConnectionLimitsConfig, TransportManager},
            KEEP_ALIVE_TIMEOUT,
        },
        types::protocol::ProtocolName,
        BandwidthSink,
    };
    use tokio::sync::mpsc::channel;

    #[allow(unused)]
    struct Context {
        _cmd_tx: Sender<KademliaCommand>,
        event_rx: Receiver<KademliaEvent>,
    }

    fn make_kademlia() -> (Kademlia, Context, TransportManager) {
        let (manager, handle) = TransportManager::new(
            Keypair::generate(),
            HashSet::new(),
            BandwidthSink::new(),
            8usize,
            ConnectionLimitsConfig::default(),
        );

        let peer = PeerId::random();
        let (transport_service, _tx) = TransportService::new(
            peer,
            ProtocolName::from("/kad/1"),
            Vec::new(),
            Default::default(),
            handle,
            KEEP_ALIVE_TIMEOUT,
        );
        let (event_tx, event_rx) = channel(64);
        let (_cmd_tx, cmd_rx) = channel(64);

        let config = Config {
            protocol_names: vec![ProtocolName::from("/kad/1")],
            known_peers: HashMap::new(),
            codec: ProtocolCodec::UnsignedVarint(None),
            replication_factor: 20usize,
            update_mode: RoutingTableUpdateMode::Automatic,
            validation_mode: IncomingRecordValidationMode::Automatic,
            record_ttl: Duration::from_secs(36 * 60 * 60),
            provider_ttl: Duration::from_secs(48 * 60 * 60),
            event_tx,
            cmd_rx,
        };

        (
            Kademlia::new(transport_service, config),
            Context { _cmd_tx, event_rx },
            manager,
        )
    }

    #[tokio::test]
    async fn check_get_records_update() {
        let (mut kademlia, _context, _manager) = make_kademlia();

        let key = RecordKey::from(vec![1, 2, 3]);
        let records = vec![
            PeerRecord {
                peer: PeerId::random(),
                record: Record::new(key.clone(), vec![0x1]),
            },
            PeerRecord {
                peer: PeerId::random(),
                record: Record::new(key.clone(), vec![0x1]),
            },
            PeerRecord {
                peer: PeerId::random(),
                record: Record::new(key.clone(), vec![0x2]),
            },
        ];

        let query_id = QueryId(1);
        let action = QueryAction::GetRecordQueryDone { query_id, records };
        assert!(kademlia.on_query_action(action).await.is_ok());

        // Received records are only reported to the user; the local store is not updated.
        assert!(kademlia.store.get(&key).is_none());
    }

    #[tokio::test]
    async fn check_get_records_update_with_expired_records() {
        let (mut kademlia, _context, _manager) = make_kademlia();

        let key = RecordKey::from(vec![1, 2, 3]);
        let expired = std::time::Instant::now() - std::time::Duration::from_secs(10);
        let records = vec![
            PeerRecord {
                peer: PeerId::random(),
                record: Record {
                    key: key.clone(),
                    value: vec![0x1],
                    publisher: None,
                    expires: Some(expired),
                },
            },
            PeerRecord {
                peer: PeerId::random(),
                record: Record::new(key.clone(), vec![0x1]),
            },
            PeerRecord {
                peer: PeerId::random(),
                record: Record::new(key.clone(), vec![0x2]),
            },
            PeerRecord {
                peer: PeerId::random(),
                record: Record::new(key.clone(), vec![0x2]),
            },
        ];

        let query_id = QueryId(1);
        let action = QueryAction::GetRecordQueryDone { query_id, records };
        assert!(kademlia.on_query_action(action).await.is_ok());

        // The local store is still not updated, including for the expired record.
        assert!(kademlia.store.get(&key).is_none());
    }
}