libp2p_kad/
behaviour.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use crate::addresses::Addresses;
26use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
27use crate::kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus};
28use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
29use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState};
30use crate::record::{
31    self,
32    store::{self, RecordStore},
33    ProviderRecord, Record,
34};
35use crate::{bootstrap, K_VALUE};
36use crate::{jobs::*, protocol};
37use fnv::FnvHashSet;
38use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
39use libp2p_identity::PeerId;
40use libp2p_swarm::behaviour::{
41    AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
42};
43use libp2p_swarm::{
44    dial_opts::{self, DialOpts},
45    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
46    ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
47    THandlerOutEvent, ToSwarm,
48};
49use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
50use std::fmt;
51use std::num::NonZeroUsize;
52use std::task::{Context, Poll, Waker};
53use std::time::Duration;
54use std::vec;
55use thiserror::Error;
56use tracing::Level;
57use web_time::Instant;
58
59pub use crate::query::QueryStats;
60
61/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
62/// Kademlia protocol.
63pub struct Behaviour<TStore> {
64    /// The Kademlia routing table.
65    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
66
67    /// The k-bucket insertion strategy.
68    kbucket_inserts: BucketInserts,
69
70    /// Configuration of the wire protocol.
71    protocol_config: ProtocolConfig,
72
73    /// Configuration of [`RecordStore`] filtering.
74    record_filtering: StoreInserts,
75
76    /// The currently active (i.e. in-progress) queries.
77    queries: QueryPool,
78
79    /// The currently connected peers.
80    ///
81    /// This is a superset of the connected peers currently in the routing table.
82    connected_peers: FnvHashSet<PeerId>,
83
84    /// Periodic job for re-publication of provider records for keys
85    /// provided by the local node.
86    add_provider_job: Option<AddProviderJob>,
87
88    /// Periodic job for (re-)replication and (re-)publishing of
89    /// regular (value-)records.
90    put_record_job: Option<PutRecordJob>,
91
92    /// The TTL of regular (value-)records.
93    record_ttl: Option<Duration>,
94
95    /// The TTL of provider records.
96    provider_record_ttl: Option<Duration>,
97
98    /// Queued events to return when the behaviour is being polled.
99    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,
100
101    listen_addresses: ListenAddresses,
102
103    external_addresses: ExternalAddresses,
104
105    connections: HashMap<ConnectionId, PeerId>,
106
107    /// See [`Config::caching`].
108    caching: Caching,
109
110    local_peer_id: PeerId,
111
112    mode: Mode,
113    auto_mode: bool,
114    no_events_waker: Option<Waker>,
115
116    /// The record storage.
117    store: TStore,
118
119    /// Tracks the status of the current bootstrap.
120    bootstrap_status: bootstrap::Status,
121}
122
123/// The configurable strategies for the insertion of peers
124/// and their addresses into the k-buckets of the Kademlia
125/// routing table.
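///
/// # Example
///
/// A sketch of opting into manual routing-table management; `swarm` and
/// `event` stand in for an application's swarm event loop, and the protocol
/// name is illustrative:
///
/// ```ignore
/// let mut cfg = kad::Config::new(StreamProtocol::new("/my-app/kad/1.0.0"));
/// cfg.set_kbucket_inserts(kad::BucketInserts::Manual);
///
/// // With `Manual` inserts, peers only enter the routing table through
/// // `Behaviour::add_address`, e.g. in response to `Event::RoutablePeer`.
/// if let SwarmEvent::Behaviour(kad::Event::RoutablePeer { peer, address }) = event {
///     swarm.behaviour_mut().add_address(&peer, address);
/// }
/// ```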
126#[derive(Copy, Clone, Debug, PartialEq, Eq)]
127pub enum BucketInserts {
128    /// Whenever a connection to a peer is established as a
129    /// result of a dialing attempt and that peer is not yet
130    /// in the routing table, it is inserted as long as there
131    /// is a free slot in the corresponding k-bucket. If the
132    /// k-bucket is full but still has a free pending slot,
133    /// it may be inserted into the routing table at a later time if an unresponsive
134    /// disconnected peer is evicted from the bucket.
135    OnConnected,
136    /// New peers and addresses are only added to the routing table via
137    /// explicit calls to [`Behaviour::add_address`].
138    ///
139    /// > **Note**: Even though peers can only get into the
140    /// > routing table as a result of [`Behaviour::add_address`],
141    /// > routing table entries are still updated as peers
142    /// > connect and disconnect (i.e. the order of the entries
143    /// > as well as the network addresses).
144    Manual,
145}
146
147/// The configurable filtering strategies for the acceptance of
148/// incoming records.
149///
150/// This can be used for e.g. signature verification or validating
151/// the accompanying [`Key`].
152///
153/// [`Key`]: crate::record::Key
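///
/// # Example
///
/// A sketch of filtered inserts; the shape of the [`InboundRequest::PutRecord`]
/// variant is assumed here, and `swarm`, `event` and `is_valid` are
/// application-side placeholders:
///
/// ```ignore
/// let mut cfg = kad::Config::new(StreamProtocol::new("/my-app/kad/1.0.0"));
/// cfg.set_record_filtering(kad::StoreInserts::FilterBoth);
///
/// // In the swarm event loop: validate the record, then store it explicitly.
/// if let SwarmEvent::Behaviour(kad::Event::InboundRequest {
///     request: kad::InboundRequest::PutRecord { record: Some(record), .. },
/// }) = event
/// {
///     if is_valid(&record) {
///         swarm
///             .behaviour_mut()
///             .store_mut()
///             .put(record)
///             .expect("store should accept the record");
///     }
/// }
/// ```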
154#[derive(Copy, Clone, Debug, PartialEq, Eq)]
155pub enum StoreInserts {
156    /// Whenever a (provider) record is received,
157    /// the record is forwarded immediately to the [`RecordStore`].
158    Unfiltered,
159    /// Whenever a (provider) record is received, an event is emitted.
160    /// Provider records generate an [`InboundRequest::AddProvider`] under [`Event::InboundRequest`];
161    /// normal records generate an [`InboundRequest::PutRecord`] under [`Event::InboundRequest`].
162    ///
163    /// When deemed valid, a (provider) record needs to be explicitly stored in
164    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
165    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
166    /// be retrieved via [`Behaviour::store_mut`].
167    FilterBoth,
168}
169
170/// The configuration for the `Kademlia` behaviour.
171///
172/// The configuration is consumed by [`Behaviour::new`].
173#[derive(Debug, Clone)]
174pub struct Config {
175    kbucket_config: KBucketConfig,
176    query_config: QueryConfig,
177    protocol_config: ProtocolConfig,
178    record_ttl: Option<Duration>,
179    record_replication_interval: Option<Duration>,
180    record_publication_interval: Option<Duration>,
181    record_filtering: StoreInserts,
182    provider_record_ttl: Option<Duration>,
183    provider_publication_interval: Option<Duration>,
184    kbucket_inserts: BucketInserts,
185    caching: Caching,
186    periodic_bootstrap_interval: Option<Duration>,
187    automatic_bootstrap_throttle: Option<Duration>,
188}
189
190impl Default for Config {
191    /// Returns the default configuration.
192    ///
193    /// Deprecated: use `Config::new` instead.
194    fn default() -> Self {
195        Self::new(protocol::DEFAULT_PROTO_NAME)
196    }
197}
198
199/// The configuration for Kademlia "write-back" caching after successful
200/// lookups via [`Behaviour::get_record`].
201#[derive(Debug, Clone)]
202pub enum Caching {
203    /// Caching is disabled and the peers closest to records being looked up
204    /// that do not return a record are not tracked, i.e.
205    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
206    Disabled,
207    /// Up to `max_peers` peers not returning a record that are closest to the key
208    /// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`].
209    /// The write-back operation must be performed explicitly, if
210    /// desired and after choosing a record from the results, via [`Behaviour::put_record_to`].
211    Enabled { max_peers: u16 },
212}
213
214impl Config {
215    /// Builds a new `Config` with the given protocol name.
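    ///
    /// # Example
    ///
    /// A sketch of building a configuration for an application-specific DHT;
    /// the protocol name is illustrative:
    ///
    /// ```ignore
    /// let mut cfg = Config::new(StreamProtocol::new("/my-app/kad/1.0.0"));
    /// cfg.set_query_timeout(Duration::from_secs(120))
    ///     .set_record_filtering(StoreInserts::FilterBoth);
    /// ```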
216    pub fn new(protocol_name: StreamProtocol) -> Self {
217        Config {
218            kbucket_config: KBucketConfig::default(),
219            query_config: QueryConfig::default(),
220            protocol_config: ProtocolConfig::new(protocol_name),
221            record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
222            record_replication_interval: Some(Duration::from_secs(60 * 60)),
223            record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
224            record_filtering: StoreInserts::Unfiltered,
225            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
226            provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
227            kbucket_inserts: BucketInserts::OnConnected,
228            caching: Caching::Enabled { max_peers: 1 },
229            periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
230            automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
231        }
232    }
233
234    /// Returns the default configuration.
235    #[deprecated(note = "Use `Config::new` instead")]
236    #[allow(clippy::should_implement_trait)]
237    pub fn default() -> Self {
238        Default::default()
239    }
240
241    /// Sets custom protocol names.
242    ///
243    /// Kademlia nodes only communicate with other nodes using the same protocol
244    /// name. Using custom name(s) therefore makes it possible to segregate the DHT
245    /// from others, if that is desired.
246    ///
247    /// More than one protocol name can be supplied. In this case the node will
248    /// be able to talk to other nodes supporting any of the provided names.
249    /// Multiple names must be used with caution to avoid network partitioning.
250    #[deprecated(note = "Use `Config::new` instead")]
251    #[allow(deprecated)]
252    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
253        self.protocol_config.set_protocol_names(names);
254        self
255    }
256
257    /// Sets the timeout for a single query.
258    ///
259    /// > **Note**: A single query usually comprises at least as many requests
260    /// > as the replication factor, i.e. this is not a request timeout.
261    ///
262    /// The default is 60 seconds.
263    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
264        self.query_config.timeout = timeout;
265        self
266    }
267
268    /// Sets the replication factor to use.
269    ///
270    /// The replication factor determines to how many closest peers
271    /// a record is replicated. The default is [`crate::K_VALUE`].
272    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
273        self.query_config.replication_factor = replication_factor;
274        self
275    }
276
277    /// Sets the allowed level of parallelism for iterative queries.
278    ///
279    /// The `α` parameter in the Kademlia paper. The maximum number of peers
280    /// that an iterative query is allowed to wait for in parallel while
281    /// iterating towards the closest nodes to a target. Defaults to
282    /// `ALPHA_VALUE`.
283    ///
284    /// This only controls the level of parallelism of an iterative query, not
285    /// the level of parallelism of a query to a fixed set of peers.
286    ///
287    /// When used with [`Config::disjoint_query_paths`] it equals
288    /// the number of disjoint paths used.
289    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
290        self.query_config.parallelism = parallelism;
291        self
292    }
293
294    /// Require iterative queries to use disjoint paths for increased resiliency
295    /// in the presence of potentially adversarial nodes.
296    ///
297    /// When enabled the number of disjoint paths used equals the configured
298    /// parallelism.
299    ///
300    /// See the S/Kademlia paper for more information on the high level design
301    /// as well as its security improvements.
302    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
303        self.query_config.disjoint_query_paths = enabled;
304        self
305    }
306
307    /// Sets the TTL for stored records.
308    ///
309    /// The TTL should be significantly longer than the (re-)publication
310    /// interval, to avoid premature expiration of records. The default is 48
311    /// hours.
312    ///
313    /// `None` means records never expire.
314    ///
315    /// Does not apply to provider records.
316    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
317        self.record_ttl = record_ttl;
318        self
319    }
320
321    /// Sets whether or not records should be filtered before being stored.
322    ///
323    /// See [`StoreInserts`] for the different values.
324    /// Defaults to [`StoreInserts::Unfiltered`].
325    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
326        self.record_filtering = filtering;
327        self
328    }
329
330    /// Sets the (re-)replication interval for stored records.
331    ///
332    /// Periodic replication of stored records ensures that the records
333    /// are always replicated to the available nodes closest to the key in the
334    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
335    /// ensuring persistence until the record expires. Replication does not
336    /// prolong the regular lifetime of a record (for otherwise it would live
337    /// forever regardless of the configured TTL). The expiry of a record
338    /// is only extended through re-publication.
339    ///
340    /// This interval should be significantly shorter than the publication
341    /// interval, to ensure persistence between re-publications. The default
342    /// is 1 hour.
343    ///
344    /// `None` means that stored records are never re-replicated.
345    ///
346    /// Does not apply to provider records.
347    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
348        self.record_replication_interval = interval;
349        self
350    }
351
352    /// Sets the (re-)publication interval of stored records.
353    ///
354    /// Records persist in the DHT until they expire. By default, published
355    /// records are re-published in regular intervals for as long as the record
356    /// exists in the local storage of the original publisher, thereby extending
357    /// the record's lifetime.
358    ///
359    /// This interval should be significantly shorter than the record TTL, to
360    /// ensure records do not expire prematurely. The default is 22 hours.
361    ///
362    /// `None` means that stored records are never automatically re-published.
363    ///
364    /// Does not apply to provider records.
365    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
366        self.record_publication_interval = interval;
367        self
368    }
369
370    /// Sets the TTL for provider records.
371    ///
372    /// `None` means that stored provider records never expire.
373    ///
374    /// Must be significantly larger than the provider publication interval.
375    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
376        self.provider_record_ttl = ttl;
377        self
378    }
379
380    /// Sets the interval at which provider records for keys provided
381    /// by the local node are re-published.
382    ///
383    /// `None` means that stored provider records are never automatically
384    /// re-published.
385    ///
386    /// Must be significantly less than the provider record TTL.
387    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
388        self.provider_publication_interval = interval;
389        self
390    }
391
392    /// Modifies the maximum allowed size of individual Kademlia packets.
393    ///
394    /// It might be necessary to increase this value if trying to put large
395    /// records.
396    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
397        self.protocol_config.set_max_packet_size(size);
398        self
399    }
400
401    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
402    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
403        self.kbucket_inserts = inserts;
404        self
405    }
406
407    /// Sets the [`Caching`] strategy to use for successful lookups.
408    ///
409    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
410    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
411    /// will result in the record being cached at the closest node to the key that
412    /// did not return the record, i.e. the standard Kademlia behaviour.
413    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
414        self.caching = c;
415        self
416    }
417
418    /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
419    ///
420    /// * Defaults to `5` minutes.
421    /// * Set to `None` to disable periodic bootstrap.
422    pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
423        self.periodic_bootstrap_interval = interval;
424        self
425    }
426
427    /// Sets the size of the k-buckets.
428    ///
429    /// * Defaults to `K_VALUE`.
430    pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
431        self.kbucket_config.set_bucket_size(size);
432        self
433    }
434
435    /// Sets how long a pending entry waits after creation before it becomes
436    /// eligible for insertion into a full bucket, replacing the
437    /// least-recently (dis)connected node.
438    ///
439    /// * Defaults to `60` s.
440    pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
441        self.kbucket_config.set_pending_timeout(timeout);
442        self
443    }
444
445    /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted in the routing table.
446    /// This prevents cascading bootstrap requests when multiple peers are inserted into the routing table "at the same time".
447    /// It also leaves a little time for other potential peers to be inserted into the routing table before
448    /// triggering a bootstrap, giving more context to the future bootstrap request.
449    ///
450    /// * Defaults to `500` ms.
451    /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a new peer
452    ///     is inserted in the routing table.
453    /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when a new
454    ///     peer is inserted in the routing table).
455    #[cfg(test)]
456    pub(crate) fn set_automatic_bootstrap_throttle(
457        &mut self,
458        duration: Option<Duration>,
459    ) -> &mut Self {
460        self.automatic_bootstrap_throttle = duration;
461        self
462    }
463}
464
465impl<TStore> Behaviour<TStore>
466where
467    TStore: RecordStore + Send + 'static,
468{
469    /// Creates a new `Kademlia` network behaviour with a default configuration.
470    pub fn new(id: PeerId, store: TStore) -> Self {
471        Self::with_config(id, store, Default::default())
472    }
473
474    /// Gets the protocol names of this Kademlia instance.
475    pub fn protocol_names(&self) -> &[StreamProtocol] {
476        self.protocol_config.protocol_names()
477    }
478
479    /// Creates a new `Kademlia` network behaviour with the given configuration.
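    ///
    /// # Example
    ///
    /// A minimal construction sketch, assuming the in-memory record store
    /// shipped with this crate is used:
    ///
    /// ```ignore
    /// use libp2p_identity::PeerId;
    /// use libp2p_kad::{store::MemoryStore, Behaviour, Config};
    ///
    /// let local_peer_id = PeerId::random();
    /// let store = MemoryStore::new(local_peer_id);
    /// let behaviour = Behaviour::with_config(local_peer_id, store, Config::default());
    /// ```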
480    pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
481        let local_key = kbucket::Key::from(id);
482
483        let put_record_job = config
484            .record_replication_interval
485            .or(config.record_publication_interval)
486            .map(|interval| {
487                PutRecordJob::new(
488                    id,
489                    interval,
490                    config.record_publication_interval,
491                    config.record_ttl,
492                )
493            });
494
495        let add_provider_job = config
496            .provider_publication_interval
497            .map(AddProviderJob::new);
498
499        Behaviour {
500            store,
501            caching: config.caching,
502            kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
503            kbucket_inserts: config.kbucket_inserts,
504            protocol_config: config.protocol_config,
505            record_filtering: config.record_filtering,
506            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
507            listen_addresses: Default::default(),
508            queries: QueryPool::new(config.query_config),
509            connected_peers: Default::default(),
510            add_provider_job,
511            put_record_job,
512            record_ttl: config.record_ttl,
513            provider_record_ttl: config.provider_record_ttl,
514            external_addresses: Default::default(),
515            local_peer_id: id,
516            connections: Default::default(),
517            mode: Mode::Client,
518            auto_mode: true,
519            no_events_waker: None,
520            bootstrap_status: bootstrap::Status::new(
521                config.periodic_bootstrap_interval,
522                config.automatic_bootstrap_throttle,
523            ),
524        }
525    }
526
527    /// Gets an iterator over immutable references to all running queries.
528    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
529        self.queries.iter().filter_map(|query| {
530            if !query.is_finished() {
531                Some(QueryRef { query })
532            } else {
533                None
534            }
535        })
536    }
537
538    /// Gets an iterator over mutable references to all running queries.
539    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
540        self.queries.iter_mut().filter_map(|query| {
541            if !query.is_finished() {
542                Some(QueryMut { query })
543            } else {
544                None
545            }
546        })
547    }
548
549    /// Gets an immutable reference to a running query, if it exists.
550    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
551        self.queries.get(id).and_then(|query| {
552            if !query.is_finished() {
553                Some(QueryRef { query })
554            } else {
555                None
556            }
557        })
558    }
559
560    /// Gets a mutable reference to a running query, if it exists.
561    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
562        self.queries.get_mut(id).and_then(|query| {
563            if !query.is_finished() {
564                Some(QueryMut { query })
565            } else {
566                None
567            }
568        })
569    }
570
571    /// Adds a known listen address of a peer participating in the DHT to the
572    /// routing table.
573    ///
574    /// Explicitly adding addresses of peers serves two purposes:
575    ///
576    ///   1. In order for a node to join the DHT, it must know about at least
577    ///      one other node of the DHT.
578    ///
579    ///   2. When a remote peer initiates a connection and that peer is not
580    ///      yet in the routing table, the `Kademlia` behaviour must be
581    ///      informed of an address on which that peer is listening for
582    ///      connections before it can be added to the routing table
583    ///      from where it can subsequently be discovered by all peers
584    ///      in the DHT.
585    ///
586    /// If the routing table has been updated as a result of this operation,
587    /// a [`Event::RoutingUpdated`] event is emitted.
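    ///
    /// # Example
    ///
    /// A sketch of seeding the routing table with a known bootstrap node;
    /// the peer ID and address are placeholders:
    ///
    /// ```ignore
    /// let bootstrap_peer: PeerId = "12D3KooW...".parse()?;
    /// let bootstrap_addr: Multiaddr = "/dns4/bootstrap.example.com/tcp/4001".parse()?;
    /// kademlia.add_address(&bootstrap_peer, bootstrap_addr);
    /// ```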
588    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
589        // ensuring address is a fully-qualified /p2p multiaddr
590        let Ok(address) = address.with_p2p(*peer) else {
591            return RoutingUpdate::Failed;
592        };
593        let key = kbucket::Key::from(*peer);
594        match self.kbuckets.entry(&key) {
595            Some(kbucket::Entry::Present(mut entry, _)) => {
596                if entry.value().insert(address) {
597                    self.queued_events
598                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
599                            peer: *peer,
600                            is_new_peer: false,
601                            addresses: entry.value().clone(),
602                            old_peer: None,
603                            bucket_range: self
604                                .kbuckets
605                                .bucket(&key)
606                                .map(|b| b.range())
607                                .expect("Not kbucket::Entry::SelfEntry."),
608                        }))
609                }
610                RoutingUpdate::Success
611            }
612            Some(kbucket::Entry::Pending(mut entry, _)) => {
613                entry.value().insert(address);
614                RoutingUpdate::Pending
615            }
616            Some(kbucket::Entry::Absent(entry)) => {
617                let addresses = Addresses::new(address);
618                let status = if self.connected_peers.contains(peer) {
619                    NodeStatus::Connected
620                } else {
621                    NodeStatus::Disconnected
622                };
623                match entry.insert(addresses.clone(), status) {
624                    kbucket::InsertResult::Inserted => {
625                        self.bootstrap_on_low_peers();
626
627                        self.queued_events.push_back(ToSwarm::GenerateEvent(
628                            Event::RoutingUpdated {
629                                peer: *peer,
630                                is_new_peer: true,
631                                addresses,
632                                old_peer: None,
633                                bucket_range: self
634                                    .kbuckets
635                                    .bucket(&key)
636                                    .map(|b| b.range())
637                                    .expect("Not kbucket::Entry::SelfEntry."),
638                            },
639                        ));
640                        RoutingUpdate::Success
641                    }
642                    kbucket::InsertResult::Full => {
643                        tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
644                        RoutingUpdate::Failed
645                    }
646                    kbucket::InsertResult::Pending { disconnected } => {
647                        self.queued_events.push_back(ToSwarm::Dial {
648                            opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
649                        });
650                        RoutingUpdate::Pending
651                    }
652                }
653            }
654            None => RoutingUpdate::Failed,
655        }
656    }
657
658    /// Removes an address of a peer from the routing table.
659    ///
660    /// If the given address is the last address of the peer in the
661    /// routing table, the peer is removed from the routing table
662    /// and `Some` is returned with a view of the removed entry.
663    /// The same applies if the peer is currently pending insertion
664    /// into the routing table.
665    ///
666    /// If the given peer or address is not in the routing table,
667    /// this is a no-op.
668    pub fn remove_address(
669        &mut self,
670        peer: &PeerId,
671        address: &Multiaddr,
672    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
673        let address = &address.to_owned().with_p2p(*peer).ok()?;
674        let key = kbucket::Key::from(*peer);
675        match self.kbuckets.entry(&key)? {
676            kbucket::Entry::Present(mut entry, _) => {
677                if entry.value().remove(address).is_err() {
678                    Some(entry.remove()) // it is the last address, thus remove the peer.
679                } else {
680                    None
681                }
682            }
683            kbucket::Entry::Pending(mut entry, _) => {
684                if entry.value().remove(address).is_err() {
685                    Some(entry.remove()) // it is the last address, thus remove the peer.
686                } else {
687                    None
688                }
689            }
690            kbucket::Entry::Absent(..) => None,
691        }
692    }
693
694    /// Removes a peer from the routing table.
695    ///
696    /// Returns `None` if the peer was not in the routing table,
697    /// not even pending insertion.
698    pub fn remove_peer(
699        &mut self,
700        peer: &PeerId,
701    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
702        let key = kbucket::Key::from(*peer);
703        match self.kbuckets.entry(&key)? {
704            kbucket::Entry::Present(entry, _) => Some(entry.remove()),
705            kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
706            kbucket::Entry::Absent(..) => None,
707        }
708    }
709
710    /// Returns an iterator over all non-empty buckets in the routing table.
711    pub fn kbuckets(
712        &mut self,
713    ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
714        self.kbuckets.iter().filter(|b| !b.is_empty())
715    }
716
717    /// Returns the k-bucket for the distance to the given key.
718    ///
719    /// Returns `None` if the given key refers to the local key.
720    pub fn kbucket<K>(
721        &mut self,
722        key: K,
723    ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
724    where
725        K: Into<kbucket::Key<K>> + Clone,
726    {
727        self.kbuckets.bucket(&key.into())
728    }
729
730    /// Initiates an iterative query for the closest peers to the given key.
731    ///
732    /// The result of the query is delivered in a
733    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
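    ///
    /// # Example
    ///
    /// A sketch of looking up the peers closest to an arbitrary peer ID and
    /// reading the result off the swarm; `swarm` and `event` stand in for an
    /// application's event loop:
    ///
    /// ```ignore
    /// let query_id = swarm.behaviour_mut().get_closest_peers(PeerId::random());
    ///
    /// // Later, while polling the swarm:
    /// if let SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
    ///     id,
    ///     result: kad::QueryResult::GetClosestPeers(Ok(ok)),
    ///     ..
    /// }) = event
    /// {
    ///     if id == query_id {
    ///         println!("closest peers: {:?}", ok.peers);
    ///     }
    /// }
    /// ```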
734    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
735    where
736        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
737    {
738        let target: kbucket::Key<K> = key.clone().into();
739        let key: Vec<u8> = key.into();
740        let info = QueryInfo::GetClosestPeers {
741            key,
742            step: ProgressStep::first(),
743        };
744        let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
745        self.queries.add_iter_closest(target, peer_keys, info)
746    }
747
748    /// Returns the closest peers to the given key, taking peers from the local routing table only.
749    pub fn get_closest_local_peers<'a, K: Clone>(
750        &'a mut self,
751        key: &'a kbucket::Key<K>,
752    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
753        self.kbuckets.closest_keys(key)
754    }
755
756    /// Performs a lookup for a record in the DHT.
757    ///
758    /// The result of this operation is delivered in a
759    /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
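    ///
    /// # Example
    ///
    /// A sketch of fetching a record and handling the streamed results; the
    /// key is illustrative and `swarm`/`event` stand in for the application's
    /// event loop:
    ///
    /// ```ignore
    /// let query_id = swarm.behaviour_mut().get_record(kad::RecordKey::new(&"my-key"));
    ///
    /// // Later, while polling the swarm:
    /// if let SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
    ///     result: kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(peer_record))),
    ///     ..
    /// }) = event
    /// {
    ///     println!("found value: {:?}", peer_record.record.value);
    /// }
    /// ```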
760    pub fn get_record(&mut self, key: record::Key) -> QueryId {
761        let record = if let Some(record) = self.store.get(&key) {
762            if record.is_expired(Instant::now()) {
763                self.store.remove(&key);
764                None
765            } else {
766                Some(PeerRecord {
767                    peer: None,
768                    record: record.into_owned(),
769                })
770            }
771        } else {
772            None
773        };
774
775        let step = ProgressStep::first();
776
777        let target = kbucket::Key::new(key.clone());
778        let info = if record.is_some() {
779            QueryInfo::GetRecord {
780                key,
781                step: step.next(),
782                found_a_record: true,
783                cache_candidates: BTreeMap::new(),
784            }
785        } else {
786            QueryInfo::GetRecord {
787                key,
788                step: step.clone(),
789                found_a_record: false,
790                cache_candidates: BTreeMap::new(),
791            }
792        };
793        let peers = self.kbuckets.closest_keys(&target);
794        let id = self.queries.add_iter_closest(target.clone(), peers, info);
795
796        // No queries were actually done for the results yet.
797        let stats = QueryStats::empty();
798
799        if let Some(record) = record {
800            self.queued_events
801                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
802                    id,
803                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
804                    step,
805                    stats,
806                }));
807        }
808
809        id
810    }
811
812    /// Stores a record in the DHT, locally as well as at the nodes
813    /// closest to the key as per the xor distance metric.
814    ///
815    /// Returns `Ok` if a record has been stored locally, providing the
816    /// `QueryId` of the initial query that replicates the record in the DHT.
817    /// The result of the query is eventually reported as a
818    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
819    ///
820    /// The record is always stored locally with the given expiration. If the record's
821    /// expiration is `None`, the common case, it does not expire in local storage
822    /// but is still replicated with the configured record TTL. To remove the record
823    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
824    ///
825    /// After the initial publication of the record, it is subject to (re-)replication
826    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
827    /// does not update the record's expiration in local storage, thus a given record
828    /// with an explicit expiration will always expire at that instant and until then
829    /// is subject to regular (re-)replication and (re-)publication.
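    ///
    /// # Example
    ///
    /// A sketch of publishing a small value under a key with a quorum of one;
    /// key and value are illustrative:
    ///
    /// ```ignore
    /// let record = kad::Record::new(kad::RecordKey::new(&"my-key"), b"my-value".to_vec());
    /// let query_id = swarm
    ///     .behaviour_mut()
    ///     .put_record(record, kad::Quorum::One)
    ///     .expect("the record should be stored locally");
    /// ```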
830    pub fn put_record(
831        &mut self,
832        mut record: Record,
833        quorum: Quorum,
834    ) -> Result<QueryId, store::Error> {
835        record.publisher = Some(*self.kbuckets.local_key().preimage());
836        self.store.put(record.clone())?;
837        record.expires = record
838            .expires
839            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
840        let quorum = quorum.eval(self.queries.config().replication_factor);
841        let target = kbucket::Key::new(record.key.clone());
842        let peers = self.kbuckets.closest_keys(&target);
843        let context = PutRecordContext::Publish;
844        let info = QueryInfo::PutRecord {
845            context,
846            record,
847            quorum,
848            phase: PutRecordPhase::GetClosestPeers,
849        };
850        Ok(self.queries.add_iter_closest(target.clone(), peers, info))
851    }
852
853    /// Stores a record at specific peers, without storing it locally.
854    ///
855    /// The given [`Quorum`] is understood in the context of the total
856    /// number of distinct peers given.
857    ///
858    /// If the record's expiration is `None`, the configured record TTL is used.
859    ///
860    /// > **Note**: This is not a regular Kademlia DHT operation. It may be
861    /// > used to selectively update or store a record at specific peers
862    /// > for the purpose of e.g. making sure these peers have the latest
863    /// > "version" of a record or to "cache" a record at further peers
864    /// > to increase the lookup success rate on the DHT for other peers.
865    /// >
866    /// > In particular, there is no automatic storing of records performed, and this
867    /// > method must be used to ensure the standard Kademlia
868    /// > procedure of "caching" (i.e. storing) a found record at the closest
869    /// > node to the key that _did not_ return it.
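    ///
    /// # Example
    ///
    /// A sketch of the write-back caching described above, assuming
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] exposes the tracked
    /// peers in a `cache_candidates` field and that a record was already
    /// chosen from the lookup results as `chosen_record`:
    ///
    /// ```ignore
    /// if let SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
    ///     result:
    ///         kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FinishedWithNoAdditionalRecord {
    ///             cache_candidates,
    ///         })),
    ///     ..
    /// }) = event
    /// {
    ///     swarm.behaviour_mut().put_record_to(
    ///         chosen_record,
    ///         cache_candidates.into_values(),
    ///         kad::Quorum::One,
    ///     );
    /// }
    /// ```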
870    pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
871    where
872        I: ExactSizeIterator<Item = PeerId>,
873    {
874        let quorum = if peers.len() > 0 {
875            quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
876        } else {
877            // If no peers are given, we just let the query fail immediately
878            // due to the fact that the quorum must be at least one, instead of
879            // introducing a new kind of error.
880            NonZeroUsize::new(1).expect("1 > 0")
881        };
882        record.expires = record
883            .expires
884            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
885        let context = PutRecordContext::Custom;
886        let info = QueryInfo::PutRecord {
887            context,
888            record,
889            quorum,
890            phase: PutRecordPhase::PutRecord {
891                success: Vec::new(),
892                get_closest_peers_stats: QueryStats::empty(),
893            },
894        };
895        self.queries.add_fixed(peers, info)
896    }
897
898    /// Removes the record with the given key from _local_ storage,
899    /// if the local node is the publisher of the record.
900    ///
901    /// Has no effect if a record for the given key is stored locally but
902    /// the local node is not a publisher of the record.
903    ///
904    /// This is a _local_ operation. However, it also has the effect that
905    /// the record will no longer be periodically re-published, allowing the
906    /// record to eventually expire throughout the DHT.
907    pub fn remove_record(&mut self, key: &record::Key) {
908        if let Some(r) = self.store.get(key) {
909            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
910                self.store.remove(key)
911            }
912        }
913    }
914
915    /// Gets a mutable reference to the record store.
916    pub fn store_mut(&mut self) -> &mut TStore {
917        &mut self.store
918    }
919
920    /// Bootstraps the local node to join the DHT.
921    ///
922    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
923    /// own ID in the DHT. This introduces the local node to the other nodes
924    /// in the DHT and populates its routing table with the closest neighbours.
925    ///
926    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
927    /// refreshed by initiating an additional bootstrapping query for each such
928    /// bucket with random keys.
929    ///
930    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
931    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
932    /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
933    /// with one such event per bootstrapping query.
934    ///
935    /// Returns `Err` if bootstrapping is impossible due to an empty routing table.
936    ///
937    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
938    /// > See [`Behaviour::add_address`].
939    ///
940    /// > **Note**: Bootstrap does not need to be called manually. It is invoked
941    /// > periodically based on the configured `periodic_bootstrap_interval` (see
942    /// > [`Config::set_periodic_bootstrap_interval`] for details) and it is also
943    /// > invoked automatically when a new peer is inserted in the routing table,
944    /// > both of which keep calling [`Behaviour::bootstrap`] in the background
945    /// > to ensure a healthy routing table.
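    ///
    /// # Example
    ///
    /// A sketch of joining the DHT after seeding the routing table; the
    /// bootstrap peer ID and address are placeholders:
    ///
    /// ```ignore
    /// swarm.behaviour_mut().add_address(&bootstrap_peer, bootstrap_addr);
    /// match swarm.behaviour_mut().bootstrap() {
    ///     Ok(query_id) => tracing::debug!(?query_id, "bootstrap started"),
    ///     Err(kad::NoKnownPeers()) => tracing::warn!("no known peers, cannot bootstrap"),
    /// }
    /// ```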
946    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
947        let local_key = *self.kbuckets.local_key();
948        let info = QueryInfo::Bootstrap {
949            peer: *local_key.preimage(),
950            remaining: None,
951            step: ProgressStep::first(),
952        };
953        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
954        if peers.is_empty() {
955            self.bootstrap_status.reset_timers();
956            Err(NoKnownPeers())
957        } else {
958            self.bootstrap_status.on_started();
959            Ok(self.queries.add_iter_closest(local_key, peers, info))
960        }
961    }
962
963    /// Establishes the local node as a provider of a value for the given key.
964    ///
965    /// This operation publishes a provider record with the given key and
966    /// identity of the local node to the peers closest to the key, thus establishing
967    /// the local node as a provider.
968    ///
969    /// Returns `Ok` if a provider record has been stored locally, providing the
970    /// `QueryId` of the initial query that announces the local node as a provider.
971    ///
972    /// The publication of the provider records is periodically repeated as per the
973    /// configured interval, to renew the expiry and account for changes to the DHT
974    /// topology. A provider record may be removed from local storage and
975    /// thus no longer re-published by calling [`Behaviour::stop_providing`].
976    ///
977    /// In contrast to the standard Kademlia push-based model for content distribution
978    /// implemented by [`Behaviour::put_record`], the provider API implements a
979    /// pull-based model that may be used in addition or as an alternative.
980    /// The means by which the actual value is obtained from a provider is out of scope
981    /// of the libp2p Kademlia provider API.
982    ///
983    /// The results of the (repeated) provider announcements sent by this node are
984    /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
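    ///
    /// # Example
    ///
    /// A sketch of announcing the local node as a provider; the key is
    /// illustrative:
    ///
    /// ```ignore
    /// let query_id = swarm
    ///     .behaviour_mut()
    ///     .start_providing(kad::RecordKey::new(&"my-content"))
    ///     .expect("the provider record should be stored locally");
    /// ```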
985    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
986        // Note: We store our own provider records locally without local addresses
987        // to avoid redundant storage and outdated addresses. Instead these are
988        // acquired on demand when returning a `ProviderRecord` for the local node.
989        let local_addrs = Vec::new();
990        let record = ProviderRecord::new(
991            key.clone(),
992            *self.kbuckets.local_key().preimage(),
993            local_addrs,
994        );
995        self.store.add_provider(record)?;
996        let target = kbucket::Key::new(key.clone());
997        let peers = self.kbuckets.closest_keys(&target);
998        let context = AddProviderContext::Publish;
999        let info = QueryInfo::AddProvider {
1000            context,
1001            key,
1002            phase: AddProviderPhase::GetClosestPeers,
1003        };
1004        let id = self.queries.add_iter_closest(target.clone(), peers, info);
1005        Ok(id)
1006    }
1007
1008    /// Stops the local node from announcing that it is a provider for the given key.
1009    ///
1010    /// This is a local operation. The local node will still be considered as a
1011    /// provider for the key by other nodes until these provider records expire.
1012    pub fn stop_providing(&mut self, key: &record::Key) {
1013        self.store
1014            .remove_provider(key, self.kbuckets.local_key().preimage());
1015    }
1016
1017    /// Performs a lookup for providers of a value for the given key.
1018    ///
1019    /// The result of this operation is reported via
1020    /// [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
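    ///
    /// # Example
    ///
    /// A sketch of looking up providers and reading them from the event
    /// stream; the key is illustrative and `swarm`/`event` stand in for the
    /// application's event loop:
    ///
    /// ```ignore
    /// let query_id = swarm
    ///     .behaviour_mut()
    ///     .get_providers(kad::RecordKey::new(&"my-content"));
    ///
    /// // Later, while polling the swarm:
    /// if let SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed {
    ///     result:
    ///         kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
    ///             providers, ..
    ///         })),
    ///     ..
    /// }) = event
    /// {
    ///     println!("providers: {providers:?}");
    /// }
    /// ```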
1021    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
1022        let providers: HashSet<_> = self
1023            .store
1024            .providers(&key)
1025            .into_iter()
1026            .filter(|p| !p.is_expired(Instant::now()))
1027            .map(|p| p.provider)
1028            .collect();
1029
1030        let step = ProgressStep::first();
1031
1032        let info = QueryInfo::GetProviders {
1033            key: key.clone(),
1034            providers_found: providers.len(),
1035            step: if providers.is_empty() {
1036                step.clone()
1037            } else {
1038                step.next()
1039            },
1040        };
1041
1042        let target = kbucket::Key::new(key.clone());
1043        let peers = self.kbuckets.closest_keys(&target);
1044        let id = self.queries.add_iter_closest(target.clone(), peers, info);
1045
1046        // No queries were actually done for the results yet.
1047        let stats = QueryStats::empty();
1048
1049        if !providers.is_empty() {
1050            self.queued_events
1051                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
1052                    id,
1053                    result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
1054                        key,
1055                        providers,
1056                    })),
1057                    step,
1058                    stats,
1059                }));
1060        }
1061        id
1062    }
1063
1064    /// Set the [`Mode`] in which we should operate.
1065    ///
1066    /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we have a confirmed external address via [`FromSwarm::ExternalAddrConfirmed`].
1067    ///
1068    /// Setting a mode via this function disables this automatic behaviour and unconditionally operates in the specified mode.
1069    /// To reactivate the automatic configuration, pass [`None`] instead.
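    ///
    /// # Example
    ///
    /// A sketch of forcing server mode for a node that is known to be publicly
    /// reachable even before an external address is confirmed:
    ///
    /// ```ignore
    /// swarm.behaviour_mut().set_mode(Some(kad::Mode::Server));
    ///
    /// // Revert to automatic mode selection based on confirmed external addresses.
    /// swarm.behaviour_mut().set_mode(None);
    /// ```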
1070    pub fn set_mode(&mut self, mode: Option<Mode>) {
1071        match mode {
1072            Some(mode) => {
1073                self.mode = mode;
1074                self.auto_mode = false;
1075                self.reconfigure_mode();
1076            }
1077            None => {
1078                self.auto_mode = true;
1079                self.determine_mode_from_external_addresses();
1080            }
1081        }
1082
1083        if let Some(waker) = self.no_events_waker.take() {
1084            waker.wake();
1085        }
1086    }
1087
1088    fn reconfigure_mode(&mut self) {
1089        if self.connections.is_empty() {
1090            return;
1091        }
1092
1093        let num_connections = self.connections.len();
1094
1095        tracing::debug!(
1096            "Re-configuring {} established connection{}",
1097            num_connections,
1098            if num_connections > 1 { "s" } else { "" }
1099        );
1100
1101        self.queued_events
1102            .extend(
1103                self.connections
1104                    .iter()
1105                    .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1106                        peer_id: *peer_id,
1107                        handler: NotifyHandler::One(*conn_id),
1108                        event: HandlerIn::ReconfigureMode {
1109                            new_mode: self.mode,
1110                        },
1111                    }),
1112            );
1113    }
1114
1115    fn determine_mode_from_external_addresses(&mut self) {
1116        let old_mode = self.mode;
1117
1118        self.mode = match (self.external_addresses.as_slice(), self.mode) {
1119            ([], Mode::Server) => {
1120                tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1121
1122                Mode::Client
1123            }
1124            ([], Mode::Client) => {
1125                // Previously client-mode, now also client-mode because no external addresses.
1126
1127                Mode::Client
1128            }
1129            (confirmed_external_addresses, Mode::Client) => {
1130                if tracing::enabled!(Level::DEBUG) {
1131                    let confirmed_external_addresses =
1132                        to_comma_separated_list(confirmed_external_addresses);
1133
1134                    tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1135                }
1136
1137                Mode::Server
1138            }
1139            (confirmed_external_addresses, Mode::Server) => {
1140                debug_assert!(
1141                    !confirmed_external_addresses.is_empty(),
1142                    "Previous match arm handled empty list"
1143                );
1144
1145                // Previously server-mode, now also server-mode because we still have at least one external address. Don't log anything to avoid spam.
1146
1147                Mode::Server
1148            }
1149        };
1150
1151        self.reconfigure_mode();
1152
1153        if old_mode != self.mode {
1154            self.queued_events
1155                .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1156                    new_mode: self.mode,
1157                }));
1158        }
1159    }
1160
1161    /// Processes discovered peers from a successful request in an iterative `Query`.
1162    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1163    where
1164        I: Iterator<Item = &'a KadPeer> + Clone,
1165    {
1166        let local_id = self.kbuckets.local_key().preimage();
1167        let others_iter = peers.filter(|p| &p.node_id != local_id);
1168        if let Some(query) = self.queries.get_mut(query_id) {
1169            tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1170            for peer in others_iter.clone() {
1171                tracing::trace!(
1172                    ?peer,
1173                    %source,
1174                    query=?query_id,
1175                    "Peer reported by source in query"
1176                );
1177                let addrs = peer.multiaddrs.iter().cloned().collect();
1178                query.peers.addresses.insert(peer.node_id, addrs);
1179            }
1180            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1181        }
1182    }
1183
1184    /// Finds the closest peers to a `target` in the context of a request by
1185    /// the `source` peer, such that the `source` peer is never included in the
1186    /// result.
1187    fn find_closest<T: Clone>(
1188        &mut self,
1189        target: &kbucket::Key<T>,
1190        source: &PeerId,
1191    ) -> Vec<KadPeer> {
1192        self.kbuckets
1193            .closest(target)
1194            .filter(|e| e.node.key.preimage() != source)
1195            .take(self.queries.config().replication_factor.get())
1196            .map(KadPeer::from)
1197            .collect()
1198    }
1199
1200    /// Collects all peers that are known to be providers of the value for a given key.
1201    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1202        let kbuckets = &mut self.kbuckets;
1203        let connected = &mut self.connected_peers;
1204        let listen_addresses = &self.listen_addresses;
1205        let external_addresses = &self.external_addresses;
1206
1207        self.store
1208            .providers(key)
1209            .into_iter()
1210            .filter_map(move |p| {
1211                if &p.provider != source {
1212                    let node_id = p.provider;
1213                    let multiaddrs = p.addresses;
1214                    let connection_ty = if connected.contains(&node_id) {
1215                        ConnectionType::Connected
1216                    } else {
1217                        ConnectionType::NotConnected
1218                    };
1219                    if multiaddrs.is_empty() {
1220                        // The provider is either the local node and we fill in
1221                        // the local addresses on demand, or it is a legacy
1222                        // provider record without addresses, in which case we
1223                        // try to find addresses in the routing table, as was
1224                        // done before provider records were stored along with
1225                        // their addresses.
1226                        if &node_id == kbuckets.local_key().preimage() {
1227                            Some(
1228                                listen_addresses
1229                                    .iter()
1230                                    .chain(external_addresses.iter())
1231                                    .cloned()
1232                                    .collect::<Vec<_>>(),
1233                            )
1234                        } else {
1235                            let key = kbucket::Key::from(node_id);
1236                            kbuckets
1237                                .entry(&key)
1238                                .as_mut()
1239                                .and_then(|e| e.view())
1240                                .map(|e| e.node.value.clone().into_vec())
1241                        }
1242                    } else {
1243                        Some(multiaddrs)
1244                    }
1245                    .map(|multiaddrs| KadPeer {
1246                        node_id,
1247                        multiaddrs,
1248                        connection_ty,
1249                    })
1250                } else {
1251                    None
1252                }
1253            })
1254            .take(self.queries.config().replication_factor.get())
1255            .collect()
1256    }
1257
1258    /// Starts an iterative `ADD_PROVIDER` query for the given key.
1259    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1260        let info = QueryInfo::AddProvider {
1261            context,
1262            key: key.clone(),
1263            phase: AddProviderPhase::GetClosestPeers,
1264        };
1265        let target = kbucket::Key::new(key);
1266        let peers = self.kbuckets.closest_keys(&target);
1267        self.queries.add_iter_closest(target.clone(), peers, info);
1268    }
1269
1270    /// Starts an iterative `PUT_VALUE` query for the given record.
1271    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1272        let quorum = quorum.eval(self.queries.config().replication_factor);
1273        let target = kbucket::Key::new(record.key.clone());
1274        let peers = self.kbuckets.closest_keys(&target);
1275        let info = QueryInfo::PutRecord {
1276            record,
1277            quorum,
1278            context,
1279            phase: PutRecordPhase::GetClosestPeers,
1280        };
1281        self.queries.add_iter_closest(target.clone(), peers, info);
1282    }
1283
1284    /// Updates the routing table with a new connection status and address of a peer.
1285    fn connection_updated(
1286        &mut self,
1287        peer: PeerId,
1288        address: Option<Multiaddr>,
1289        new_status: NodeStatus,
1290    ) {
1291        let key = kbucket::Key::from(peer);
1292        match self.kbuckets.entry(&key) {
1293            Some(kbucket::Entry::Present(mut entry, old_status)) => {
1294                if old_status != new_status {
1295                    entry.update(new_status)
1296                }
1297                if let Some(address) = address {
1298                    if entry.value().insert(address) {
1299                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1300                            Event::RoutingUpdated {
1301                                peer,
1302                                is_new_peer: false,
1303                                addresses: entry.value().clone(),
1304                                old_peer: None,
1305                                bucket_range: self
1306                                    .kbuckets
1307                                    .bucket(&key)
1308                                    .map(|b| b.range())
1309                                    .expect("Not kbucket::Entry::SelfEntry."),
1310                            },
1311                        ))
1312                    }
1313                }
1314            }
1315
1316            Some(kbucket::Entry::Pending(mut entry, old_status)) => {
1317                if let Some(address) = address {
1318                    entry.value().insert(address);
1319                }
1320                if old_status != new_status {
1321                    entry.update(new_status);
1322                }
1323            }
1324
1325            Some(kbucket::Entry::Absent(entry)) => {
1326                // Only connected nodes with a known address are newly inserted.
1327                if new_status != NodeStatus::Connected {
1328                    return;
1329                }
1330                match (address, self.kbucket_inserts) {
1331                    (None, _) => {
1332                        self.queued_events
1333                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
1334                    }
1335                    (Some(a), BucketInserts::Manual) => {
1336                        self.queued_events
1337                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
1338                                peer,
1339                                address: a,
1340                            }));
1341                    }
1342                    (Some(a), BucketInserts::OnConnected) => {
1343                        let addresses = Addresses::new(a);
1344                        match entry.insert(addresses.clone(), new_status) {
1345                            kbucket::InsertResult::Inserted => {
1346                                self.bootstrap_on_low_peers();
1347
1348                                let event = Event::RoutingUpdated {
1349                                    peer,
1350                                    is_new_peer: true,
1351                                    addresses,
1352                                    old_peer: None,
1353                                    bucket_range: self
1354                                        .kbuckets
1355                                        .bucket(&key)
1356                                        .map(|b| b.range())
1357                                        .expect("Not kbucket::Entry::SelfEntry."),
1358                                };
1359                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
1360                            }
1361                            kbucket::InsertResult::Full => {
1362                                tracing::debug!(
1363                                    %peer,
1364                                    "Bucket full. Peer not added to routing table"
1365                                );
1366                                let address = addresses.first().clone();
1367                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1368                                    Event::RoutablePeer { peer, address },
1369                                ));
1370                            }
1371                            kbucket::InsertResult::Pending { disconnected } => {
1372                                let address = addresses.first().clone();
1373                                self.queued_events.push_back(ToSwarm::GenerateEvent(
1374                                    Event::PendingRoutablePeer { peer, address },
1375                                ));
1376
1377                                // `disconnected` might already be in the process of re-connecting.
1378                                // In other words `disconnected` might have already re-connected but
1379                                // is not yet confirmed to support the Kademlia protocol via
1380                                // [`HandlerEvent::ProtocolConfirmed`].
1381                                //
1382                                // Only try dialing peer if not currently connected.
1383                                if !self.connected_peers.contains(disconnected.preimage()) {
1384                                    self.queued_events.push_back(ToSwarm::Dial {
1385                                        opts: DialOpts::peer_id(disconnected.into_preimage())
1386                                            .build(),
1387                                    })
1388                                }
1389                            }
1390                        }
1391                    }
1392                }
1393            }
1394            _ => {}
1395        }
1396    }
1397
1398    /// Triggers a bootstrap if the routing table is currently small, i.e. if it
1399    /// holds fewer than `K_VALUE` peers. Called after a new peer has been inserted
1400    /// into the routing table.
1401    fn bootstrap_on_low_peers(&mut self) {
1402        if self
1403            .kbuckets()
1404            .map(|kbucket| kbucket.num_entries())
1405            .sum::<usize>()
1406            < K_VALUE.get()
1407        {
1408            self.bootstrap_status.trigger();
1409        }
1410    }
1411
1412    /// Handles a finished (i.e. successful) query.
1413    fn query_finished(&mut self, q: Query) -> Option<Event> {
1414        let query_id = q.id();
1415        tracing::trace!(query=?query_id, "Query finished");
1416        match q.info {
1417            QueryInfo::Bootstrap {
1418                peer,
1419                remaining,
1420                mut step,
1421            } => {
1422                let local_key = *self.kbuckets.local_key();
1423                let mut remaining = remaining.unwrap_or_else(|| {
1424                    debug_assert_eq!(&peer, local_key.preimage());
1425                    // The lookup for the local key finished. To complete the bootstrap process,
1426                    // a bucket refresh should be performed for every bucket farther away than
1427                    // the first non-empty bucket (most likely no more than the last few, i.e.
1428                    // farthest, buckets).
1429                    self.kbuckets
1430                        .iter()
1431                        .skip_while(|b| b.is_empty())
1432                        .skip(1) // Skip the bucket with the closest neighbour.
1433                        .map(|b| {
1434                            // Try to find a key that falls into the bucket. While such keys can
1435                            // be generated fully deterministically, the current libp2p kademlia
1436                            // wire protocol requires transmission of the preimages of the actual
1437                            // keys in the DHT keyspace, hence for now this is just a "best effort"
1438                            // to find a key that hashes into a specific bucket. The probabilities
1439                            // of finding a key in the bucket `b` with at most 16 trials are as
1440                            // follows:
1441                            //
1442                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
1443                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
1444                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
1445                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1446                            // ...
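                            //
                            // In general, assuming uniformly random peer IDs, the probability of
                            // hitting bucket `255 - j` within 16 trials is `1 - (1 - 2^-(j + 1))^16`.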
1447                            let mut target = kbucket::Key::from(PeerId::random());
1448                            for _ in 0..16 {
1449                                let d = local_key.distance(&target);
1450                                if b.contains(&d) {
1451                                    break;
1452                                }
1453                                target = kbucket::Key::from(PeerId::random());
1454                            }
1455                            target
1456                        })
1457                        .collect::<Vec<_>>()
1458                        .into_iter()
1459                });
1460
1461                let num_remaining = remaining.len() as u32;
1462
1463                if let Some(target) = remaining.next() {
1464                    let info = QueryInfo::Bootstrap {
1465                        peer: *target.preimage(),
1466                        remaining: Some(remaining),
1467                        step: step.next(),
1468                    };
1469                    let peers = self.kbuckets.closest_keys(&target);
1470                    self.queries
1471                        .continue_iter_closest(query_id, target, peers, info);
1472                } else {
1473                    step.last = true;
1474                    self.bootstrap_status.on_finish();
1475                };
1476
1477                Some(Event::OutboundQueryProgressed {
1478                    id: query_id,
1479                    stats: q.stats,
1480                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
1481                        peer,
1482                        num_remaining,
1483                    })),
1484                    step,
1485                })
1486            }
1487
1488            QueryInfo::GetClosestPeers { key, mut step } => {
1489                step.last = true;
1490
1491                Some(Event::OutboundQueryProgressed {
1492                    id: query_id,
1493                    stats: q.stats,
1494                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1495                        key,
1496                        peers: q.peers.into_peerinfos_iter().collect(),
1497                    })),
1498                    step,
1499                })
1500            }
1501
1502            QueryInfo::GetProviders { mut step, .. } => {
1503                step.last = true;
1504
1505                Some(Event::OutboundQueryProgressed {
1506                    id: query_id,
1507                    stats: q.stats,
1508                    result: QueryResult::GetProviders(Ok(
1509                        GetProvidersOk::FinishedWithNoAdditionalRecord {
1510                            closest_peers: q.peers.into_peerids_iter().collect(),
1511                        },
1512                    )),
1513                    step,
1514                })
1515            }
1516
1517            QueryInfo::AddProvider {
1518                context,
1519                key,
1520                phase: AddProviderPhase::GetClosestPeers,
1521            } => {
1522                let provider_id = self.local_peer_id;
1523                let external_addresses = self.external_addresses.iter().cloned().collect();
1524                let info = QueryInfo::AddProvider {
1525                    context,
1526                    key,
1527                    phase: AddProviderPhase::AddProvider {
1528                        provider_id,
1529                        external_addresses,
1530                        get_closest_peers_stats: q.stats,
1531                    },
1532                };
1533                self.queries
1534                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1535                None
1536            }
1537
1538            QueryInfo::AddProvider {
1539                context,
1540                key,
1541                phase:
1542                    AddProviderPhase::AddProvider {
1543                        get_closest_peers_stats,
1544                        ..
1545                    },
1546            } => match context {
1547                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1548                    id: query_id,
1549                    stats: get_closest_peers_stats.merge(q.stats),
1550                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1551                    step: ProgressStep::first_and_last(),
1552                }),
1553                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1554                    id: query_id,
1555                    stats: get_closest_peers_stats.merge(q.stats),
1556                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1557                    step: ProgressStep::first_and_last(),
1558                }),
1559            },
1560
1561            QueryInfo::GetRecord {
1562                key,
1563                mut step,
1564                found_a_record,
1565                cache_candidates,
1566            } => {
1567                step.last = true;
1568
1569                let results = if found_a_record {
1570                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1571                } else {
1572                    Err(GetRecordError::NotFound {
1573                        key,
1574                        closest_peers: q.peers.into_peerids_iter().collect(),
1575                    })
1576                };
1577                Some(Event::OutboundQueryProgressed {
1578                    id: query_id,
1579                    stats: q.stats,
1580                    result: QueryResult::GetRecord(results),
1581                    step,
1582                })
1583            }
1584
1585            QueryInfo::PutRecord {
1586                context,
1587                record,
1588                quorum,
1589                phase: PutRecordPhase::GetClosestPeers,
1590            } => {
1591                let info = QueryInfo::PutRecord {
1592                    context,
1593                    record,
1594                    quorum,
1595                    phase: PutRecordPhase::PutRecord {
1596                        success: vec![],
1597                        get_closest_peers_stats: q.stats,
1598                    },
1599                };
1600                self.queries
1601                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1602                None
1603            }
1604
1605            QueryInfo::PutRecord {
1606                context,
1607                record,
1608                quorum,
1609                phase:
1610                    PutRecordPhase::PutRecord {
1611                        success,
1612                        get_closest_peers_stats,
1613                    },
1614            } => {
1615                let mk_result = |key: record::Key| {
1616                    if success.len() >= quorum.get() {
1617                        Ok(PutRecordOk { key })
1618                    } else {
1619                        Err(PutRecordError::QuorumFailed {
1620                            key,
1621                            quorum,
1622                            success,
1623                        })
1624                    }
1625                };
1626                match context {
1627                    PutRecordContext::Publish | PutRecordContext::Custom => {
1628                        Some(Event::OutboundQueryProgressed {
1629                            id: query_id,
1630                            stats: get_closest_peers_stats.merge(q.stats),
1631                            result: QueryResult::PutRecord(mk_result(record.key)),
1632                            step: ProgressStep::first_and_last(),
1633                        })
1634                    }
1635                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1636                        id: query_id,
1637                        stats: get_closest_peers_stats.merge(q.stats),
1638                        result: QueryResult::RepublishRecord(mk_result(record.key)),
1639                        step: ProgressStep::first_and_last(),
1640                    }),
1641                    PutRecordContext::Replicate => {
1642                        tracing::debug!(record=?record.key, "Record replicated");
1643                        None
1644                    }
1645                }
1646            }
1647        }
1648    }
1649
1650    /// Handles a query that timed out.
1651    fn query_timeout(&mut self, query: Query) -> Option<Event> {
1652        let query_id = query.id();
1653        tracing::trace!(query=?query_id, "Query timed out");
1654        match query.info {
1655            QueryInfo::Bootstrap {
1656                peer,
1657                mut remaining,
1658                mut step,
1659            } => {
1660                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1661
1662                // Continue with the next bootstrap query if `remaining` is not empty.
1663                if let Some((target, remaining)) =
1664                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
1665                {
1666                    let info = QueryInfo::Bootstrap {
1667                        peer: target.into_preimage(),
1668                        remaining: Some(remaining),
1669                        step: step.next(),
1670                    };
1671                    let peers = self.kbuckets.closest_keys(&target);
1672                    self.queries
1673                        .continue_iter_closest(query_id, target, peers, info);
1674                } else {
1675                    step.last = true;
1676                    self.bootstrap_status.on_finish();
1677                }
1678
1679                Some(Event::OutboundQueryProgressed {
1680                    id: query_id,
1681                    stats: query.stats,
1682                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1683                        peer,
1684                        num_remaining,
1685                    })),
1686                    step,
1687                })
1688            }
1689
1690            QueryInfo::AddProvider { context, key, .. } => Some(match context {
1691                AddProviderContext::Publish => Event::OutboundQueryProgressed {
1692                    id: query_id,
1693                    stats: query.stats,
1694                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1695                    step: ProgressStep::first_and_last(),
1696                },
1697                AddProviderContext::Republish => Event::OutboundQueryProgressed {
1698                    id: query_id,
1699                    stats: query.stats,
1700                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1701                    step: ProgressStep::first_and_last(),
1702                },
1703            }),
1704
1705            QueryInfo::GetClosestPeers { key, mut step } => {
1706                step.last = true;
1707                Some(Event::OutboundQueryProgressed {
1708                    id: query_id,
1709                    stats: query.stats,
1710                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1711                        key,
1712                        peers: query.peers.into_peerinfos_iter().collect(),
1713                    })),
1714                    step,
1715                })
1716            }
1717
1718            QueryInfo::PutRecord {
1719                record,
1720                quorum,
1721                context,
1722                phase,
1723            } => {
1724                let err = Err(PutRecordError::Timeout {
1725                    key: record.key,
1726                    quorum,
1727                    success: match phase {
1728                        PutRecordPhase::GetClosestPeers => vec![],
1729                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1730                    },
1731                });
1732                match context {
1733                    PutRecordContext::Publish | PutRecordContext::Custom => {
1734                        Some(Event::OutboundQueryProgressed {
1735                            id: query_id,
1736                            stats: query.stats,
1737                            result: QueryResult::PutRecord(err),
1738                            step: ProgressStep::first_and_last(),
1739                        })
1740                    }
1741                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1742                        id: query_id,
1743                        stats: query.stats,
1744                        result: QueryResult::RepublishRecord(err),
1745                        step: ProgressStep::first_and_last(),
1746                    }),
1747                    PutRecordContext::Replicate => match phase {
1748                        PutRecordPhase::GetClosestPeers => {
1749                            tracing::warn!(
1750                                "Locating closest peers for replication failed: {:?}",
1751                                err
1752                            );
1753                            None
1754                        }
1755                        PutRecordPhase::PutRecord { .. } => {
1756                            tracing::debug!("Replicating record failed: {:?}", err);
1757                            None
1758                        }
1759                    },
1760                }
1761            }
1762
1763            QueryInfo::GetRecord { key, mut step, .. } => {
1764                step.last = true;
1765
1766                Some(Event::OutboundQueryProgressed {
1767                    id: query_id,
1768                    stats: query.stats,
1769                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1770                    step,
1771                })
1772            }
1773
1774            QueryInfo::GetProviders { key, mut step, .. } => {
1775                step.last = true;
1776
1777                Some(Event::OutboundQueryProgressed {
1778                    id: query_id,
1779                    stats: query.stats,
1780                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1781                        key,
1782                        closest_peers: query.peers.into_peerids_iter().collect(),
1783                    })),
1784                    step,
1785                })
1786            }
1787        }
1788    }
1789
1790    /// Processes a record received from a peer.
1791    fn record_received(
1792        &mut self,
1793        source: PeerId,
1794        connection: ConnectionId,
1795        request_id: RequestId,
1796        mut record: Record,
1797    ) {
1798        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1799            // If the (alleged) publisher is the local node, do nothing. The record of
1800            // the original publisher should never change as a result of replication
1801            // and the publisher is always assumed to have the "right" value.
1802            self.queued_events.push_back(ToSwarm::NotifyHandler {
1803                peer_id: source,
1804                handler: NotifyHandler::One(connection),
1805                event: HandlerIn::PutRecordRes {
1806                    key: record.key,
1807                    value: record.value,
1808                    request_id,
1809                },
1810            });
1811            return;
1812        }
1813
1814        let now = Instant::now();
1815
1816        // Calculate the expiration: the TTL decreases exponentially with the
1817        // number of nodes between the local node and the closest node to the key
1818        // (beyond the replication factor). This avoids over-caching the record
1819        // outside of the k closest nodes to a key.
1820        let target = kbucket::Key::new(record.key.clone());
1821        let num_between = self.kbuckets.count_nodes_between(&target);
1822        let k = self.queries.config().replication_factor.get();
1823        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1824        let expiration = self
1825            .record_ttl
1826            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1827        // The smaller TTL prevails. Only if neither TTL is set is the record
1828        // stored "forever".
1829        record.expires = record.expires.or(expiration).min(expiration);
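        // For example, assuming the default 36-hour record TTL and k = 20: if 22
        // nodes lie between the local node and the key, `num_beyond_k` is 2 and the
        // locally computed expiration is 36h >> 2 = 9 hours from now (further capped
        // by any earlier expiration already carried by the record).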
1830
1831        if let Some(job) = self.put_record_job.as_mut() {
1832            // Ignore the record in the next run of the replication
1833            // job, since we can assume the sender replicated the
1834            // record to the k closest peers. Effectively, only
1835            // one of the k closest peers performs a replication
1836            // in the configured interval, assuming a shared interval.
1837            job.skip(record.key.clone())
1838        }
1839
1840        // While records received from a publisher, as well as records that do not
1841        // exist locally, should always be stored (or at least attempted), there is a
1842        // choice here w.r.t. the handling of replicated records whose keys refer
1843        // to records that exist locally: The value and / or the publisher may
1844        // either be overridden or left unchanged. At the moment and in the
1845        // absence of a decisive argument for another option, both are always
1846        // overridden as it avoids having to load the existing record in the
1847        // first place.
1848
1849        if !record.is_expired(now) {
1850            // The record is cloned because of the weird libp2p protocol
1851            // requirement to send back the value in the response, although this
1852            // is a waste of resources.
1853            match self.record_filtering {
1854                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1855                    Ok(()) => {
1856                        tracing::debug!(
1857                            record=?record.key,
1858                            "Record stored: {} bytes",
1859                            record.value.len()
1860                        );
1861                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1862                            Event::InboundRequest {
1863                                request: InboundRequest::PutRecord {
1864                                    source,
1865                                    connection,
1866                                    record: None,
1867                                },
1868                            },
1869                        ));
1870                    }
1871                    Err(e) => {
1872                        tracing::info!("Record not stored: {:?}", e);
1873                        self.queued_events.push_back(ToSwarm::NotifyHandler {
1874                            peer_id: source,
1875                            handler: NotifyHandler::One(connection),
1876                            event: HandlerIn::Reset(request_id),
1877                        });
1878
1879                        return;
1880                    }
1881                },
1882                StoreInserts::FilterBoth => {
1883                    self.queued_events
1884                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1885                            request: InboundRequest::PutRecord {
1886                                source,
1887                                connection,
1888                                record: Some(record.clone()),
1889                            },
1890                        }));
1891                }
1892            }
1893        }
1894
1895        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1896        // case where the record is discarded due to being expired. Given that
1897        // the remote sent the local node a [`HandlerEvent::PutRecord`]
1898        // request, the remote perceives the local node as one node among the k
1899        // closest nodes to the target. In addition returning
1900        // [`HandlerIn::PutRecordRes`] does not reveal any internal
1901        // information to a possibly malicious remote node.
1902        self.queued_events.push_back(ToSwarm::NotifyHandler {
1903            peer_id: source,
1904            handler: NotifyHandler::One(connection),
1905            event: HandlerIn::PutRecordRes {
1906                key: record.key,
1907                value: record.value,
1908                request_id,
1909            },
1910        })
1911    }
1912
1913    /// Processes a provider record received from a peer.
1914    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1915        if &provider.node_id != self.kbuckets.local_key().preimage() {
1916            let record = ProviderRecord {
1917                key,
1918                provider: provider.node_id,
1919                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1920                addresses: provider.multiaddrs,
1921            };
1922            match self.record_filtering {
1923                StoreInserts::Unfiltered => {
1924                    if let Err(e) = self.store.add_provider(record) {
1925                        tracing::info!("Provider record not stored: {:?}", e);
1926                        return;
1927                    }
1928
1929                    self.queued_events
1930                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1931                            request: InboundRequest::AddProvider { record: None },
1932                        }));
1933                }
1934                StoreInserts::FilterBoth => {
1935                    self.queued_events
1936                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1937                            request: InboundRequest::AddProvider {
1938                                record: Some(record),
1939                            },
1940                        }));
1941                }
1942            }
1943        }
1944    }
1945
1946    fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1947        let key = kbucket::Key::from(peer_id);
1948
1949        if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1950            // TODO: Ideally, the address should only be removed if the error can
1951            // be classified as "permanent" but since `err` is currently a borrowed
1952            // trait object without a `'static` bound, even downcasting for inspection
1953            // of the error is not possible (and also not truly desirable or ergonomic).
1954            // The error passed in should rather be a dedicated enum.
1955            if addrs.remove(address).is_ok() {
1956                tracing::debug!(
1957                    peer=%peer_id,
1958                    %address,
1959                    "Address removed from peer due to error."
1960                );
1961            } else {
1962                // Despite apparently having no reachable address (any longer),
1963                // the peer is kept in the routing table with the last address to avoid
1964                // (temporary) loss of network connectivity to "flush" the routing
1965                // table. Once in, a peer is only removed from the routing table
1966                // if it is the least recently connected peer, currently disconnected
1967                // and is unreachable in the context of another peer pending insertion
1968                // into the same bucket. This is handled transparently by the
1969                // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1970                // within `Behaviour::poll`.
1971                tracing::debug!(
1972                    peer=%peer_id,
1973                    %address,
1974                    "Last remaining address of peer is unreachable."
1975                );
1976            }
1977        }
1978
1979        for query in self.queries.iter_mut() {
1980            if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
1981                addrs.retain(|a| a != address);
1982            }
1983        }
1984    }
1985
1986    fn on_connection_established(
1987        &mut self,
1988        ConnectionEstablished {
1989            peer_id,
1990            failed_addresses,
1991            other_established,
1992            ..
1993        }: ConnectionEstablished,
1994    ) {
1995        for addr in failed_addresses {
1996            self.address_failed(peer_id, addr);
1997        }
1998
1999        // Peer's first connection.
2000        if other_established == 0 {
2001            self.connected_peers.insert(peer_id);
2002        }
2003    }
2004
2005    fn on_address_change(
2006        &mut self,
2007        AddressChange {
2008            peer_id: peer,
2009            old,
2010            new,
2011            ..
2012        }: AddressChange,
2013    ) {
2014        let (old, new) = (old.get_remote_address(), new.get_remote_address());
2015
2016        // Update routing table.
2017        if let Some(addrs) = self
2018            .kbuckets
2019            .entry(&kbucket::Key::from(peer))
2020            .as_mut()
2021            .and_then(|e| e.value())
2022        {
2023            if addrs.replace(old, new) {
2024                tracing::debug!(
2025                    %peer,
2026                    old_address=%old,
2027                    new_address=%new,
2028                    "Old address replaced with new address for peer."
2029                );
2030            } else {
2031                tracing::debug!(
2032                    %peer,
2033                    old_address=%old,
2034                    new_address=%new,
2035                    "Old address not replaced with new address for peer as old address wasn't present.",
2036                );
2037            }
2038        } else {
2039            tracing::debug!(
2040                %peer,
2041                old_address=%old,
2042                new_address=%new,
2043                "Old address not replaced with new address for peer as peer is not present in the \
2044                 routing table."
2045            );
2046        }
2047
2048        // Update query address cache.
2049        //
2050        // Consider two connected nodes: local node A and remote node B. Say node B
2051        // is not in node A's routing table but is part of the `Query::addresses`
2052        // list of an ongoing query on node A. Say node B triggers an address change
2053        // and then disconnects. Later on, the aforementioned query on node A wants
2054        // to connect to node B. Without replacing the address in the
2055        // `Query::addresses` set, node A would attempt to dial the old address
2056        // instead of the new one.
2057        //
2058        // While upholding correctness, iterating through all discovered
2059        // addresses of a peer in all currently ongoing queries might have a
2060        // large performance impact. If so, the code below might be worth
2061        // revisiting.
2062        for query in self.queries.iter_mut() {
2063            if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2064                for addr in addrs.iter_mut() {
2065                    if addr == old {
2066                        *addr = new.clone();
2067                    }
2068                }
2069            }
2070        }
2071    }
2072
2073    fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2074        let Some(peer_id) = peer_id else { return };
2075
2076        match error {
2077            DialError::LocalPeerId { .. }
2078            | DialError::WrongPeerId { .. }
2079            | DialError::Aborted
2080            | DialError::Denied { .. }
2081            | DialError::Transport(_)
2082            | DialError::NoAddresses => {
2083                if let DialError::Transport(addresses) = error {
2084                    for (addr, _) in addresses {
2085                        self.address_failed(peer_id, addr)
2086                    }
2087                }
2088
2089                for query in self.queries.iter_mut() {
2090                    query.on_failure(&peer_id);
2091                }
2092            }
2093            DialError::DialPeerConditionFalse(
2094                dial_opts::PeerCondition::Disconnected
2095                | dial_opts::PeerCondition::NotDialing
2096                | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2097            ) => {
2098                // We might (still) be connected, or about to be connected, thus do not report the
2099                // failure to the queries.
2100            }
2101            DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2102                unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2103            }
2104        }
2105    }
2106
2107    fn on_connection_closed(
2108        &mut self,
2109        ConnectionClosed {
2110            peer_id,
2111            remaining_established,
2112            connection_id,
2113            ..
2114        }: ConnectionClosed,
2115    ) {
2116        self.connections.remove(&connection_id);
2117
2118        if remaining_established == 0 {
2119            for query in self.queries.iter_mut() {
2120                query.on_failure(&peer_id);
2121            }
2122            self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2123            self.connected_peers.remove(&peer_id);
2124        }
2125    }
2126
2127    /// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
2128    fn preload_new_handler(
2129        &mut self,
2130        handler: &mut Handler,
2131        connection_id: ConnectionId,
2132        peer: PeerId,
2133    ) {
2134        self.connections.insert(connection_id, peer);
2135        // Queue events for sending pending RPCs to the connected peer.
2136        // By definition, there can be only one pending RPC for a particular peer and query.
2137        for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2138            q.pending_rpcs
2139                .iter()
2140                .position(|(p, _)| p == &peer)
2141                .map(|p| q.pending_rpcs.remove(p))
2142        }) {
2143            handler.on_behaviour_event(event)
2144        }
2145    }
2146}
2147
2148/// Exponentially decrease the given duration (base 2).
2149fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
2150    Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2151}
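
// A minimal illustrative sketch of the halving behaviour above, assuming a 36-hour
// TTL; kept separate from the main tests in `mod test`.
#[cfg(test)]
mod exp_decrease_examples {
    use super::*;

    #[test]
    fn ttl_halves_per_exponent_step() {
        let ttl = Duration::from_secs(36 * 60 * 60); // 36 hours
        assert_eq!(exp_decrease(ttl, 0), ttl);
        assert_eq!(exp_decrease(ttl, 1), Duration::from_secs(18 * 60 * 60));
        assert_eq!(exp_decrease(ttl, 2), Duration::from_secs(9 * 60 * 60));
        // Shifting a `u64` by 64 or more bits makes `checked_shr` return `None`,
        // so the TTL drops to zero.
        assert_eq!(exp_decrease(ttl, 64), Duration::ZERO);
    }
}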
2152
2153impl<TStore> NetworkBehaviour for Behaviour<TStore>
2154where
2155    TStore: RecordStore + Send + 'static,
2156{
2157    type ConnectionHandler = Handler;
2158    type ToSwarm = Event;
2159
2160    fn handle_established_inbound_connection(
2161        &mut self,
2162        connection_id: ConnectionId,
2163        peer: PeerId,
2164        local_addr: &Multiaddr,
2165        remote_addr: &Multiaddr,
2166    ) -> Result<THandler<Self>, ConnectionDenied> {
2167        let connected_point = ConnectedPoint::Listener {
2168            local_addr: local_addr.clone(),
2169            send_back_addr: remote_addr.clone(),
2170        };
2171
2172        let mut handler = Handler::new(
2173            self.protocol_config.clone(),
2174            connected_point,
2175            peer,
2176            self.mode,
2177        );
2178        self.preload_new_handler(&mut handler, connection_id, peer);
2179
2180        Ok(handler)
2181    }
2182
2183    fn handle_established_outbound_connection(
2184        &mut self,
2185        connection_id: ConnectionId,
2186        peer: PeerId,
2187        addr: &Multiaddr,
2188        role_override: Endpoint,
2189        port_use: PortUse,
2190    ) -> Result<THandler<Self>, ConnectionDenied> {
2191        let connected_point = ConnectedPoint::Dialer {
2192            address: addr.clone(),
2193            role_override,
2194            port_use,
2195        };
2196
2197        let mut handler = Handler::new(
2198            self.protocol_config.clone(),
2199            connected_point,
2200            peer,
2201            self.mode,
2202        );
2203        self.preload_new_handler(&mut handler, connection_id, peer);
2204
2205        Ok(handler)
2206    }
2207
2208    fn handle_pending_outbound_connection(
2209        &mut self,
2210        _connection_id: ConnectionId,
2211        maybe_peer: Option<PeerId>,
2212        _addresses: &[Multiaddr],
2213        _effective_role: Endpoint,
2214    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2215        let peer_id = match maybe_peer {
2216            None => return Ok(vec![]),
2217            Some(peer) => peer,
2218        };
2219
2220        // We should order addresses by decreasing likelihood of connectivity, so start with
2221        // the addresses of that peer in the k-buckets.
2222        let key = kbucket::Key::from(peer_id);
2223        let mut peer_addrs =
2224            if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2225                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2226                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2227                addrs
2228            } else {
2229                Vec::new()
2230            };
2231
2232        // We add to that a temporary list of addresses from the ongoing queries.
2233        for query in self.queries.iter() {
2234            if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2235                peer_addrs.extend(addrs.iter().cloned())
2236            }
2237        }
2238
2239        Ok(peer_addrs)
2240    }
2241
2242    fn on_connection_handler_event(
2243        &mut self,
2244        source: PeerId,
2245        connection: ConnectionId,
2246        event: THandlerOutEvent<Self>,
2247    ) {
2248        match event {
2249            HandlerEvent::ProtocolConfirmed { endpoint } => {
2250                debug_assert!(self.connected_peers.contains(&source));
2251                // The remote's address can only be put into the routing table,
2252                // and thus shared with other nodes, if the local node is the dialer,
2253                // since the remote address on an inbound connection may be specific
2254                // to that connection (e.g. typically the TCP port numbers).
2255                let address = match endpoint {
2256                    ConnectedPoint::Dialer { address, .. } => Some(address),
2257                    ConnectedPoint::Listener { .. } => None,
2258                };
2259
2260                self.connection_updated(source, address, NodeStatus::Connected);
2261            }
2262
2263            HandlerEvent::ProtocolNotSupported { endpoint } => {
2264                let address = match endpoint {
2265                    ConnectedPoint::Dialer { address, .. } => Some(address),
2266                    ConnectedPoint::Listener { .. } => None,
2267                };
2268                self.connection_updated(source, address, NodeStatus::Disconnected);
2269            }
2270
2271            HandlerEvent::FindNodeReq { key, request_id } => {
2272                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2273
2274                self.queued_events
2275                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2276                        request: InboundRequest::FindNode {
2277                            num_closer_peers: closer_peers.len(),
2278                        },
2279                    }));
2280
2281                self.queued_events.push_back(ToSwarm::NotifyHandler {
2282                    peer_id: source,
2283                    handler: NotifyHandler::One(connection),
2284                    event: HandlerIn::FindNodeRes {
2285                        closer_peers,
2286                        request_id,
2287                    },
2288                });
2289            }
2290
2291            HandlerEvent::FindNodeRes {
2292                closer_peers,
2293                query_id,
2294            } => {
2295                self.discovered(&query_id, &source, closer_peers.iter());
2296            }
2297
2298            HandlerEvent::GetProvidersReq { key, request_id } => {
2299                let provider_peers = self.provider_peers(&key, &source);
2300                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2301
2302                self.queued_events
2303                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2304                        request: InboundRequest::GetProvider {
2305                            num_closer_peers: closer_peers.len(),
2306                            num_provider_peers: provider_peers.len(),
2307                        },
2308                    }));
2309
2310                self.queued_events.push_back(ToSwarm::NotifyHandler {
2311                    peer_id: source,
2312                    handler: NotifyHandler::One(connection),
2313                    event: HandlerIn::GetProvidersRes {
2314                        closer_peers,
2315                        provider_peers,
2316                        request_id,
2317                    },
2318                });
2319            }
2320
2321            HandlerEvent::GetProvidersRes {
2322                closer_peers,
2323                provider_peers,
2324                query_id,
2325            } => {
2326                let peers = closer_peers.iter().chain(provider_peers.iter());
2327                self.discovered(&query_id, &source, peers);
2328                if let Some(query) = self.queries.get_mut(&query_id) {
2329                    let stats = query.stats().clone();
2330                    if let QueryInfo::GetProviders {
2331                        ref key,
2332                        ref mut providers_found,
2333                        ref mut step,
2334                        ..
2335                    } = query.info
2336                    {
2337                        *providers_found += provider_peers.len();
2338                        let providers = provider_peers.iter().map(|p| p.node_id).collect();
2339
2340                        self.queued_events.push_back(ToSwarm::GenerateEvent(
2341                            Event::OutboundQueryProgressed {
2342                                id: query_id,
2343                                result: QueryResult::GetProviders(Ok(
2344                                    GetProvidersOk::FoundProviders {
2345                                        key: key.clone(),
2346                                        providers,
2347                                    },
2348                                )),
2349                                step: step.clone(),
2350                                stats,
2351                            },
2352                        ));
2353                        *step = step.next();
2354                    }
2355                }
2356            }
2357            HandlerEvent::QueryError { query_id, error } => {
2358                tracing::debug!(
2359                    peer=%source,
2360                    query=?query_id,
2361                    "Request to peer in query failed with {:?}",
2362                    error
2363                );
2364                // If the query to which the error relates is still active,
2365                // signal the failure w.r.t. `source`.
2366                if let Some(query) = self.queries.get_mut(&query_id) {
2367                    query.on_failure(&source)
2368                }
2369            }
2370
2371            HandlerEvent::AddProvider { key, provider } => {
2372                // Only accept a provider record from a legitimate peer.
2373                if provider.node_id != source {
2374                    return;
2375                }
2376
2377                self.provider_received(key, provider);
2378            }
2379
2380            HandlerEvent::GetRecord { key, request_id } => {
2381                // Lookup the record locally.
2382                let record = match self.store.get(&key) {
2383                    Some(record) => {
2384                        if record.is_expired(Instant::now()) {
2385                            self.store.remove(&key);
2386                            None
2387                        } else {
2388                            Some(record.into_owned())
2389                        }
2390                    }
2391                    None => None,
2392                };
2393
2394                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2395
2396                self.queued_events
2397                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2398                        request: InboundRequest::GetRecord {
2399                            num_closer_peers: closer_peers.len(),
2400                            present_locally: record.is_some(),
2401                        },
2402                    }));
2403
2404                self.queued_events.push_back(ToSwarm::NotifyHandler {
2405                    peer_id: source,
2406                    handler: NotifyHandler::One(connection),
2407                    event: HandlerIn::GetRecordRes {
2408                        record,
2409                        closer_peers,
2410                        request_id,
2411                    },
2412                });
2413            }
2414
2415            HandlerEvent::GetRecordRes {
2416                record,
2417                closer_peers,
2418                query_id,
2419            } => {
2420                if let Some(query) = self.queries.get_mut(&query_id) {
2421                    let stats = query.stats().clone();
2422                    if let QueryInfo::GetRecord {
2423                        key,
2424                        ref mut step,
2425                        ref mut found_a_record,
2426                        cache_candidates,
2427                    } = &mut query.info
2428                    {
2429                        if let Some(record) = record {
2430                            *found_a_record = true;
2431                            let record = PeerRecord {
2432                                peer: Some(source),
2433                                record,
2434                            };
2435
2436                            self.queued_events.push_back(ToSwarm::GenerateEvent(
2437                                Event::OutboundQueryProgressed {
2438                                    id: query_id,
2439                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2440                                        record,
2441                                    ))),
2442                                    step: step.clone(),
2443                                    stats,
2444                                },
2445                            ));
2446
2447                            *step = step.next();
2448                        } else {
2449                            tracing::trace!(record=?key, %source, "Record not found at source");
2450                            if let Caching::Enabled { max_peers } = self.caching {
2451                                let source_key = kbucket::Key::from(source);
2452                                let target_key = kbucket::Key::from(key.clone());
2453                                let distance = source_key.distance(&target_key);
2454                                cache_candidates.insert(distance, source);
2455                                if cache_candidates.len() > max_peers as usize {
2456                                    // TODO: `pop_last()` would be nice once stabilised.
2457                                    // See https://github.com/rust-lang/rust/issues/62924.
2458                                    let last =
2459                                        *cache_candidates.keys().next_back().expect("len > 0");
2460                                    cache_candidates.remove(&last);
2461                                }
2462                            }
2463                        }
2464                    }
2465                }
2466
2467                self.discovered(&query_id, &source, closer_peers.iter());
2468            }
2469
2470            HandlerEvent::PutRecord { record, request_id } => {
2471                self.record_received(source, connection, request_id, record);
2472            }
2473
2474            HandlerEvent::PutRecordRes { query_id, .. } => {
2475                if let Some(query) = self.queries.get_mut(&query_id) {
2476                    query.on_success(&source, vec![]);
2477                    if let QueryInfo::PutRecord {
2478                        phase: PutRecordPhase::PutRecord { success, .. },
2479                        quorum,
2480                        ..
2481                    } = &mut query.info
2482                    {
2483                        success.push(source);
2484
2485                        let quorum = quorum.get();
2486                        if success.len() >= quorum {
2487                            let peers = success.clone();
2488                            let finished = query.try_finish(peers.iter());
2489                            if !finished {
2490                                tracing::debug!(
2491                                    peer=%source,
2492                                    query=?query_id,
2493                                    "PutRecord query reached quorum ({}/{}) with response \
2494                                     from peer but could not yet finish.",
2495                                    peers.len(),
2496                                    quorum,
2497                                );
2498                            }
2499                        }
2500                    }
2501                }
2502            }
2503        };
2504    }
2505
2506    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
2507    fn poll(
2508        &mut self,
2509        cx: &mut Context<'_>,
2510    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2511        let now = Instant::now();
2512
2513        // Calculate the available capacity for queries triggered by background jobs.
2514        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2515
2516        // Run the periodic provider announcement job.
2517        if let Some(mut job) = self.add_provider_job.take() {
2518            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2519            for i in 0..num {
2520                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2521                    self.start_add_provider(r.key, AddProviderContext::Republish)
2522                } else {
2523                    jobs_query_capacity -= i;
2524                    break;
2525                }
2526            }
2527            self.add_provider_job = Some(job);
2528        }
2529
2530        // Run the periodic record replication / publication job.
2531        if let Some(mut job) = self.put_record_job.take() {
2532            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2533            for _ in 0..num {
2534                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2535                    let context =
2536                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2537                            PutRecordContext::Republish
2538                        } else {
2539                            PutRecordContext::Replicate
2540                        };
2541                    self.start_put_record(r, Quorum::All, context)
2542                } else {
2543                    break;
2544                }
2545            }
2546            self.put_record_job = Some(job);
2547        }
2548
2549        // Poll bootstrap periodically and automatically.
2550        if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
2551            if let Err(e) = self.bootstrap() {
2552                tracing::warn!("Failed to trigger bootstrap: {e}");
2553            }
2554        }
2555
2556        loop {
2557            // Drain queued events first.
2558            if let Some(event) = self.queued_events.pop_front() {
2559                return Poll::Ready(event);
2560            }
2561
2562            // Drain applied pending entries from the routing table.
2563            if let Some(entry) = self.kbuckets.take_applied_pending() {
2564                let kbucket::Node { key, value } = entry.inserted;
2565                let peer_id = key.into_preimage();
2566                self.queued_events
2567                    .push_back(ToSwarm::NewExternalAddrOfPeer {
2568                        peer_id,
2569                        address: value.first().clone(),
2570                    });
2571                let event = Event::RoutingUpdated {
2572                    bucket_range: self
2573                        .kbuckets
2574                        .bucket(&key)
2575                        .map(|b| b.range())
2576                        .expect("Self to never be applied from pending."),
2577                    peer: peer_id,
2578                    is_new_peer: true,
2579                    addresses: value,
2580                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2581                };
2582                return Poll::Ready(ToSwarm::GenerateEvent(event));
2583            }
2584
2585            // Look for a finished query.
2586            loop {
2587                match self.queries.poll(now) {
2588                    QueryPoolState::Finished(q) => {
2589                        if let Some(event) = self.query_finished(q) {
2590                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2591                        }
2592                    }
2593                    QueryPoolState::Timeout(q) => {
2594                        if let Some(event) = self.query_timeout(q) {
2595                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2596                        }
2597                    }
2598                    QueryPoolState::Waiting(Some((query, peer_id))) => {
2599                        let event = query.info.to_request(query.id());
2600                        // TODO: AddProvider requests yield no response, so the query completes
2601                        // as soon as all requests have been sent. However, the handler should
2602                        // rather emit an event once the request has been sent (and report
2603                        // an error if sending fails), instead of immediately reporting
2604                        // "success" somewhat prematurely here.
2605                        if let QueryInfo::AddProvider {
2606                            phase: AddProviderPhase::AddProvider { .. },
2607                            ..
2608                        } = &query.info
2609                        {
2610                            query.on_success(&peer_id, vec![])
2611                        }
2612
2613                        if self.connected_peers.contains(&peer_id) {
2614                            self.queued_events.push_back(ToSwarm::NotifyHandler {
2615                                peer_id,
2616                                event,
2617                                handler: NotifyHandler::Any,
2618                            });
2619                        } else if &peer_id != self.kbuckets.local_key().preimage() {
2620                            query.pending_rpcs.push((peer_id, event));
2621                            self.queued_events.push_back(ToSwarm::Dial {
2622                                opts: DialOpts::peer_id(peer_id).build(),
2623                            });
2624                        }
2625                    }
2626                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2627                }
2628            }
2629
2630            // No immediate event was produced as a result of a finished query.
2631            // If no new events have been queued either, signal `Poll::Pending` to
2632            // be polled again later.
2633            if self.queued_events.is_empty() {
2634                self.no_events_waker = Some(cx.waker().clone());
2635
2636                return Poll::Pending;
2637            }
2638        }
2639    }
2640
2641    fn on_swarm_event(&mut self, event: FromSwarm) {
2642        self.listen_addresses.on_swarm_event(&event);
2643        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2644
2645        if self.auto_mode && external_addresses_changed {
2646            self.determine_mode_from_external_addresses();
2647        }
2648
2649        match event {
2650            FromSwarm::ConnectionEstablished(connection_established) => {
2651                self.on_connection_established(connection_established)
2652            }
2653            FromSwarm::ConnectionClosed(connection_closed) => {
2654                self.on_connection_closed(connection_closed)
2655            }
2656            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2657            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2658            FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
2659                // A new listen addr was just discovered and we have no connected peers;
2660                // this can mean that our network interfaces were not up before but are now,
2661                // so it might be a good idea to trigger a bootstrap.
2662                self.bootstrap_status.trigger();
2663            }
2664            _ => {}
2665        }
2666    }
2667}
2668
2669/// `PeerInfo` combines a peer ID with a set of multiaddrs that the peer is listening on.
2670#[derive(Debug, Clone, PartialEq, Eq)]
2671pub struct PeerInfo {
2672    pub peer_id: PeerId,
2673    pub addrs: Vec<Multiaddr>,
2674}
2675
2676/// A quorum w.r.t. the configured replication factor specifies the minimum
2677/// number of distinct nodes that must be successfully contacted in order
2678/// for a query to succeed.
2679#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2680pub enum Quorum {
2681    One,
2682    Majority,
2683    All,
2684    N(NonZeroUsize),
2685}
2686
2687impl Quorum {
2688    /// Evaluates the quorum w.r.t. a given total (number of peers).
2689    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2690        match self {
2691            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2692            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2693            Quorum::All => total,
2694            Quorum::N(n) => NonZeroUsize::min(total, *n),
2695        }
2696    }
2697}
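
// Editor's note: a small worked example of how the variants above evaluate
// against a replication factor of 20 contacted peers. The numbers are
// illustrative only.
#[cfg(test)]
mod quorum_eval_example {
    use super::*;

    #[test]
    fn quorum_eval_against_twenty_peers() {
        let total = NonZeroUsize::new(20).expect("20 != 0");
        assert_eq!(Quorum::One.eval(total).get(), 1);
        assert_eq!(Quorum::Majority.eval(total).get(), 11);
        assert_eq!(Quorum::All.eval(total).get(), 20);
        // `N(n)` is capped at the total number of peers.
        let n = NonZeroUsize::new(30).expect("30 != 0");
        assert_eq!(Quorum::N(n).eval(total).get(), 20);
    }
}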
2698
2699/// A record either received from the given peer or retrieved from the local
2700/// record store.
2701#[derive(Debug, Clone, PartialEq, Eq)]
2702pub struct PeerRecord {
2703    /// The peer from whom the record was received. `None` if the record was
2704    /// retrieved from local storage.
2705    pub peer: Option<PeerId>,
2706    pub record: Record,
2707}
2708
2709//////////////////////////////////////////////////////////////////////////////
2710// Events
2711
2712/// The events produced by the `Kademlia` behaviour.
2713///
2714/// See [`NetworkBehaviour::poll`].
2715#[derive(Debug, Clone)]
2716#[allow(clippy::large_enum_variant)]
2717pub enum Event {
2718    /// An inbound request has been received and handled.
2719    //
2720    // Note on the difference between 'request' and 'query': A request is a
2721    // single request-response style exchange with a single remote peer. A query
2722    // is made of multiple requests across multiple remote peers.
2723    InboundRequest { request: InboundRequest },
2724
2725    /// An outbound query has made progress.
2726    OutboundQueryProgressed {
2727        /// The ID of the query that finished.
2728        id: QueryId,
2729        /// The intermediate result of the query.
2730        result: QueryResult,
2731        /// Execution statistics from the query.
2732        stats: QueryStats,
2733        /// Indicates which event this is, if there are multiple responses for a single query.
2734        step: ProgressStep,
2735    },
2736
2737    /// The routing table has been updated with a new peer and / or
2738    /// address, thereby possibly evicting another peer.
2739    RoutingUpdated {
2740        /// The ID of the peer that was added or updated.
2741        peer: PeerId,
2742        /// Whether this is a new peer and was thus just added to the routing
2743        /// table, or whether it is an existing peer whose addresses changed.
2744        is_new_peer: bool,
2745        /// The full list of known addresses of `peer`.
2746        addresses: Addresses,
2747        /// The inclusive minimum and maximum distance for
2748        /// the bucket of the peer.
2749        bucket_range: (Distance, Distance),
2750        /// The ID of the peer that was evicted from the routing table to make
2751        /// room for the new peer, if any.
2752        old_peer: Option<PeerId>,
2753    },
2754
2755    /// A peer has connected for whom no listen address is known.
2756    ///
2757    /// If the peer is to be added to the routing table, a known
2758    /// listen address for the peer must be provided via [`Behaviour::add_address`].
2759    UnroutablePeer { peer: PeerId },
2760
2761    /// A connection to a peer has been established for whom a listen address
2762    /// is known but the peer has not been added to the routing table either
2763    /// because [`BucketInserts::Manual`] is configured or because
2764    /// the corresponding bucket is full.
2765    ///
2766    /// If the peer is to be included in the routing table, it must be
2767    /// explicitly added via [`Behaviour::add_address`], possibly after
2768    /// removing another peer.
2769    ///
2770    /// See [`Behaviour::kbucket`] for insight into the contents of
2771    /// the k-bucket of `peer`.
2772    RoutablePeer { peer: PeerId, address: Multiaddr },
2773
2774    /// A connection to a peer has been established for whom a listen address
2775    /// is known but the peer is only pending insertion into the routing table
2776    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2777    /// may not make it into the routing table.
2778    ///
2779    /// If the peer is to be unconditionally included in the routing table,
2780    /// it should be explicitly added via [`Behaviour::add_address`] after
2781    /// removing another peer.
2782    ///
2783    /// See [`Behaviour::kbucket`] for insight into the contents of
2784    /// the k-bucket of `peer`.
2785    PendingRoutablePeer { peer: PeerId, address: Multiaddr },
2786
2787    /// This peer's mode has been updated automatically.
2788    ///
2789    /// This happens in response to an external
2790    /// address being added or removed.
2791    ModeChanged { new_mode: Mode },
2792}
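
// Editor's sketch: one way an application might interpret the events above.
// `Event` values are normally obtained from `SwarmEvent::Behaviour` in the
// application's event loop; this free function only illustrates the matching
// logic and the meaning of `ProgressStep` (defined below).
#[cfg(test)]
#[allow(dead_code)]
fn handle_kademlia_event_sketch(event: Event) {
    match event {
        Event::OutboundQueryProgressed { id, result, step, .. } => {
            // `step.count` starts at 1 and increases with every intermediate
            // response; `step.last` marks the final event of the query.
            tracing::info!(
                query=?id,
                step=%step.count,
                last=step.last,
                ?result,
                "query progressed"
            );
        }
        Event::RoutingUpdated { peer, is_new_peer, old_peer, .. } => {
            tracing::info!(%peer, is_new_peer, ?old_peer, "routing table updated");
        }
        other => tracing::trace!(?other, "unhandled Kademlia event"),
    }
}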
2793
2794/// Information about progress events.
2795#[derive(Debug, Clone)]
2796pub struct ProgressStep {
2797    /// The index of this event within the query's sequence of progress events, starting at 1.
2798    pub count: NonZeroUsize,
2799    /// Is this the final event?
2800    pub last: bool,
2801}
2802
2803impl ProgressStep {
2804    fn first() -> Self {
2805        Self {
2806            count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
2807            last: false,
2808        }
2809    }
2810
2811    fn first_and_last() -> Self {
2812        let mut first = ProgressStep::first();
2813        first.last = true;
2814        first
2815    }
2816
2817    fn next(&self) -> Self {
2818        assert!(!self.last);
2819        let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
2820
2821        Self { count, last: false }
2822    }
2823}
2824
2825/// Information about a received and handled inbound request.
2826#[derive(Debug, Clone)]
2827pub enum InboundRequest {
2828    /// Request for the list of nodes whose IDs are the closest to `key`.
2829    FindNode { num_closer_peers: usize },
2830    /// Same as `FindNode`, but should also return the entries of the local
2831    /// providers list for this key.
2832    GetProvider {
2833        num_closer_peers: usize,
2834        num_provider_peers: usize,
2835    },
2836    /// A peer sent an add provider request.
2837    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
2838    /// included.
2839    ///
2840    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
2841    AddProvider { record: Option<ProviderRecord> },
2842    /// Request to retrieve a record.
2843    GetRecord {
2844        num_closer_peers: usize,
2845        present_locally: bool,
2846    },
2847    /// A peer sent a put record request.
2848    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
2849    ///
2850    /// See [`StoreInserts`] and [`Config::set_record_filtering`]; a handling sketch follows this enum.
2851    PutRecord {
2852        source: PeerId,
2853        connection: ConnectionId,
2854        record: Option<Record>,
2855    },
2856}
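
// Editor's sketch: with `StoreInserts::FilterBoth` configured, inbound
// `PutRecord` requests are *not* written to the record store automatically;
// the application inspects the attached record and stores it explicitly.
// The size check below is a made-up placeholder for application-level
// validation, and `Behaviour::store_mut` is assumed to expose the store.
#[cfg(test)]
#[allow(dead_code)]
fn handle_inbound_put_record_sketch<T: RecordStore + Send + 'static>(
    behaviour: &mut Behaviour<T>,
    request: InboundRequest,
) {
    if let InboundRequest::PutRecord {
        source,
        record: Some(record),
        ..
    } = request
    {
        // Hypothetical validation rule: reject oversized values.
        if record.value.len() <= 1024 {
            if let Err(e) = behaviour.store_mut().put(record) {
                tracing::warn!(%source, "failed to store inbound record: {e}");
            }
        }
    }
}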
2857
2858/// The results of Kademlia queries.
2859#[derive(Debug, Clone)]
2860pub enum QueryResult {
2861    /// The result of [`Behaviour::bootstrap`].
2862    Bootstrap(BootstrapResult),
2863
2864    /// The result of [`Behaviour::get_closest_peers`].
2865    GetClosestPeers(GetClosestPeersResult),
2866
2867    /// The result of [`Behaviour::get_providers`].
2868    GetProviders(GetProvidersResult),
2869
2870    /// The result of [`Behaviour::start_providing`].
2871    StartProviding(AddProviderResult),
2872
2873    /// The result of an (automatic) republishing of a provider record.
2874    RepublishProvider(AddProviderResult),
2875
2876    /// The result of [`Behaviour::get_record`].
2877    GetRecord(GetRecordResult),
2878
2879    /// The result of [`Behaviour::put_record`].
2880    PutRecord(PutRecordResult),
2881
2882    /// The result of an (automatic) republishing of a (value-)record.
2883    RepublishRecord(PutRecordResult),
2884}
2885
2886/// The result of [`Behaviour::get_record`].
2887pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2888
2889/// The successful result of [`Behaviour::get_record`].
2890#[derive(Debug, Clone)]
2891pub enum GetRecordOk {
2892    FoundRecord(PeerRecord),
2893    FinishedWithNoAdditionalRecord {
2894        /// If caching is enabled, these are the peers closest
2895        /// _to the record key_ (not the local node) that were queried but
2896        /// did not return the record, sorted by distance to the record key
2897        /// from closest to farthest. How many of these are tracked is configured
2898        /// by [`Config::set_caching`].
2899        ///
2900        /// Writing the record back to the cache at these peers is a manual operation,
2901        /// i.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
2902        /// after selecting one of the returned records (see the sketch after this enum).
2903        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
2904    },
2905}
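
// Editor's sketch: writing a previously retrieved record back to the cache
// candidates reported above. `Behaviour::put_record_to` is assumed to accept
// an `ExactSizeIterator` of peer IDs, as in current libp2p-kad releases; the
// quorum choice is arbitrary.
#[cfg(test)]
#[allow(dead_code)]
fn cache_write_back_sketch<T: RecordStore + Send + 'static>(
    behaviour: &mut Behaviour<T>,
    record: Record,
    cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
) -> QueryId {
    // The map is ordered by distance to the record key, closest first.
    let peers: Vec<PeerId> = cache_candidates.into_values().collect();
    behaviour.put_record_to(record, peers.into_iter(), Quorum::One)
}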
2906
2907/// The error result of [`Behaviour::get_record`].
2908#[derive(Debug, Clone, Error)]
2909pub enum GetRecordError {
2910    #[error("the record was not found")]
2911    NotFound {
2912        key: record::Key,
2913        closest_peers: Vec<PeerId>,
2914    },
2915    #[error("the quorum failed; needed {quorum} peers")]
2916    QuorumFailed {
2917        key: record::Key,
2918        records: Vec<PeerRecord>,
2919        quorum: NonZeroUsize,
2920    },
2921    #[error("the request timed out")]
2922    Timeout { key: record::Key },
2923}
2924
2925impl GetRecordError {
2926    /// Gets the key of the record for which the operation failed.
2927    pub fn key(&self) -> &record::Key {
2928        match self {
2929            GetRecordError::QuorumFailed { key, .. } => key,
2930            GetRecordError::Timeout { key, .. } => key,
2931            GetRecordError::NotFound { key, .. } => key,
2932        }
2933    }
2934
2935    /// Extracts the key of the record for which the operation failed,
2936    /// consuming the error.
2937    pub fn into_key(self) -> record::Key {
2938        match self {
2939            GetRecordError::QuorumFailed { key, .. } => key,
2940            GetRecordError::Timeout { key, .. } => key,
2941            GetRecordError::NotFound { key, .. } => key,
2942        }
2943    }
2944}
2945
2946/// The result of [`Behaviour::put_record`].
2947pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2948
2949/// The successful result of [`Behaviour::put_record`].
2950#[derive(Debug, Clone)]
2951pub struct PutRecordOk {
2952    pub key: record::Key,
2953}
2954
2955/// The error result of [`Behaviour::put_record`].
2956#[derive(Debug, Clone, Error)]
2957pub enum PutRecordError {
2958    #[error("the quorum failed; needed {quorum} peers")]
2959    QuorumFailed {
2960        key: record::Key,
2961        /// [`PeerId`]s of the peers the record was successfully stored on.
2962        success: Vec<PeerId>,
2963        quorum: NonZeroUsize,
2964    },
2965    #[error("the request timed out")]
2966    Timeout {
2967        key: record::Key,
2968        /// [`PeerId`]s of the peers the record was successfully stored on.
2969        success: Vec<PeerId>,
2970        quorum: NonZeroUsize,
2971    },
2972}
2973
2974impl PutRecordError {
2975    /// Gets the key of the record for which the operation failed.
2976    pub fn key(&self) -> &record::Key {
2977        match self {
2978            PutRecordError::QuorumFailed { key, .. } => key,
2979            PutRecordError::Timeout { key, .. } => key,
2980        }
2981    }
2982
2983    /// Extracts the key of the record for which the operation failed,
2984    /// consuming the error.
2985    pub fn into_key(self) -> record::Key {
2986        match self {
2987            PutRecordError::QuorumFailed { key, .. } => key,
2988            PutRecordError::Timeout { key, .. } => key,
2989        }
2990    }
2991}
2992
2993/// The result of [`Behaviour::bootstrap`].
2994pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
2995
2996/// The successful result of [`Behaviour::bootstrap`].
2997#[derive(Debug, Clone)]
2998pub struct BootstrapOk {
2999    pub peer: PeerId,
3000    pub num_remaining: u32,
3001}
3002
3003/// The error result of [`Behaviour::bootstrap`].
3004#[derive(Debug, Clone, Error)]
3005pub enum BootstrapError {
3006    #[error("the request timed out")]
3007    Timeout {
3008        peer: PeerId,
3009        num_remaining: Option<u32>,
3010    },
3011}
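
// Editor's sketch: starting a manual bootstrap and handling the case where the
// routing table is still empty. `Behaviour::bootstrap` returns the `QueryId`
// whose progress is later reported via `QueryResult::Bootstrap`.
#[cfg(test)]
#[allow(dead_code)]
fn trigger_bootstrap_sketch<T: RecordStore + Send + 'static>(behaviour: &mut Behaviour<T>) {
    match behaviour.bootstrap() {
        Ok(query_id) => tracing::debug!(query=?query_id, "bootstrap started"),
        Err(NoKnownPeers()) => tracing::warn!("cannot bootstrap without any known peers"),
    }
}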
3012
3013/// The result of [`Behaviour::get_closest_peers`].
3014pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
3015
3016/// The successful result of [`Behaviour::get_closest_peers`].
3017#[derive(Debug, Clone)]
3018pub struct GetClosestPeersOk {
3019    pub key: Vec<u8>,
3020    pub peers: Vec<PeerInfo>,
3021}
3022
3023/// The error result of [`Behaviour::get_closest_peers`].
3024#[derive(Debug, Clone, Error)]
3025pub enum GetClosestPeersError {
3026    #[error("the request timed out")]
3027    Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
3028}
3029
3030impl GetClosestPeersError {
3031    /// Gets the key for which the operation failed.
3032    pub fn key(&self) -> &Vec<u8> {
3033        match self {
3034            GetClosestPeersError::Timeout { key, .. } => key,
3035        }
3036    }
3037
3038    /// Extracts the key for which the operation failed,
3039    /// consuming the error.
3040    pub fn into_key(self) -> Vec<u8> {
3041        match self {
3042            GetClosestPeersError::Timeout { key, .. } => key,
3043        }
3044    }
3045}
3046
3047/// The result of [`Behaviour::get_providers`].
3048pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
3049
3050/// The successful result of [`Behaviour::get_providers`].
3051#[derive(Debug, Clone)]
3052pub enum GetProvidersOk {
3053    FoundProviders {
3054        key: record::Key,
3055        /// The new set of providers discovered.
3056        providers: HashSet<PeerId>,
3057    },
3058    FinishedWithNoAdditionalRecord {
3059        closest_peers: Vec<PeerId>,
3060    },
3061}
3062
3063/// The error result of [`Behaviour::get_providers`].
3064#[derive(Debug, Clone, Error)]
3065pub enum GetProvidersError {
3066    #[error("the request timed out")]
3067    Timeout {
3068        key: record::Key,
3069        closest_peers: Vec<PeerId>,
3070    },
3071}
3072
3073impl GetProvidersError {
3074    /// Gets the key for which the operation failed.
3075    pub fn key(&self) -> &record::Key {
3076        match self {
3077            GetProvidersError::Timeout { key, .. } => key,
3078        }
3079    }
3080
3081    /// Extracts the key for which the operation failed,
3082    /// consuming the error.
3083    pub fn into_key(self) -> record::Key {
3084        match self {
3085            GetProvidersError::Timeout { key, .. } => key,
3086        }
3087    }
3088}
3089
3090/// The result of publishing a provider record.
3091pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
3092
3093/// The successful result of publishing a provider record.
3094#[derive(Debug, Clone)]
3095pub struct AddProviderOk {
3096    pub key: record::Key,
3097}
3098
3099/// The possible errors when publishing a provider record.
3100#[derive(Debug, Clone, Error)]
3101pub enum AddProviderError {
3102    #[error("the request timed out")]
3103    Timeout { key: record::Key },
3104}
3105
3106impl AddProviderError {
3107    /// Gets the key for which the operation failed.
3108    pub fn key(&self) -> &record::Key {
3109        match self {
3110            AddProviderError::Timeout { key, .. } => key,
3111        }
3112    }
3113
3114    /// Extracts the key for which the operation failed, consuming the error.
3115    pub fn into_key(self) -> record::Key {
3116        match self {
3117            AddProviderError::Timeout { key, .. } => key,
3118        }
3119    }
3120}
3121
3122impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3123    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3124        KadPeer {
3125            node_id: e.node.key.into_preimage(),
3126            multiaddrs: e.node.value.into_vec(),
3127            connection_ty: match e.status {
3128                NodeStatus::Connected => ConnectionType::Connected,
3129                NodeStatus::Disconnected => ConnectionType::NotConnected,
3130            },
3131        }
3132    }
3133}
3134
3135/// The context of a [`QueryInfo::AddProvider`] query.
3136#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3137pub enum AddProviderContext {
3138    /// The context is a [`Behaviour::start_providing`] operation.
3139    Publish,
3140    /// The context is periodic republishing of provider announcements
3141    /// initiated earlier via [`Behaviour::start_providing`].
3142    Republish,
3143}
3144
3145/// The context of a [`QueryInfo::PutRecord`] query.
3146#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3147pub enum PutRecordContext {
3148    /// The context is a [`Behaviour::put_record`] operation.
3149    Publish,
3150    /// The context is periodic republishing of records stored
3151    /// earlier via [`Behaviour::put_record`].
3152    Republish,
3153    /// The context is periodic replication (i.e. without extending
3154    /// the record TTL) of stored records received earlier from another peer.
3155    Replicate,
3156    /// The context is a custom store operation targeting specific
3157    /// peers initiated by [`Behaviour::put_record_to`].
3158    Custom,
3159}
3160
3161/// Information about a running query.
3162#[derive(Debug, Clone)]
3163pub enum QueryInfo {
3164    /// A query initiated by [`Behaviour::bootstrap`].
3165    Bootstrap {
3166        /// The targeted peer ID.
3167        peer: PeerId,
3168        /// The remaining random peer IDs to query, one per
3169        /// bucket that still needs refreshing.
3170        ///
3171        /// This is `None` if the initial self-lookup has not
3172        /// yet completed and `Some` with an exhausted iterator
3173        /// if bootstrapping is complete.
3174        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
3175        step: ProgressStep,
3176    },
3177
3178    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
3179    GetClosestPeers {
3180        /// The key being queried (the preimage).
3181        key: Vec<u8>,
3182        /// Current index of events.
3183        step: ProgressStep,
3184    },
3185
3186    /// A (repeated) query initiated by [`Behaviour::get_providers`].
3187    GetProviders {
3188        /// The key for which to search for providers.
3189        key: record::Key,
3190        /// The number of providers found so far.
3191        providers_found: usize,
3192        /// Current index of events.
3193        step: ProgressStep,
3194    },
3195
3196    /// A (repeated) query initiated by [`Behaviour::start_providing`].
3197    AddProvider {
3198        /// The record key.
3199        key: record::Key,
3200        /// The current phase of the query.
3201        phase: AddProviderPhase,
3202        /// The execution context of the query.
3203        context: AddProviderContext,
3204    },
3205
3206    /// A (repeated) query initiated by [`Behaviour::put_record`].
3207    PutRecord {
3208        record: Record,
3209        /// The expected quorum of responses w.r.t. the replication factor.
3210        quorum: NonZeroUsize,
3211        /// The current phase of the query.
3212        phase: PutRecordPhase,
3213        /// The execution context of the query.
3214        context: PutRecordContext,
3215    },
3216
3217    /// A (repeated) query initiated by [`Behaviour::get_record`].
3218    GetRecord {
3219        /// The key to look for.
3220        key: record::Key,
3221        /// Current index of events.
3222        step: ProgressStep,
3223        /// Did we find at least one record?
3224        found_a_record: bool,
3225        /// The peers closest to the `key` that were queried but did not return a record,
3226        /// i.e. the peers that are candidates for caching the record.
3227        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
3228    },
3229}
3230
3231impl QueryInfo {
3232    /// Creates an event for a handler to issue an outgoing request in the
3233    /// context of a query.
3234    fn to_request(&self, query_id: QueryId) -> HandlerIn {
3235        match &self {
3236            QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3237                key: peer.to_bytes(),
3238                query_id,
3239            },
3240            QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3241                key: key.clone(),
3242                query_id,
3243            },
3244            QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3245                key: key.clone(),
3246                query_id,
3247            },
3248            QueryInfo::AddProvider { key, phase, .. } => match phase {
3249                AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3250                    key: key.to_vec(),
3251                    query_id,
3252                },
3253                AddProviderPhase::AddProvider {
3254                    provider_id,
3255                    external_addresses,
3256                    ..
3257                } => HandlerIn::AddProvider {
3258                    key: key.clone(),
3259                    provider: crate::protocol::KadPeer {
3260                        node_id: *provider_id,
3261                        multiaddrs: external_addresses.clone(),
3262                        connection_ty: crate::protocol::ConnectionType::Connected,
3263                    },
3264                    query_id,
3265                },
3266            },
3267            QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3268                key: key.clone(),
3269                query_id,
3270            },
3271            QueryInfo::PutRecord { record, phase, .. } => match phase {
3272                PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3273                    key: record.key.to_vec(),
3274                    query_id,
3275                },
3276                PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3277                    record: record.clone(),
3278                    query_id,
3279                },
3280            },
3281        }
3282    }
3283}
3284
3285/// The phases of a [`QueryInfo::AddProvider`] query.
3286#[derive(Debug, Clone)]
3287pub enum AddProviderPhase {
3288    /// The query is searching for the closest nodes to the record key.
3289    GetClosestPeers,
3290
3291    /// The query advertises the local node as a provider for the key to
3292    /// the closest nodes to the key.
3293    AddProvider {
3294        /// The local peer ID that is advertised as a provider.
3295        provider_id: PeerId,
3296        /// The external addresses of the provider being advertised.
3297        external_addresses: Vec<Multiaddr>,
3298        /// Query statistics from the finished `GetClosestPeers` phase.
3299        get_closest_peers_stats: QueryStats,
3300    },
3301}
3302
3303/// The phases of a [`QueryInfo::PutRecord`] query.
3304#[derive(Debug, Clone, PartialEq, Eq)]
3305pub enum PutRecordPhase {
3306    /// The query is searching for the closest nodes to the record key.
3307    GetClosestPeers,
3308
3309    /// The query is replicating the record to the closest nodes to the key.
3310    PutRecord {
3311        /// A list of peers the given record has been successfully replicated to.
3312        success: Vec<PeerId>,
3313        /// Query statistics from the finished `GetClosestPeers` phase.
3314        get_closest_peers_stats: QueryStats,
3315    },
3316}
3317
3318/// A mutable reference to a running query.
3319pub struct QueryMut<'a> {
3320    query: &'a mut Query,
3321}
3322
3323impl<'a> QueryMut<'a> {
3324    pub fn id(&self) -> QueryId {
3325        self.query.id()
3326    }
3327
3328    /// Gets information about the type and state of the query.
3329    pub fn info(&self) -> &QueryInfo {
3330        &self.query.info
3331    }
3332
3333    /// Gets execution statistics about the query.
3334    ///
3335    /// For a multi-phase query such as `put_record`, these are the
3336    /// statistics of the current phase.
3337    pub fn stats(&self) -> &QueryStats {
3338        self.query.stats()
3339    }
3340
3341    /// Finishes the query as soon as possible, without waiting for the
3342    /// regular termination conditions.
3343    pub fn finish(&mut self) {
3344        self.query.finish()
3345    }
3346}
3347
3348/// An immutable reference to a running query.
3349pub struct QueryRef<'a> {
3350    query: &'a Query,
3351}
3352
3353impl<'a> QueryRef<'a> {
3354    pub fn id(&self) -> QueryId {
3355        self.query.id()
3356    }
3357
3358    /// Gets information about the type and state of the query.
3359    pub fn info(&self) -> &QueryInfo {
3360        &self.query.info
3361    }
3362
3363    /// Gets execution statistics about the query.
3364    ///
3365    /// For a multi-phase query such as `put_record`, these are the
3366    /// statistics of the current phase.
3367    pub fn stats(&self) -> &QueryStats {
3368        self.query.stats()
3369    }
3370}
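
// Editor's sketch: looking up a running query by ID and finishing it early,
// e.g. after the application has seen enough progress events.
// `Behaviour::query_mut` is assumed to return `Option<QueryMut>` as in
// current libp2p-kad releases.
#[cfg(test)]
#[allow(dead_code)]
fn finish_query_early_sketch<T: RecordStore + Send + 'static>(
    behaviour: &mut Behaviour<T>,
    id: QueryId,
) {
    if let Some(mut query) = behaviour.query_mut(&id) {
        tracing::debug!(
            query=?query.id(),
            requests=query.stats().num_requests(),
            "finishing query early"
        );
        query.finish();
    }
}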
3371
3372/// An operation failed due to there being no known peers in the routing table.
3373#[derive(Debug, Clone)]
3374pub struct NoKnownPeers();
3375
3376impl fmt::Display for NoKnownPeers {
3377    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3378        write!(f, "No known peers.")
3379    }
3380}
3381
3382impl std::error::Error for NoKnownPeers {}
3383
3384/// The possible outcomes of [`Behaviour::add_address`].
3385#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3386pub enum RoutingUpdate {
3387    /// The given peer and address have been added to the routing
3388    /// table.
3389    Success,
3390    /// The peer and address are pending insertion into
3391    /// the routing table and will be inserted if a disconnected
3392    /// peer fails to respond. If the given peer and address end up
3393    /// in the routing table, [`Event::RoutingUpdated`]
3394    /// is eventually emitted.
3395    Pending,
3396    /// The routing table update failed, either because the
3397    /// corresponding bucket for the peer is full and the
3398    /// pending slot(s) are occupied, or because the given
3399    /// peer ID is deemed invalid (e.g. refers to the local
3400    /// peer ID).
3401    Failed,
3402}
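
// Editor's sketch: seeding the routing table with a known peer and reacting
// to the returned `RoutingUpdate`. The peer ID and address are supplied by
// the caller; `Behaviour::add_address` is assumed to have its current
// libp2p-kad signature.
#[cfg(test)]
#[allow(dead_code)]
fn add_bootstrap_peer_sketch<T: RecordStore + Send + 'static>(
    behaviour: &mut Behaviour<T>,
    peer: PeerId,
    address: Multiaddr,
) {
    match behaviour.add_address(&peer, address) {
        RoutingUpdate::Success => tracing::debug!(%peer, "peer added to the routing table"),
        RoutingUpdate::Pending => tracing::debug!(%peer, "peer pending insertion"),
        RoutingUpdate::Failed => tracing::debug!(%peer, "bucket full or peer ID rejected"),
    }
}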
3403
3404#[derive(PartialEq, Copy, Clone, Debug)]
3405pub enum Mode {
3406    Client,
3407    Server,
3408}
3409
3410impl fmt::Display for Mode {
3411    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3412        match self {
3413            Mode::Client => write!(f, "client"),
3414            Mode::Server => write!(f, "server"),
3415        }
3416    }
3417}
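
// Editor's sketch: forcing the node into server mode instead of letting the
// behaviour derive the mode from confirmed external addresses.
// `Behaviour::set_mode` is assumed to accept `Option<Mode>`, where `None`
// restores automatic mode selection.
#[cfg(test)]
#[allow(dead_code)]
fn force_server_mode_sketch<T: RecordStore + Send + 'static>(behaviour: &mut Behaviour<T>) {
    behaviour.set_mode(Some(Mode::Server));
}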
3418
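/// Renders a list of displayable items (typically the confirmed external
/// addresses) as a comma-separated string, e.g. for log messages.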
3419fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
3420where
3421    T: ToString,
3422{
3423    confirmed_external_addresses
3424        .iter()
3425        .map(|addr| addr.to_string())
3426        .collect::<Vec<_>>()
3427        .join(", ")
3428}