litep2p/protocol/libp2p/kademlia/
mod.rs

// Copyright 2023 litep2p developers
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! [`/ipfs/kad/1.0.0`](https://github.com/libp2p/specs/blob/master/kad-dht/README.md) implementation.

use crate::{
    error::{Error, ImmediateDialError, SubstreamError},
    protocol::{
        libp2p::kademlia::{
            bucket::KBucketEntry,
            executor::{QueryContext, QueryExecutor, QueryResult},
            message::KademliaMessage,
            query::{QueryAction, QueryEngine},
            routing_table::RoutingTable,
            store::{MemoryStore, MemoryStoreAction},
            types::{ConnectionType, KademliaPeer, Key},
        },
        Direction, TransportEvent, TransportService,
    },
    substream::Substream,
    transport::Endpoint,
    types::SubstreamId,
    PeerId,
};

use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use multiaddr::Multiaddr;
use tokio::sync::mpsc::{Receiver, Sender};

use std::{
    collections::{hash_map::Entry, HashMap},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

pub use config::{Config, ConfigBuilder};
pub use handle::{
    IncomingRecordValidationMode, KademliaCommand, KademliaEvent, KademliaHandle, Quorum,
    RoutingTableUpdateMode,
};
pub use query::QueryId;
pub use record::{ContentProvider, Key as RecordKey, PeerRecord, Record};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";

/// Parallelism factor, `α`.
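///
/// Bounds the number of concurrent in-flight requests per query.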
const PARALLELISM_FACTOR: usize = 3;

mod bucket;
mod config;
mod executor;
mod handle;
mod message;
mod query;
mod record;
mod routing_table;
mod store;
mod types;

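/// Protobuf-generated types for the Kademlia wire protocol, emitted into
/// `OUT_DIR` by the build script and included here at compile time.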
mod schema {
    pub(super) mod kademlia {
        include!(concat!(env!("OUT_DIR"), "/kademlia.rs"));
    }
}

/// Peer action.
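///
/// Actions are queued for a peer while a substream (or a connection) is being
/// opened and are executed once the substream is ready.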
#[derive(Debug, Clone)]
#[allow(clippy::enum_variant_names)]
enum PeerAction {
    /// Find nodes (and values/providers) as part of a `FIND_NODE`/`GET_VALUE`/`GET_PROVIDERS` query.
    // TODO: maybe a better name would be `SendFindRequest`?
    SendFindNode(QueryId),

    /// Send `PUT_VALUE` message to peer.
    SendPutValue(QueryId, Bytes),

    /// Send `ADD_PROVIDER` message to peer.
    SendAddProvider(QueryId, Bytes),
}

impl PeerAction {
    fn query_id(&self) -> QueryId {
        match self {
            PeerAction::SendFindNode(query_id) => *query_id,
            PeerAction::SendPutValue(query_id, _) => *query_id,
            PeerAction::SendAddProvider(query_id, _) => *query_id,
        }
    }
}

/// Peer context.
#[derive(Default)]
struct PeerContext {
    /// Pending actions, if any, keyed by substream ID.
    pending_actions: HashMap<SubstreamId, PeerAction>,
}

impl PeerContext {
    /// Create new [`PeerContext`].
    pub fn new() -> Self {
        Self {
            pending_actions: HashMap::new(),
        }
    }

    /// Add pending action for peer.
    pub fn add_pending_action(&mut self, substream_id: SubstreamId, action: PeerAction) {
        self.pending_actions.insert(substream_id, action);
    }
}

/// Main Kademlia object.
pub(crate) struct Kademlia {
    /// Transport service.
    service: TransportService,

    /// Local Kademlia key.
    local_key: Key<PeerId>,

    /// Connected peers.
    peers: HashMap<PeerId, PeerContext>,

    /// TX channel for sending events to `KademliaHandle`.
    event_tx: Sender<KademliaEvent>,

    /// RX channel for receiving commands from `KademliaHandle`.
    cmd_rx: Receiver<KademliaCommand>,

    /// Next query ID.
    next_query_id: Arc<AtomicUsize>,

    /// Routing table.
    routing_table: RoutingTable,

    /// Replication factor.
    replication_factor: usize,

    /// Record store.
    store: MemoryStore,

    /// Pending outbound substreams.
    pending_substreams: HashMap<SubstreamId, PeerId>,

    /// Pending dials.
    pending_dials: HashMap<PeerId, Vec<PeerAction>>,

    /// Routing table update mode.
    update_mode: RoutingTableUpdateMode,

    /// Incoming records validation mode.
    validation_mode: IncomingRecordValidationMode,

    /// Default record TTL.
    record_ttl: Duration,

    /// Query engine.
    engine: QueryEngine,

    /// Query executor.
    executor: QueryExecutor,
}

impl Kademlia {
    /// Create new [`Kademlia`].
    pub(crate) fn new(mut service: TransportService, config: Config) -> Self {
        let local_peer_id = service.local_peer_id();
        let local_key = Key::from(service.local_peer_id());
        let mut routing_table = RoutingTable::new(local_key.clone());

        for (peer, addresses) in config.known_peers {
            tracing::trace!(target: LOG_TARGET, ?peer, ?addresses, "add bootstrap peer");

            routing_table.add_known_peer(peer, addresses.clone(), ConnectionType::NotConnected);
            service.add_known_address(&peer, addresses.into_iter());
        }

        let store = MemoryStore::with_config(local_peer_id, config.memory_store_config);

        Self {
            service,
            routing_table,
            peers: HashMap::new(),
            cmd_rx: config.cmd_rx,
            next_query_id: config.next_query_id,
            store,
            event_tx: config.event_tx,
            local_key,
            pending_dials: HashMap::new(),
            executor: QueryExecutor::new(),
            pending_substreams: HashMap::new(),
            update_mode: config.update_mode,
            validation_mode: config.validation_mode,
            record_ttl: config.record_ttl,
            replication_factor: config.replication_factor,
            engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
        }
    }

    /// Allocate next query ID.
    fn next_query_id(&mut self) -> QueryId {
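        // `Ordering::Relaxed` is sufficient here: the shared counter only needs
        // to produce unique IDs, it does not synchronize other memory accesses.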
        let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed);

        QueryId(query_id)
    }

    /// Connection established to remote peer.
    fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> {
        tracing::trace!(target: LOG_TARGET, ?peer, "connection established");

        match self.peers.entry(peer) {
            Entry::Vacant(entry) => {
                // Set the connection type to connected and potentially save the address in the
                // routing table.
                //
                // Note: this happens regardless of the state of the Kademlia-managed peers,
                // because an already occupied entry in the `self.peers` map does not mean that
                // we are no longer interested in the address / connection type of the peer.
                self.routing_table.on_connection_established(Key::from(peer), endpoint);

                let Some(actions) = self.pending_dials.remove(&peer) else {
                    // Note that we don't add a peer entry if there are no pending actions.
                    // This avoids populating `self.peers` with peers that don't support
                    // our Kademlia protocol.
                    return Ok(());
                };

                // go over all pending actions, open substreams, and save the state to
                // `PeerContext`, from which it is queried later when the substream opens
                let mut context = PeerContext::new();

                for action in actions {
                    match self.service.open_substream(peer) {
                        Ok(substream_id) => {
                            context.add_pending_action(substream_id, action);
                        }
                        Err(error) => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?peer,
                                ?action,
                                ?error,
                                "connection established to peer but failed to open substream",
                            );

                            if let PeerAction::SendFindNode(query_id) = action {
                                self.engine.register_send_failure(query_id, peer);
                                self.engine.register_response_failure(query_id, peer);
                            }
                        }
                    }
                }

                entry.insert(context);
                Ok(())
            }
            Entry::Occupied(_) => {
                tracing::warn!(
                    target: LOG_TARGET,
                    ?peer,
                    ?endpoint,
283                    "connection already exists, discarding opening substreams, this is unexpected"
                );

                // Update the connection in the routing table, as above. The call is made in
                // two places to avoid unnecessarily cloning the endpoint for logging purposes.
                self.routing_table.on_connection_established(Key::from(peer), endpoint);

                Err(Error::PeerAlreadyExists(peer))
            }
        }
    }

    /// Disconnect peer from `Kademlia`.
    ///
    /// The peer is disconnected either because its substream was detected as
    /// closed or because the connection itself was closed.
    ///
    /// The peer is kept in the routing table, but its connection state is set
    /// to `NotConnected`, meaning it can be evicted from a k-bucket if another
    /// peer sharing the bucket connects.
    async fn disconnect_peer(&mut self, peer: PeerId, query: Option<QueryId>) {
        tracing::trace!(target: LOG_TARGET, ?peer, ?query, "disconnect peer");

        if let Some(query) = query {
            self.engine.register_peer_failure(query, peer);
        }

        // Apart from the failing query, we need to fail all other pending queries for the peer
        // being disconnected.
        if let Some(PeerContext { pending_actions }) = self.peers.remove(&peer) {
            pending_actions.into_iter().for_each(|(_, action)| {
                // Don't report failure twice for the same `query_id` if it was already reported
                // above. (We can still have other pending queries for the peer that
                // need to be reported.)
                let query_id = action.query_id();
                if Some(query_id) != query {
                    self.engine.register_peer_failure(query_id, peer);
                }
            });
        }

        if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
            entry.connection = ConnectionType::NotConnected;
        }
    }

    /// Local node opened a substream to remote node.
    async fn on_outbound_substream(
        &mut self,
        peer: PeerId,
        substream_id: SubstreamId,
        substream: Substream,
    ) -> crate::Result<()> {
        tracing::trace!(
            target: LOG_TARGET,
            ?peer,
            ?substream_id,
            "outbound substream opened",
        );
        let _ = self.pending_substreams.remove(&substream_id);

        let pending_action = self
            .peers
            .get_mut(&peer)
            // If we opened an outbound substream, we must have pending actions for the peer.
            .ok_or(Error::PeerDoesntExist(peer))?
            .pending_actions
            .remove(&substream_id);

        match pending_action {
            None => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    ?substream_id,
                    "pending action doesn't exist for peer, closing substream",
                );

                let _ = substream.close().await;
                return Ok(());
            }
            Some(PeerAction::SendFindNode(query)) => {
                match self.engine.next_peer_action(&query, &peer) {
                    Some(QueryAction::SendMessage {
                        query,
                        peer,
                        message,
                    }) => {
                        tracing::trace!(target: LOG_TARGET, ?peer, ?query, "start sending message to peer");

                        self.executor.send_request_read_response(
                            peer,
                            Some(query),
                            message,
                            substream,
                        );
                    }
                    // query finished while the substream was being opened
                    None => {
                        let _ = substream.close().await;
                    }
                    action => {
                        tracing::warn!(target: LOG_TARGET, ?query, ?peer, ?action, "unexpected action for `FIND_NODE`");
                        let _ = substream.close().await;
                        debug_assert!(false);
                    }
                }
            }
            Some(PeerAction::SendPutValue(query, message)) => {
                tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message");

                self.executor.send_request_eat_response_failure(
                    peer,
                    Some(query),
                    message,
                    substream,
                );
                // TODO: replace this with `send_request_read_response` as part of
                // https://github.com/paritytech/litep2p/issues/429.
            }
            Some(PeerAction::SendAddProvider(query, message)) => {
                tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message");

                self.executor.send_message(peer, Some(query), message, substream);
            }
        }

        Ok(())
    }

    /// Remote opened a substream to local node.
    async fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
        tracing::trace!(target: LOG_TARGET, ?peer, "inbound substream opened");

        // Ensure a peer entry exists so that the peer is treated as
        // [`ConnectionType::Connected`] when inserting into the routing table.
        self.peers.entry(peer).or_default();

        self.executor.read_message(peer, None, substream);
    }

    /// Update the routing table if the routing table update mode is set to automatic.
    ///
    /// Inform the user about the potential routing table update, allowing them to apply it
    /// manually if the mode is set to manual.
    async fn update_routing_table(&mut self, peers: &[KademliaPeer]) {
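        // The local node is never included in routing table updates.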
        let peers: Vec<_> =
            peers.iter().filter(|peer| peer.peer != self.service.local_peer_id()).collect();

        // inform user about the routing table update, regardless of what the routing table update
        // mode is
        let _ = self
            .event_tx
            .send(KademliaEvent::RoutingTableUpdate {
                peers: peers.iter().map(|peer| peer.peer).collect::<Vec<PeerId>>(),
            })
            .await;

        for info in peers {
            let addresses = info.addresses();
            self.service.add_known_address(&info.peer, addresses.clone().into_iter());

            if std::matches!(self.update_mode, RoutingTableUpdateMode::Automatic) {
                self.routing_table.add_known_peer(
                    info.peer,
                    addresses,
                    self.peers
                        .get(&info.peer)
                        .map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected),
                );
            }
        }
    }

    /// Handle received message.
    async fn on_message_received(
        &mut self,
        peer: PeerId,
        query_id: Option<QueryId>,
        message: BytesMut,
        substream: Substream,
    ) -> crate::Result<()> {
        tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "handle message from peer");

        match KademliaMessage::from_bytes(message, self.replication_factor)
            .ok_or(Error::InvalidData)?
        {
            KademliaMessage::FindNode { target, peers } => {
                match query_id {
                    Some(query_id) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?target,
                            query = ?query_id,
                            "handle `FIND_NODE` response",
                        );

                        // update routing table and inform user about the update
                        self.update_routing_table(&peers).await;
                        self.engine.register_response(
                            query_id,
                            peer,
                            KademliaMessage::FindNode { target, peers },
                        );
                        let _ = substream.close().await;
                    }
                    None => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?target,
                            "handle `FIND_NODE` request",
                        );

                        let message = KademliaMessage::find_node_response(
                            &target,
                            self.routing_table
                                .closest(&Key::new(target.as_ref()), self.replication_factor),
                        );
                        self.executor.send_message(peer, None, message.into(), substream);
                    }
                }
            }
            KademliaMessage::PutValue { record } => match query_id {
                Some(query_id) => {
                    tracing::trace!(
                        target: LOG_TARGET,
                        ?peer,
                        query = ?query_id,
                        record_key = ?record.key,
                        "handle `PUT_VALUE` response",
                    );

                    self.engine.register_response(
                        query_id,
                        peer,
                        KademliaMessage::PutValue { record },
                    );
                    let _ = substream.close().await;
                }
                None => {
                    tracing::trace!(
                        target: LOG_TARGET,
                        ?peer,
                        record_key = ?record.key,
                        "handle `PUT_VALUE` request",
                    );

                    if let IncomingRecordValidationMode::Automatic = self.validation_mode {
                        self.store.put(record.clone());
                    }

                    // Send an ACK even if the record was/will be filtered out, so as not to
                    // reveal any internal state.
                    let message = KademliaMessage::put_value_response(
                        record.key.clone(),
                        record.value.clone(),
                    );
                    self.executor.send_message_eat_failure(peer, None, message, substream);
                    // TODO: replace this with `send_message` as part of
                    // https://github.com/paritytech/litep2p/issues/429.

                    let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
                }
            },
            KademliaMessage::GetRecord { key, record, peers } => {
                match (query_id, key) {
                    (Some(query_id), key) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            query = ?query_id,
                            ?peers,
                            ?record,
                            "handle `GET_VALUE` response",
                        );

                        // update routing table and inform user about the update
                        self.update_routing_table(&peers).await;

                        self.engine.register_response(
                            query_id,
                            peer,
                            KademliaMessage::GetRecord { key, record, peers },
                        );

                        let _ = substream.close().await;
                    }
                    (None, Some(key)) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?key,
                            "handle `GET_VALUE` request",
                        );

                        let value = self.store.get(&key).cloned();
                        let closest_peers = self
                            .routing_table
                            .closest(&Key::new(key.as_ref()), self.replication_factor);

                        let message =
                            KademliaMessage::get_value_response(key, closest_peers, value);
                        self.executor.send_message(peer, None, message.into(), substream);
                    }
                    (None, None) => tracing::debug!(
                        target: LOG_TARGET,
                        ?peer,
                        ?record,
                        ?peers,
                        "unable to handle `GET_RECORD` request with empty key",
                    ),
                }
            }
            KademliaMessage::AddProvider { key, mut providers } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    ?key,
                    ?providers,
                    "handle `ADD_PROVIDER` message",
                );

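                // Accept only a single provider record, and only if it was
                // published by the sender itself.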
                match (providers.len(), providers.pop()) {
                    (1, Some(provider)) => {
                        let addresses = provider.addresses();

                        if provider.peer == peer {
                            self.store.put_provider(
                                key.clone(),
                                ContentProvider {
                                    peer,
                                    addresses: addresses.clone(),
                                },
                            );

                            let _ = self
                                .event_tx
                                .send(KademliaEvent::IncomingProvider {
                                    provided_key: key,
                                    provider: ContentProvider {
                                        peer: provider.peer,
                                        addresses,
                                    },
                                })
                                .await;
                        } else {
                            tracing::trace!(
                                target: LOG_TARGET,
                                publisher = ?peer,
                                provider = ?provider.peer,
                                "ignoring `ADD_PROVIDER` message with `publisher` != `provider`"
                            )
                        }
                    }
                    (n, _) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            publisher = ?peer,
                            ?n,
                            "ignoring `ADD_PROVIDER` message with `n` != 1 providers"
                        )
                    }
                }
            }
            KademliaMessage::GetProviders {
                key,
                peers,
                providers,
            } => {
                match (query_id, key) {
                    (Some(query_id), key) => {
                        // Note: the key is not required in a response, but may be non-empty. We just ignore it here.
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            query = ?query_id,
                            ?key,
                            ?peers,
                            ?providers,
                            "handle `GET_PROVIDERS` response",
                        );

                        // update routing table and inform user about the update
                        self.update_routing_table(&peers).await;

                        self.engine.register_response(
                            query_id,
                            peer,
                            KademliaMessage::GetProviders {
                                key,
                                peers,
                                providers,
                            },
                        );

                        let _ = substream.close().await;
                    }
                    (None, Some(key)) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?key,
                            "handle `GET_PROVIDERS` request",
                        );

                        let mut providers = self.store.get_providers(&key);

                        // Make sure local provider addresses are up to date.
                        let local_peer_id = self.local_key.clone().into_preimage();
                        if let Some(p) = providers.iter_mut().find(|p| p.peer == local_peer_id) {
                            p.addresses = self.service.public_addresses().get_addresses();
                        }

                        let closer_peers = self
                            .routing_table
                            .closest(&Key::new(key.as_ref()), self.replication_factor);

                        let message =
                            KademliaMessage::get_providers_response(providers, &closer_peers);
                        self.executor.send_message(peer, None, message.into(), substream);
                    }
                    (None, None) => tracing::debug!(
                        target: LOG_TARGET,
                        ?peer,
                        ?peers,
                        ?providers,
                        "unable to handle `GET_PROVIDERS` request with empty key",
                    ),
                }
            }
        }

        Ok(())
    }

    /// Failed to open substream to remote peer.
    async fn on_substream_open_failure(
        &mut self,
        substream_id: SubstreamId,
        error: SubstreamError,
    ) {
        tracing::trace!(
            target: LOG_TARGET,
            ?substream_id,
            ?error,
            "failed to open substream"
        );

        let Some(peer) = self.pending_substreams.remove(&substream_id) else {
            tracing::debug!(
                target: LOG_TARGET,
                ?substream_id,
                "outbound substream failed for non-existent peer"
            );
            return;
        };

        if let Some(context) = self.peers.get_mut(&peer) {
            let query =
                context.pending_actions.remove(&substream_id).as_ref().map(PeerAction::query_id);

            self.disconnect_peer(peer, query).await;
        }
    }

    /// Handle dial failure.
    fn on_dial_failure(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
        tracing::trace!(target: LOG_TARGET, ?peer, ?addresses, "failed to dial peer");

        self.routing_table.on_dial_failure(Key::from(peer), &addresses);

        let Some(actions) = self.pending_dials.remove(&peer) else {
            return;
        };

        for action in actions {
            let query = action.query_id();

            tracing::trace!(
                target: LOG_TARGET,
                ?peer,
                ?query,
                ?addresses,
                "report failure for pending query",
            );

            // Fail both sending and receiving due to dial failure.
            self.engine.register_send_failure(query, peer);
            self.engine.register_response_failure(query, peer);
        }
    }

    /// Open a substream with a peer or dial the peer.
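    ///
    /// If no substream can be opened, the peer is dialed and the action is queued in
    /// `pending_dials`, to be retried once the connection is established.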
    fn open_substream_or_dial(
        &mut self,
        peer: PeerId,
        action: PeerAction,
        query: Option<QueryId>,
    ) -> Result<(), Error> {
        match self.service.open_substream(peer) {
            Ok(substream_id) => {
                self.pending_substreams.insert(substream_id, peer);
                self.peers.entry(peer).or_default().pending_actions.insert(substream_id, action);

                Ok(())
            }
            Err(err) => {
                tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream. Dialing peer");

                match self.service.dial(&peer) {
                    Ok(()) => {
                        self.pending_dials.entry(peer).or_default().push(action);
                        Ok(())
                    }

                    // Already connected is a recoverable error.
                    Err(ImmediateDialError::AlreadyConnected) => {
                        // Dial returned `Error::AlreadyConnected`, retry opening the substream.
                        match self.service.open_substream(peer) {
                            Ok(substream_id) => {
                                self.pending_substreams.insert(substream_id, peer);
                                self.peers
                                    .entry(peer)
                                    .or_default()
                                    .pending_actions
                                    .insert(substream_id, action);
                                Ok(())
                            }
                            Err(err) => {
                                tracing::debug!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time");
                                Err(err.into())
                            }
                        }
                    }

                    Err(error) => {
                        tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "Failed to dial peer");
                        Err(error.into())
                    }
                }
            }
        }
    }

    /// Handle next query action.
    async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
        match action {
            QueryAction::SendMessage { query, peer, .. } => {
                // This action is used for `FIND_NODE`, `GET_VALUE` and `GET_PROVIDERS` queries.
                if self
                    .open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query))
                    .is_err()
                {
                    // Announce the error to the query engine.
                    self.engine.register_send_failure(query, peer);
                    self.engine.register_response_failure(query, peer);
                }
                Ok(())
            }
            QueryAction::FindNodeQuerySucceeded {
                target,
                peers,
                query,
            } => {
                tracing::debug!(
                    target: LOG_TARGET,
                    ?query,
                    peer = ?target,
                    num_peers = ?peers.len(),
                    "`FIND_NODE` succeeded",
                );

                let _ = self
                    .event_tx
                    .send(KademliaEvent::FindNodeSuccess {
                        target,
                        query_id: query,
                        peers: peers
                            .into_iter()
                            .map(|info| (info.peer, info.addresses()))
                            .collect(),
                    })
                    .await;
                Ok(())
            }
            QueryAction::PutRecordToFoundNodes {
                query,
                record,
                peers,
                quorum,
            } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?query,
                    record_key = ?record.key,
                    num_peers = ?peers.len(),
                    "store record to found peers",
                );
                let key = record.key.clone();
                let message: Bytes = KademliaMessage::put_value(record);

                for peer in &peers {
                    if let Err(error) = self.open_substream_or_dial(
                        peer.peer,
                        // `message` is cheaply clonable because of `Bytes` reference counting.
                        PeerAction::SendPutValue(query, message.clone()),
                        None,
                    ) {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            ?key,
                            ?error,
                            "failed to put record to peer",
                        );
                    }
                }

                self.engine.start_put_record_to_found_nodes_requests_tracking(
                    query,
                    key,
                    peers.into_iter().map(|peer| peer.peer).collect(),
                    quorum,
                );

                Ok(())
            }
            QueryAction::PutRecordQuerySucceeded { query, key } => {
                tracing::debug!(target: LOG_TARGET, ?query, "`PUT_VALUE` query succeeded");

                let _ = self
                    .event_tx
                    .send(KademliaEvent::PutRecordSuccess {
                        query_id: query,
                        key,
                    })
                    .await;
                Ok(())
            }
            QueryAction::AddProviderToFoundNodes {
                query,
                provided_key,
                provider,
                peers,
                quorum,
            } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?provided_key,
                    num_peers = ?peers.len(),
                    "add provider record to found peers",
                );

                let message = KademliaMessage::add_provider(provided_key.clone(), provider);

                for peer in &peers {
                    if let Err(error) = self.open_substream_or_dial(
                        peer.peer,
                        PeerAction::SendAddProvider(query, message.clone()),
                        None,
                    ) {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            ?provided_key,
                            ?error,
                            "failed to add provider record to peer",
                        )
                    }
                }

                self.engine.start_add_provider_to_found_nodes_requests_tracking(
                    query,
                    provided_key,
                    peers.into_iter().map(|peer| peer.peer).collect(),
                    quorum,
                );

                Ok(())
            }
            QueryAction::AddProviderQuerySucceeded {
                query,
                provided_key,
            } => {
                tracing::debug!(target: LOG_TARGET, ?query, "`ADD_PROVIDER` query succeeded");

                let _ = self
                    .event_tx
                    .send(KademliaEvent::AddProviderSuccess {
                        query_id: query,
                        provided_key,
                    })
                    .await;
                Ok(())
            }
            QueryAction::GetRecordQueryDone { query_id } => {
                let _ = self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id }).await;
                Ok(())
            }
            QueryAction::GetProvidersQueryDone {
                query_id,
                provided_key,
                providers,
            } => {
                let _ = self
                    .event_tx
                    .send(KademliaEvent::GetProvidersSuccess {
                        query_id,
                        provided_key,
                        providers,
                    })
                    .await;
                Ok(())
            }
            QueryAction::QueryFailed { query } => {
                tracing::debug!(target: LOG_TARGET, ?query, "query failed");

                let _ = self.event_tx.send(KademliaEvent::QueryFailed { query_id: query }).await;
                Ok(())
            }
            QueryAction::GetRecordPartialResult { query_id, record } => {
                let _ = self
                    .event_tx
                    .send(KademliaEvent::GetRecordPartialResult { query_id, record })
                    .await;
                Ok(())
            }
            QueryAction::QuerySucceeded { .. } => Ok(()),
        }
    }

    /// [`Kademlia`] event loop.
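    ///
    /// A minimal sketch of how the event loop is typically driven (assuming a
    /// fully configured `kademlia` instance constructed elsewhere):
    ///
    /// ```ignore
    /// tokio::spawn(async move {
    ///     if let Err(error) = kademlia.run().await {
    ///         tracing::warn!(?error, "kademlia event loop exited");
    ///     }
    /// });
    /// ```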
    pub async fn run(mut self) -> crate::Result<()> {
        tracing::debug!(target: LOG_TARGET, "starting kademlia event loop");

        loop {
            // poll `QueryEngine` for next actions.
            while let Some(action) = self.engine.next_action() {
                if let Err((query, peer)) = self.on_query_action(action).await {
                    self.disconnect_peer(peer, Some(query)).await;
                }
            }

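            // Drive the event loop from its four sources: transport events, query
            // executor results, user commands and record store actions.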
            tokio::select! {
                event = self.service.next() => match event {
                    Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
                        if let Err(error) = self.on_connection_established(peer, endpoint) {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?error,
                                "failed to handle established connection",
                            );
                        }
                    }
                    Some(TransportEvent::ConnectionClosed { peer }) => {
                        self.disconnect_peer(peer, None).await;
                    }
                    Some(TransportEvent::SubstreamOpened { peer, direction, substream, .. }) => {
                        match direction {
                            Direction::Inbound => self.on_inbound_substream(peer, substream).await,
                            Direction::Outbound(substream_id) => {
                                if let Err(error) = self
                                    .on_outbound_substream(peer, substream_id, substream)
                                    .await
                                {
                                    tracing::debug!(
                                        target: LOG_TARGET,
                                        ?peer,
                                        ?substream_id,
                                        ?error,
                                        "failed to handle outbound substream",
                                    );
                                }
                            }
                        }
                    },
                    Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
                        self.on_substream_open_failure(substream, error).await;
                    }
                    Some(TransportEvent::DialFailure { peer, addresses }) =>
                        self.on_dial_failure(peer, addresses),
                    None => return Err(Error::EssentialTaskClosed),
                },
                context = self.executor.next() => {
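                    // `QueryExecutor` is assumed to stay pending, rather than
                    // return `None`, when no queries are in flight, making the
                    // `unwrap()` below safe.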
                    let QueryContext { peer, query_id, result } = context.unwrap();

                    match result {
                        QueryResult::SendSuccess { substream } => {
                            tracing::trace!(
                                target: LOG_TARGET,
                                ?peer,
                                query = ?query_id,
                                "message sent to peer",
                            );
                            let _ = substream.close().await;

                            if let Some(query_id) = query_id {
                                self.engine.register_send_success(query_id, peer);
                            }
                        }
                        // This is a workaround to gracefully handle older litep2p nodes not
                        // sending/receiving `PUT_VALUE` ACKs. This should eventually be removed.
                        // TODO: remove this as part of
                        // https://github.com/paritytech/litep2p/issues/429.
                        QueryResult::AssumeSendSuccess => {
                            tracing::trace!(
                                target: LOG_TARGET,
                                ?peer,
                                query = ?query_id,
                                "treating message as sent to peer",
                            );

                            if let Some(query_id) = query_id {
                                self.engine.register_send_success(query_id, peer);
                            }
                        }
                        QueryResult::SendFailure { reason } => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?peer,
                                query = ?query_id,
                                ?reason,
                                "failed to send message to peer",
                            );

                            self.disconnect_peer(peer, query_id).await;
                        }
                        QueryResult::ReadSuccess { substream, message } => {
                            tracing::trace!(
                                target: LOG_TARGET,
                                ?peer,
                                query = ?query_id,
                                "message read from peer",
                            );

                            if let Some(query_id) = query_id {
                                // Read success for locally originating requests implies send
                                // success.
                                self.engine.register_send_success(query_id, peer);
                            }

                            if let Err(error) = self.on_message_received(
                                peer,
                                query_id,
                                message,
                                substream
                            ).await {
                                tracing::debug!(
                                    target: LOG_TARGET,
                                    ?peer,
                                    ?error,
                                    "failed to process message",
                                );
                            }
                        }
                        QueryResult::ReadFailure { reason } => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?peer,
                                query = ?query_id,
                                ?reason,
                                "failed to read message from substream",
                            );

                            self.disconnect_peer(peer, query_id).await;
                        }
                    }
                },
                command = self.cmd_rx.recv() => {
                    match command {
                        Some(KademliaCommand::FindNode { peer, query_id }) => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?peer,
                                query = ?query_id,
                                "starting `FIND_NODE` query",
                            );

                            self.engine.start_find_node(
                                query_id,
                                peer,
                                self.routing_table
                                    .closest(&Key::from(peer), self.replication_factor)
                                    .into()
                            );
                        }
                        Some(KademliaCommand::PutRecord { mut record, quorum, query_id }) => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                query = ?query_id,
                                key = ?record.key,
                                "store record to DHT",
                            );

                            // For `PUT_VALUE` requests originating locally we are always the
                            // publisher.
                            record.publisher = Some(self.local_key.clone().into_preimage());

                            // Make sure TTL is set.
                            record.expires = record
                                .expires
                                .or_else(|| Some(Instant::now() + self.record_ttl));

                            let key = Key::new(record.key.clone());

                            self.store.put(record.clone());

                            self.engine.start_put_record(
                                query_id,
                                record,
                                self.routing_table.closest(&key, self.replication_factor).into(),
                                quorum,
                            );
                        }
                        Some(KademliaCommand::PutRecordToPeers {
                            mut record,
                            query_id,
                            peers,
                            update_local_store,
                            quorum,
                        }) => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                query = ?query_id,
                                key = ?record.key,
                                "store record to DHT to specified peers",
                            );

                            // Make sure TTL is set.
                            record.expires = record
                                .expires
                                .or_else(|| Some(Instant::now() + self.record_ttl));

                            if update_local_store {
                                self.store.put(record.clone());
                            }

                            // Put the record to the specified peers.
                            let peers = peers.into_iter().filter_map(|peer| {
                                if peer == self.service.local_peer_id() {
                                    return None;
                                }

                                match self.routing_table.entry(Key::from(peer)) {
                                    KBucketEntry::Occupied(entry) => Some(entry.clone()),
                                    KBucketEntry::Vacant(entry) if !entry.address_store.is_empty() =>
                                        Some(entry.clone()),
                                    _ => None,
                                }
                            }).collect();

                            self.engine.start_put_record_to_peers(
                                query_id,
                                record,
                                peers,
                                quorum,
                            );
                        }
                        Some(KademliaCommand::StartProviding {
                            key,
                            quorum,
                            query_id
                        }) => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                query = ?query_id,
                                ?key,
                                "register as a content provider",
                            );

                            let addresses = self.service.public_addresses().get_addresses();
                            let provider = ContentProvider {
                                peer: self.service.local_peer_id(),
                                addresses,
                            };

                            self.store.put_local_provider(key.clone(), quorum);

                            self.engine.start_add_provider(
                                query_id,
                                key.clone(),
                                provider,
                                self.routing_table
                                    .closest(&Key::new(key), self.replication_factor)
                                    .into(),
                                quorum,
                            );
                        }
                        Some(KademliaCommand::StopProviding {
                            key,
                        }) => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?key,
                                "stop providing",
                            );

                            self.store.remove_local_provider(key);
                        }
                        Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
                            tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");

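                            // A record found in the local store satisfies `Quorum::One`
                            // immediately; otherwise a network query is started, with the
                            // engine informed whether a local record was already found.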
1290                            match (self.store.get(&key), quorum) {
                                (Some(record), Quorum::One) => {
                                    let _ = self
                                        .event_tx
                                        .send(KademliaEvent::GetRecordPartialResult {
                                            query_id,
                                            record: PeerRecord {
                                                peer: self.service.local_peer_id(),
                                                record: record.clone(),
                                            },
                                        })
                                        .await;

                                    let _ = self
                                        .event_tx
                                        .send(KademliaEvent::GetRecordSuccess { query_id })
                                        .await;
                                }
                                (record, _) => {
                                    let local_record = record.is_some();
                                    if let Some(record) = record {
                                        let _ = self
                                            .event_tx
                                            .send(KademliaEvent::GetRecordPartialResult {
                                                query_id,
                                                record: PeerRecord {
                                                    peer: self.service.local_peer_id(),
                                                    record: record.clone(),
                                                },
                                            })
                                            .await;
                                    }

                                    self.engine.start_get_record(
                                        query_id,
                                        key.clone(),
                                        self.routing_table
                                            .closest(&Key::new(key), self.replication_factor)
                                            .into(),
                                        quorum,
                                        local_record,
                                    );
                                }
                            }
                        }
1332                        Some(KademliaCommand::GetProviders { key, query_id }) => {
1333                            tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT");
1334
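                            // Providers already cached locally seed the query, so they are
                            // returned alongside any providers discovered over the network.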
1335                            let known_providers = self.store.get_providers(&key);
1336
1337                            self.engine.start_get_providers(
1338                                query_id,
1339                                key.clone(),
1340                                self.routing_table
1341                                    .closest(&Key::new(key), self.replication_factor)
1342                                    .into(),
1343                                known_providers,
1344                            );
1345                        }
1346                        Some(KademliaCommand::AddKnownPeer { peer, addresses }) => {
1347                            tracing::trace!(
1348                                target: LOG_TARGET,
1349                                ?peer,
1350                                ?addresses,
1351                                "add known peer",
1352                            );
1353
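                            // The routing table entry is marked `Connected` only if we are
                            // currently tracking an active connection to the peer.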
1354                            self.routing_table.add_known_peer(
1355                                peer,
1356                                addresses.clone(),
1357                                self.peers
1358                                    .get(&peer)
1359                                    .map_or(
1360                                        ConnectionType::NotConnected,
1361                                        |_| ConnectionType::Connected,
1362                                    ),
1363                            );
                            self.service.add_known_address(&peer, addresses.into_iter());
                        }
1367                        Some(KademliaCommand::StoreRecord { mut record }) => {
1368                            tracing::debug!(
1369                                target: LOG_TARGET,
1370                                key = ?record.key,
1371                                "store record in local store",
1372                            );
1373
                            // Make sure the TTL is set: records stored without an explicit
                            // expiration time fall back to the configured default `record_ttl`.
                            record.expires =
                                record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
1377
1378                            self.store.put(record);
1379                        }
1380                        None => return Err(Error::EssentialTaskClosed),
1381                    }
1382                },
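                // Background actions scheduled by the memory store itself, currently
                // the periodic refresh of locally registered providers.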
1383                action = self.store.next_action() => match action {
1384                    Some(MemoryStoreAction::RefreshProvider { provided_key, provider, quorum }) => {
1385                        tracing::trace!(
1386                            target: LOG_TARGET,
1387                            ?provided_key,
1388                            "republishing local provider",
1389                        );
1390
1391                        self.store.put_local_provider(provided_key.clone(), quorum);
1392
                        // We never update local provider addresses in the store during refresh,
                        // as this is done anyway when replying to `GET_PROVIDERS` requests.
1395
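                        // Each republish runs as a fresh query with its own query ID,
                        // independent of the original `StartProviding` command.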
1396                        let query_id = self.next_query_id();
1397                        self.engine.start_add_provider(
1398                            query_id,
1399                            provided_key.clone(),
1400                            provider,
1401                            self.routing_table
1402                                .closest(&Key::new(provided_key), self.replication_factor)
1403                                .into(),
1404                            quorum,
1405                        );
1406                    }
1407                    None => {}
1408                }
1409            }
1410        }
1411    }
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416    use super::*;
1417    use crate::{
1418        codec::ProtocolCodec,
1419        transport::{
1420            manager::{TransportManager, TransportManagerBuilder},
1421            KEEP_ALIVE_TIMEOUT,
1422        },
1423        types::protocol::ProtocolName,
1424        ConnectionId,
1425    };
1426    use multiaddr::Protocol;
1427    use multihash::Multihash;
1428    use std::str::FromStr;
1429    use tokio::sync::mpsc::channel;
1430
1431    #[allow(unused)]
1432    struct Context {
1433        _cmd_tx: Sender<KademliaCommand>,
1434        event_rx: Receiver<KademliaEvent>,
1435    }
1436
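    // Builds a `Kademlia` instance backed by in-memory channels so tests can
    // drive its event handlers directly, without any networking.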
1437    fn make_kademlia() -> (Kademlia, Context, TransportManager) {
1438        let manager = TransportManagerBuilder::new().build();
1439
1440        let peer = PeerId::random();
1441        let (transport_service, _tx) = TransportService::new(
1442            peer,
1443            ProtocolName::from("/kad/1"),
1444            Vec::new(),
1445            Default::default(),
1446            manager.transport_manager_handle(),
1447            KEEP_ALIVE_TIMEOUT,
1448        );
1449        let (event_tx, event_rx) = channel(64);
1450        let (_cmd_tx, cmd_rx) = channel(64);
1451        let next_query_id = Arc::new(AtomicUsize::new(0usize));
1452
1453        let config = Config {
1454            protocol_names: vec![ProtocolName::from("/kad/1")],
1455            known_peers: HashMap::new(),
1456            codec: ProtocolCodec::UnsignedVarint(Some(70 * 1024)),
1457            replication_factor: 20usize,
1458            update_mode: RoutingTableUpdateMode::Automatic,
1459            validation_mode: IncomingRecordValidationMode::Automatic,
1460            record_ttl: Duration::from_secs(36 * 60 * 60),
1461            memory_store_config: Default::default(),
1462            event_tx,
1463            cmd_rx,
1464            next_query_id,
1465        };
1466
1467        (
1468            Kademlia::new(transport_service, config),
1469            Context { _cmd_tx, event_rx },
1470            manager,
1471        )
1472    }
1473
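    // Records received as partial `GET_VALUE` results must be reported to the
    // user but never written back to the local store.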
1474    #[tokio::test]
1475    async fn check_get_records_update() {
1476        let (mut kademlia, _context, _manager) = make_kademlia();
1477
1478        let key = RecordKey::from(vec![1, 2, 3]);
1479        let records = vec![
1480            // 2 peers backing the same record.
1481            PeerRecord {
1482                peer: PeerId::random(),
1483                record: Record::new(key.clone(), vec![0x1]),
1484            },
1485            PeerRecord {
1486                peer: PeerId::random(),
1487                record: Record::new(key.clone(), vec![0x1]),
1488            },
1489            // only 1 peer backing the record.
1490            PeerRecord {
1491                peer: PeerId::random(),
1492                record: Record::new(key.clone(), vec![0x2]),
1493            },
1494        ];
1495
1496        for record in records {
1497            let action = QueryAction::GetRecordPartialResult {
1498                query_id: QueryId(1),
1499                record,
1500            };
1501            assert!(kademlia.on_query_action(action).await.is_ok());
1502        }
1503
1504        let query_id = QueryId(1);
1505        let action = QueryAction::GetRecordQueryDone { query_id };
1506        assert!(kademlia.on_query_action(action).await.is_ok());
1507
        // Check that the local store is not updated with the received records.
1509        assert!(kademlia.store.get(&key).is_none());
1510    }
1511
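    // A minimal sketch of a local-store round trip, with no query engine
    // involved: a record written with `put()` should be readable back via
    // `get()` (assuming `MemoryStore::put` is visible to tests, as `get`
    // already is in the tests above).
    #[tokio::test]
    async fn local_store_round_trip() {
        let (mut kademlia, _context, _manager) = make_kademlia();

        let key = RecordKey::from(vec![4, 5, 6]);
        kademlia.store.put(Record::new(key.clone(), vec![0xff]));

        assert_eq!(
            kademlia.store.get(&key).map(|record| record.value.clone()),
            Some(vec![0xff]),
        );
    }

    // Like `check_get_records_update`, but the result set also contains an
    // expired record; it must not end up in the local store either.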
1512    #[tokio::test]
1513    async fn check_get_records_update_with_expired_records() {
1514        let (mut kademlia, _context, _manager) = make_kademlia();
1515
1516        let key = RecordKey::from(vec![1, 2, 3]);
1517        let expired = std::time::Instant::now() - std::time::Duration::from_secs(10);
1518        let records = vec![
            // 2 peers backing the same record, one of which is expired.
1520            PeerRecord {
1521                peer: PeerId::random(),
1522                record: Record {
1523                    key: key.clone(),
1524                    value: vec![0x1],
1525                    publisher: None,
1526                    expires: Some(expired),
1527                },
1528            },
1529            PeerRecord {
1530                peer: PeerId::random(),
1531                record: Record::new(key.clone(), vec![0x1]),
1532            },
            // 2 peers backing the record.
1534            PeerRecord {
1535                peer: PeerId::random(),
1536                record: Record::new(key.clone(), vec![0x2]),
1537            },
1538            PeerRecord {
1539                peer: PeerId::random(),
1540                record: Record::new(key.clone(), vec![0x2]),
1541            },
1542        ];
1543
1544        for record in records {
1545            let action = QueryAction::GetRecordPartialResult {
1546                query_id: QueryId(1),
1547                record,
1548            };
1549            assert!(kademlia.on_query_action(action).await.is_ok());
1550        }
1551
1552        kademlia
1553            .on_query_action(QueryAction::GetRecordQueryDone {
1554                query_id: QueryId(1),
1555            })
1556            .await
1557            .unwrap();
1558
        // Check that the local store is not updated with the received records.
1560        assert!(kademlia.store.get(&key).is_none());
1561    }
1562
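    // Exercises address bookkeeping in the routing table: addresses confirmed
    // via dialing are promoted, addresses observed on inbound connections are
    // not stored, and repeated dial failures push an address down the list.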
1563    #[tokio::test]
1564    async fn check_address_store_routing_table_updates() {
1565        let (mut kademlia, _context, _manager) = make_kademlia();
1566
1567        let peer = PeerId::random();
1568        let address_a = Multiaddr::from_str("/dns/domain1.com/tcp/30333").unwrap().with(
1569            Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).unwrap()),
1570        );
1571        let address_b = Multiaddr::from_str("/dns/domain1.com/tcp/30334").unwrap().with(
1572            Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).unwrap()),
1573        );
1574        let address_c = Multiaddr::from_str("/dns/domain1.com/tcp/30339").unwrap().with(
1575            Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).unwrap()),
1576        );
1577
        // The peer is initially added with address A only.
1579        kademlia.routing_table.add_known_peer(
1580            peer,
1581            vec![address_a.clone()],
1582            ConnectionType::NotConnected,
1583        );
1584
1585        // Check peer addresses.
1586        match kademlia.routing_table.entry(Key::from(peer)) {
1587            KBucketEntry::Occupied(entry) => {
1588                assert_eq!(entry.addresses(), vec![address_a.clone()]);
1589            }
1590            _ => panic!("Peer not found in routing table"),
1591        };
1592
        // Report a successful connection with address B via a dialer endpoint.
1594        let _ = kademlia.on_connection_established(
1595            peer,
1596            Endpoint::Dialer {
1597                address: address_b.clone(),
1598                connection_id: ConnectionId::from(0),
1599            },
1600        );
1601
        // Address B has a higher priority, as it was confirmed via the dialing
        // mechanism of the transport manager, while address A has not been dialed yet.
1604        match kademlia.routing_table.entry(Key::from(peer)) {
1605            KBucketEntry::Occupied(entry) => {
1606                assert_eq!(
1607                    entry.addresses(),
1608                    vec![address_b.clone(), address_a.clone()]
1609                );
1610            }
1611            _ => panic!("Peer not found in routing table"),
1612        };
1613
        // Report a successful connection with address C via a listener endpoint.
1615        let _ = kademlia.on_connection_established(
1616            peer,
1617            Endpoint::Listener {
1618                address: address_c.clone(),
1619                connection_id: ConnectionId::from(0),
1620            },
1621        );
        // Address C is not added: the peer dialed us, so its port is likely ephemeral.
1623        match kademlia.routing_table.entry(Key::from(peer)) {
1624            KBucketEntry::Occupied(entry) => {
1625                assert_eq!(
1626                    entry.addresses(),
1627                    vec![address_b.clone(), address_a.clone()]
1628                );
1629            }
1630            _ => panic!("Peer not found in routing table"),
1631        };
1632
        // Address B fails twice, which gives it a lower score than address A and
        // makes it subject to removal.
1635        kademlia.on_dial_failure(peer, vec![address_b.clone(), address_b.clone()]);
1636
1637        match kademlia.routing_table.entry(Key::from(peer)) {
1638            KBucketEntry::Occupied(entry) => {
1639                assert_eq!(
1640                    entry.addresses(),
1641                    vec![address_a.clone(), address_b.clone()]
1642                );
1643            }
1644            _ => panic!("Peer not found in routing table"),
1645        };
1646    }
1647}