libp2p_kad/behaviour.rs

// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Implementation of the `Kademlia` network behaviour.

mod test;

use crate::addresses::Addresses;
use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
use crate::jobs::*;
use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState};
use crate::record_priv::{
    self,
    store::{self, RecordStore},
    ProviderRecord, Record,
};
use crate::K_VALUE;
use fnv::{FnvHashMap, FnvHashSet};
use instant::Instant;
use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{
    AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
};
use libp2p_swarm::{
    dial_opts::{self, DialOpts},
    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
    ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, THandler,
    THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use log::{debug, info, warn};
use smallvec::SmallVec;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt;
use std::num::NonZeroUsize;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use std::vec;
use thiserror::Error;

pub use crate::query::QueryStats;

/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Behaviour<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: BucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: ProtocolConfig,

    /// Configuration of [`RecordStore`] filtering.
    record_filtering: StoreInserts,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool<QueryInner>,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// How long to keep connections alive when they're idle.
    connection_idle_timeout: Duration,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,

    listen_addresses: ListenAddresses,

    external_addresses: ExternalAddresses,

    connections: HashMap<ConnectionId, PeerId>,

    /// See [`Config::caching`].
    caching: Caching,

    local_peer_id: PeerId,

    mode: Mode,
    auto_mode: bool,
    no_events_waker: Option<Waker>,

    /// The record storage.
    store: TStore,
}

/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum BucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Behaviour::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Behaviour::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}

/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// [`Key`]: crate::record_priv::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StoreInserts {
    /// Whenever a (provider) record is received,
    /// the record is forwarded immediately to the [`RecordStore`].
    Unfiltered,
    /// Whenever a (provider) record is received, an event is emitted.
    /// Provider records generate an [`InboundRequest::AddProvider`] under [`Event::InboundRequest`],
    /// normal records generate an [`InboundRequest::PutRecord`] under [`Event::InboundRequest`].
    ///
    /// When deemed valid, a (provider) record needs to be explicitly stored in
    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
    /// be retrieved via [`Behaviour::store_mut`].
    FilterBoth,
}

/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Behaviour::new`].
#[derive(Debug, Clone)]
pub struct Config {
    kbucket_pending_timeout: Duration,
    query_config: QueryConfig,
    protocol_config: ProtocolConfig,
    record_ttl: Option<Duration>,
    record_replication_interval: Option<Duration>,
    record_publication_interval: Option<Duration>,
    record_filtering: StoreInserts,
    provider_record_ttl: Option<Duration>,
    provider_publication_interval: Option<Duration>,
    connection_idle_timeout: Duration,
    kbucket_inserts: BucketInserts,
    caching: Caching,
}

impl Default for Config {
    fn default() -> Self {
        Config {
            kbucket_pending_timeout: Duration::from_secs(60),
            query_config: QueryConfig::default(),
            protocol_config: Default::default(),
            record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
            record_replication_interval: Some(Duration::from_secs(60 * 60)),
            record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
            record_filtering: StoreInserts::Unfiltered,
            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
            provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
            connection_idle_timeout: Duration::from_secs(10),
            kbucket_inserts: BucketInserts::OnConnected,
            caching: Caching::Enabled { max_peers: 1 },
        }
    }
}

/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Behaviour::get_record`].
#[derive(Debug, Clone)]
pub enum Caching {
    /// Caching is disabled and the peers closest to records being looked up
    /// that do not return a record are not tracked, i.e.
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
    Disabled,
    /// Up to `max_peers` peers not returning a record that are closest to the key
    /// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`].
    /// The write-back operation must be performed explicitly, if
    /// desired and after choosing a record from the results, via [`Behaviour::put_record_to`].
    Enabled { max_peers: u16 },
}

impl Config {
    /// Sets custom protocol names.
    ///
    /// Kademlia nodes only communicate with other nodes using the same protocol
    /// name. Using custom names therefore allows segregating this DHT from
    /// other DHTs, if that is desired.
    ///
    /// More than one protocol name can be supplied. In this case the node will
    /// be able to talk to other nodes supporting any of the provided names.
    /// Multiple names must be used with caution to avoid network partitioning.
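    ///
    /// # Example
    ///
    /// A minimal sketch of segregating a DHT behind a custom protocol name
    /// (the name `/my-app/kad/1.0.0` is purely illustrative):
    ///
    /// ```ignore
    /// use libp2p_swarm::StreamProtocol;
    ///
    /// let mut config = Config::default();
    /// // All nodes of this DHT must be configured with the same name.
    /// config.set_protocol_names(vec![StreamProtocol::new("/my-app/kad/1.0.0")]);
    /// ```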
    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
        self.protocol_config.set_protocol_names(names);
        self
    }

    /// Sets the timeout for a single query.
    ///
    /// > **Note**: A single query usually comprises at least as many requests
    /// > as the replication factor, i.e. this is not a request timeout.
    ///
    /// The default is 60 seconds.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }

    /// Sets the replication factor to use.
    ///
    /// The replication factor determines to how many closest peers
    /// a record is replicated. The default is [`K_VALUE`].
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }

    /// Sets the allowed level of parallelism for iterative queries.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers
    /// that an iterative query is allowed to wait for in parallel while
    /// iterating towards the closest nodes to a target. Defaults to
    /// `ALPHA_VALUE`.
    ///
    /// This only controls the level of parallelism of an iterative query, not
    /// the level of parallelism of a query to a fixed set of peers.
    ///
    /// When used with [`Config::disjoint_query_paths`] it equals
    /// the amount of disjoint paths used.
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }

    /// Require iterative queries to use disjoint paths for increased resiliency
    /// in the presence of potentially adversarial nodes.
    ///
    /// When enabled the number of disjoint paths used equals the configured
    /// parallelism.
    ///
    /// See the S/Kademlia paper for more information on the high level design
    /// as well as its security improvements.
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        self.query_config.disjoint_query_paths = enabled;
        self
    }

    /// Sets the TTL for stored records.
    ///
    /// The TTL should be significantly longer than the (re-)publication
    /// interval, to avoid premature expiration of records. The default is 36
    /// hours.
    ///
    /// `None` means records never expire.
    ///
    /// Does not apply to provider records.
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }

    /// Sets whether or not records should be filtered before being stored.
    ///
    /// See [`StoreInserts`] for the different values.
    /// Defaults to [`StoreInserts::Unfiltered`].
    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
        self.record_filtering = filtering;
        self
    }

    /// Sets the (re-)replication interval for stored records.
    ///
    /// Periodic replication of stored records ensures that the records
    /// are always replicated to the available nodes closest to the key in the
    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
    /// ensuring persistence until the record expires. Replication does not
    /// prolong the regular lifetime of a record (for otherwise it would live
    /// forever regardless of the configured TTL). The expiry of a record
    /// is only extended through re-publication.
    ///
    /// This interval should be significantly shorter than the publication
    /// interval, to ensure persistence between re-publications. The default
    /// is 1 hour.
    ///
    /// `None` means that stored records are never re-replicated.
    ///
    /// Does not apply to provider records.
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }

    /// Sets the (re-)publication interval of stored records.
    ///
    /// Records persist in the DHT until they expire. By default, published
    /// records are re-published in regular intervals for as long as the record
    /// exists in the local storage of the original publisher, thereby extending
    /// the record's lifetime.
    ///
    /// This interval should be significantly shorter than the record TTL, to
    /// ensure records do not expire prematurely. The default is 24 hours.
    ///
    /// `None` means that stored records are never automatically re-published.
    ///
    /// Does not apply to provider records.
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }

    /// Sets the TTL for provider records.
    ///
    /// `None` means that stored provider records never expire.
    ///
    /// Must be significantly larger than the provider publication interval.
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }

    /// Sets the interval at which provider records for keys provided
    /// by the local node are re-published.
    ///
    /// `None` means that stored provider records are never automatically
    /// re-published.
    ///
    /// Must be significantly less than the provider record TTL.
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }

    /// Sets the amount of time to keep connections alive when they're idle.
    #[deprecated(
        note = "Set a global idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead."
    )]
    pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
        self.connection_idle_timeout = duration;
        self
    }

    /// Modifies the maximum allowed size of individual Kademlia packets.
    ///
    /// It might be necessary to increase this value if trying to put large
    /// records.
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }

    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
        self.kbucket_inserts = inserts;
        self
    }

    /// Sets the [`Caching`] strategy to use for successful lookups.
    ///
    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
    /// will result in the record being cached at the closest node to the key that
    /// did not return the record, i.e. the standard Kademlia behaviour.
    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
        self.caching = c;
        self
    }
}

impl<TStore> Behaviour<TStore>
where
    TStore: RecordStore + Send + 'static,
{
    /// Creates a new `Kademlia` network behaviour with a default configuration.
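    ///
    /// # Example
    ///
    /// A minimal construction sketch, assuming the in-memory `MemoryStore`
    /// re-exported from this crate's `store` module:
    ///
    /// ```ignore
    /// use libp2p_identity::PeerId;
    /// use libp2p_kad::{store::MemoryStore, Behaviour};
    ///
    /// let local_peer_id = PeerId::random();
    /// // The store is keyed by the local peer so that locally published
    /// // records carry the correct publisher/provider identity.
    /// let store = MemoryStore::new(local_peer_id);
    /// let kademlia = Behaviour::new(local_peer_id, store);
    /// ```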
    pub fn new(id: PeerId, store: TStore) -> Self {
        Self::with_config(id, store, Default::default())
    }

    /// Gets the protocol names of this Kademlia instance.
    pub fn protocol_names(&self) -> &[StreamProtocol] {
        self.protocol_config.protocol_names()
    }

    /// Creates a new `Kademlia` network behaviour with the given configuration.
    pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
        let local_key = kbucket::Key::from(id);

        let put_record_job = config
            .record_replication_interval
            .or(config.record_publication_interval)
            .map(|interval| {
                PutRecordJob::new(
                    id,
                    interval,
                    config.record_publication_interval,
                    config.record_ttl,
                )
            });

        let add_provider_job = config
            .provider_publication_interval
            .map(AddProviderJob::new);

        Behaviour {
            store,
            caching: config.caching,
            kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
            kbucket_inserts: config.kbucket_inserts,
            protocol_config: config.protocol_config,
            record_filtering: config.record_filtering,
            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
            listen_addresses: Default::default(),
            queries: QueryPool::new(config.query_config),
            connected_peers: Default::default(),
            add_provider_job,
            put_record_job,
            record_ttl: config.record_ttl,
            provider_record_ttl: config.provider_record_ttl,
            connection_idle_timeout: config.connection_idle_timeout,
            external_addresses: Default::default(),
            local_peer_id: id,
            connections: Default::default(),
            mode: Mode::Client,
            auto_mode: true,
            no_events_waker: None,
        }
    }

    /// Gets an iterator over immutable references to all running queries.
    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
        self.queries.iter().filter_map(|query| {
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            }
        })
    }

    /// Gets an iterator over mutable references to all running queries.
    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
        self.queries.iter_mut().filter_map(|query| {
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            }
        })
    }

    /// Gets an immutable reference to a running query, if it exists.
    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
        self.queries.get(id).and_then(|query| {
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            }
        })
    }

    /// Gets a mutable reference to a running query, if it exists.
    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
        self.queries.get_mut(id).and_then(|query| {
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            }
        })
    }

    /// Adds a known listen address of a peer participating in the DHT to the
    /// routing table.
    ///
    /// Explicitly adding addresses of peers serves two purposes:
    ///
    ///   1. In order for a node to join the DHT, it must know about at least
    ///      one other node of the DHT.
    ///
    ///   2. When a remote peer initiates a connection and that peer is not
    ///      yet in the routing table, the `Kademlia` behaviour must be
    ///      informed of an address on which that peer is listening for
    ///      connections before it can be added to the routing table
    ///      from where it can subsequently be discovered by all peers
    ///      in the DHT.
    ///
    /// If the routing table has been updated as a result of this operation,
    /// an [`Event::RoutingUpdated`] event is emitted.
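    ///
    /// # Example
    ///
    /// A bootstrapping sketch; the peer id and address below are illustrative
    /// placeholders for a known DHT node:
    ///
    /// ```ignore
    /// let bootnode: PeerId = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse()?;
    /// let bootaddr: Multiaddr = "/ip4/104.131.131.82/tcp/4001".parse()?;
    /// kademlia.add_address(&bootnode, bootaddr);
    /// ```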
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().insert(address) {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
                            peer: *peer,
                            is_new_peer: false,
                            addresses: entry.value().clone(),
                            old_peer: None,
                            bucket_range: self
                                .kbuckets
                                .bucket(&key)
                                .map(|b| b.range())
                                .expect("Not kbucket::Entry::SelfEntry."),
                        }))
                }
                RoutingUpdate::Success
            }
            kbucket::Entry::Pending(mut entry, _) => {
                entry.value().insert(address);
                RoutingUpdate::Pending
            }
            kbucket::Entry::Absent(entry) => {
                let addresses = Addresses::new(address);
                let status = if self.connected_peers.contains(peer) {
                    NodeStatus::Connected
                } else {
                    NodeStatus::Disconnected
                };
                match entry.insert(addresses.clone(), status) {
                    kbucket::InsertResult::Inserted => {
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer: *peer,
                                is_new_peer: true,
                                addresses,
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ));
                        RoutingUpdate::Success
                    }
                    kbucket::InsertResult::Full => {
                        debug!("Bucket full. Peer not added to routing table: {}", peer);
                        RoutingUpdate::Failed
                    }
                    kbucket::InsertResult::Pending { disconnected } => {
                        self.queued_events.push_back(ToSwarm::Dial {
                            opts: DialOpts::peer_id(disconnected.into_preimage())
                                .condition(dial_opts::PeerCondition::NotDialing)
                                .build(),
                        });
                        RoutingUpdate::Pending
                    }
                }
            }
            kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
        }
    }

    /// Removes an address of a peer from the routing table.
    ///
    /// If the given address is the last address of the peer in the
    /// routing table, the peer is removed from the routing table
    /// and `Some` is returned with a view of the removed entry.
    /// The same applies if the peer is currently pending insertion
    /// into the routing table.
    ///
    /// If the given peer or address is not in the routing table,
    /// this is a no-op.
    pub fn remove_address(
        &mut self,
        peer: &PeerId,
        address: &Multiaddr,
    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().remove(address).is_err() {
                    Some(entry.remove()) // it is the last address, thus remove the peer.
                } else {
                    None
                }
            }
            kbucket::Entry::Pending(mut entry, _) => {
                if entry.value().remove(address).is_err() {
                    Some(entry.remove()) // it is the last address, thus remove the peer.
                } else {
                    None
                }
            }
            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
        }
    }

    /// Removes a peer from the routing table.
    ///
    /// Returns `None` if the peer was not in the routing table,
    /// not even pending insertion.
    pub fn remove_peer(
        &mut self,
        peer: &PeerId,
    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(entry, _) => Some(entry.remove()),
            kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
        }
    }

    /// Returns an iterator over all non-empty buckets in the routing table.
    pub fn kbuckets(
        &mut self,
    ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
        self.kbuckets.iter().filter(|b| !b.is_empty())
    }

    /// Returns the k-bucket for the distance to the given key.
    ///
    /// Returns `None` if the given key refers to the local key.
    pub fn kbucket<K>(
        &mut self,
        key: K,
    ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
    where
        K: Into<kbucket::Key<K>> + Clone,
    {
        self.kbuckets.bucket(&key.into())
    }

    /// Initiates an iterative query for the closest peers to the given key.
    ///
    /// The result of the query is delivered in a
    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
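    ///
    /// # Example
    ///
    /// A sketch of looking up the peers closest to a peer id (any key that
    /// converts to bytes works, e.g. a `PeerId`):
    ///
    /// ```ignore
    /// let query_id = kademlia.get_closest_peers(PeerId::random());
    /// // Progress and completion are reported asynchronously via
    /// // `Event::OutboundQueryProgressed` events carrying `query_id`.
    /// ```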
    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        let target: kbucket::Key<K> = key.clone().into();
        let key: Vec<u8> = key.into();
        let info = QueryInfo::GetClosestPeers {
            key,
            step: ProgressStep::first(),
        };
        let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target, peer_keys, inner)
    }

    /// Returns the closest peers to the given key, taken from the local routing table only.
    pub fn get_closest_local_peers<'a, K: Clone>(
        &'a mut self,
        key: &'a kbucket::Key<K>,
    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
        self.kbuckets.closest_keys(key)
    }

    /// Performs a lookup for a record in the DHT.
    ///
    /// The result of this operation is delivered in a
    /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
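    ///
    /// # Example
    ///
    /// A sketch, assuming the `RecordKey` type re-exported from this crate:
    ///
    /// ```ignore
    /// use libp2p_kad::RecordKey;
    ///
    /// let key = RecordKey::new(&b"my-key");
    /// let query_id = kademlia.get_record(key);
    /// // Matching `QueryResult::GetRecord` events reference `query_id`.
    /// ```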
    pub fn get_record(&mut self, key: record_priv::Key) -> QueryId {
        let record = if let Some(record) = self.store.get(&key) {
            if record.is_expired(Instant::now()) {
                self.store.remove(&key);
                None
            } else {
                Some(PeerRecord {
                    peer: None,
                    record: record.into_owned(),
                })
            }
        } else {
            None
        };

        let step = ProgressStep::first();

        let target = kbucket::Key::new(key.clone());
        let info = if record.is_some() {
            QueryInfo::GetRecord {
                key,
                step: step.next(),
                found_a_record: true,
                cache_candidates: BTreeMap::new(),
            }
        } else {
            QueryInfo::GetRecord {
                key,
                step: step.clone(),
                found_a_record: false,
                cache_candidates: BTreeMap::new(),
            }
        };
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        let id = self.queries.add_iter_closest(target.clone(), peers, inner);

        // No queries were actually done for the results yet.
        let stats = QueryStats::empty();

        if let Some(record) = record {
            self.queued_events
                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
                    id,
                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
                    step,
                    stats,
                }));
        }

        id
    }

    /// Stores a record in the DHT, locally as well as at the nodes
    /// closest to the key as per the xor distance metric.
    ///
    /// Returns `Ok` if a record has been stored locally, providing the
    /// `QueryId` of the initial query that replicates the record in the DHT.
    /// The result of the query is eventually reported as a
    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
    ///
    /// The record is always stored locally with the given expiration. If the record's
    /// expiration is `None`, the common case, it does not expire in local storage
    /// but is still replicated with the configured record TTL. To remove the record
    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
    ///
    /// After the initial publication of the record, it is subject to (re-)replication
    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
    /// does not update the record's expiration in local storage, thus a given record
    /// with an explicit expiration will always expire at that instant and until then
    /// is subject to regular (re-)replication and (re-)publication.
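    ///
    /// # Example
    ///
    /// A sketch of publishing a small value, assuming the re-exported
    /// `Record`, `RecordKey` and `Quorum` types:
    ///
    /// ```ignore
    /// use libp2p_kad::{Quorum, Record, RecordKey};
    ///
    /// let record = Record::new(RecordKey::new(&b"my-key"), b"my-value".to_vec());
    /// // `Quorum::One` considers the put successful once one peer stored it.
    /// let query_id = kademlia.put_record(record, Quorum::One)?;
    /// ```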
    pub fn put_record(
        &mut self,
        mut record: Record,
        quorum: Quorum,
    ) -> Result<QueryId, store::Error> {
        record.publisher = Some(*self.kbuckets.local_key().preimage());
        self.store.put(record.clone())?;
        record.expires = record
            .expires
            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = PutRecordContext::Publish;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::GetClosestPeers,
        };
        let inner = QueryInner::new(info);
        Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
    }

    /// Stores a record at specific peers, without storing it locally.
    ///
    /// The given [`Quorum`] is understood in the context of the total
    /// number of distinct peers given.
    ///
    /// If the record's expiration is `None`, the configured record TTL is used.
    ///
    /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
    /// > used to selectively update or store a record to specific peers
    /// > for the purpose of e.g. making sure these peers have the latest
    /// > "version" of a record or to "cache" a record at further peers
    /// > to increase the lookup success rate on the DHT for other peers.
    /// >
    /// > In particular, there is no automatic storing of records performed, and this
    /// > method must be used to ensure the standard Kademlia
    /// > procedure of "caching" (i.e. storing) a found record at the closest
    /// > node to the key that _did not_ return it.
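    ///
    /// # Example
    ///
    /// A write-back caching sketch: after a successful lookup, store the chosen
    /// record at a single cache candidate (`cache_peer` is a hypothetical
    /// `PeerId` taken from [`GetRecordOk::FinishedWithNoAdditionalRecord`]):
    ///
    /// ```ignore
    /// use libp2p_kad::Quorum;
    ///
    /// let query_id = kademlia.put_record_to(record, std::iter::once(cache_peer), Quorum::One);
    /// ```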
    pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
    where
        I: ExactSizeIterator<Item = PeerId>,
    {
        let quorum = if peers.len() > 0 {
            quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
        } else {
            // If no peers are given, we just let the query fail immediately
            // due to the fact that the quorum must be at least one, instead of
            // introducing a new kind of error.
            NonZeroUsize::new(1).expect("1 > 0")
        };
        record.expires = record
            .expires
            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
        let context = PutRecordContext::Custom;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::PutRecord {
                success: Vec::new(),
                get_closest_peers_stats: QueryStats::empty(),
            },
        };
        let inner = QueryInner::new(info);
        self.queries.add_fixed(peers, inner)
    }

    /// Removes the record with the given key from _local_ storage,
    /// if the local node is the publisher of the record.
    ///
    /// Has no effect if a record for the given key is stored locally but
    /// the local node is not a publisher of the record.
    ///
    /// This is a _local_ operation. However, it also has the effect that
    /// the record will no longer be periodically re-published, allowing the
    /// record to eventually expire throughout the DHT.
    pub fn remove_record(&mut self, key: &record_priv::Key) {
        if let Some(r) = self.store.get(key) {
            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                self.store.remove(key)
            }
        }
    }

    /// Gets a mutable reference to the record store.
    pub fn store_mut(&mut self) -> &mut TStore {
        &mut self.store
    }

    /// Bootstraps the local node to join the DHT.
    ///
    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
    /// own ID in the DHT. This introduces the local node to the other nodes
    /// in the DHT and populates its routing table with the closest neighbours.
    ///
    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
    /// refreshed by initiating an additional bootstrapping query for each such
    /// bucket with random keys.
    ///
    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
    /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
    /// with one such event per bootstrapping query.
    ///
    /// Returns `Err` if bootstrapping is impossible due to an empty routing table.
    ///
    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
    /// > See [`Behaviour::add_address`].
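    ///
    /// # Example
    ///
    /// A sketch of the usual join sequence (`bootnode` and `bootaddr` as in the
    /// illustrative [`Behaviour::add_address`] example):
    ///
    /// ```ignore
    /// kademlia.add_address(&bootnode, bootaddr);
    /// let query_id = kademlia.bootstrap()?;
    /// ```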
    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
        let local_key = self.kbuckets.local_key().clone();
        let info = QueryInfo::Bootstrap {
            peer: *local_key.preimage(),
            remaining: None,
            step: ProgressStep::first(),
        };
        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
        if peers.is_empty() {
            Err(NoKnownPeers())
        } else {
            let inner = QueryInner::new(info);
            Ok(self.queries.add_iter_closest(local_key, peers, inner))
        }
    }

    /// Establishes the local node as a provider of a value for the given key.
    ///
    /// This operation publishes a provider record with the given key and
    /// identity of the local node to the peers closest to the key, thus establishing
    /// the local node as a provider.
    ///
    /// Returns `Ok` if a provider record has been stored locally, providing the
    /// `QueryId` of the initial query that announces the local node as a provider.
    ///
    /// The publication of the provider records is periodically repeated as per the
    /// configured interval, to renew the expiry and account for changes to the DHT
    /// topology. A provider record may be removed from local storage and
    /// thus no longer re-published by calling [`Behaviour::stop_providing`].
    ///
    /// In contrast to the standard Kademlia push-based model for content distribution
    /// implemented by [`Behaviour::put_record`], the provider API implements a
    /// pull-based model that may be used in addition or as an alternative.
    /// The means by which the actual value is obtained from a provider is out of scope
    /// of the libp2p Kademlia provider API.
    ///
    /// The results of the (repeated) provider announcements sent by this node are
    /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
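    ///
    /// # Example
    ///
    /// A sketch of announcing the local node as a provider for a key, assuming
    /// the re-exported `RecordKey` type:
    ///
    /// ```ignore
    /// use libp2p_kad::RecordKey;
    ///
    /// let query_id = kademlia.start_providing(RecordKey::new(&b"my-content"))?;
    /// ```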
    pub fn start_providing(&mut self, key: record_priv::Key) -> Result<QueryId, store::Error> {
        // Note: We store our own provider records locally without local addresses
        // to avoid redundant storage and outdated addresses. Instead these are
        // acquired on demand when returning a `ProviderRecord` for the local node.
        let local_addrs = Vec::new();
        let record = ProviderRecord::new(
            key.clone(),
            *self.kbuckets.local_key().preimage(),
            local_addrs,
        );
        self.store.add_provider(record)?;
        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = AddProviderContext::Publish;
        let info = QueryInfo::AddProvider {
            context,
            key,
            phase: AddProviderPhase::GetClosestPeers,
        };
        let inner = QueryInner::new(info);
        let id = self.queries.add_iter_closest(target.clone(), peers, inner);
        Ok(id)
    }

    /// Stops the local node from announcing that it is a provider for the given key.
    ///
    /// This is a local operation. The local node will still be considered as a
    /// provider for the key by other nodes until these provider records expire.
    pub fn stop_providing(&mut self, key: &record_priv::Key) {
        self.store
            .remove_provider(key, self.kbuckets.local_key().preimage());
    }

    /// Performs a lookup for providers of a value to the given key.
    ///
    /// The result of this operation is reported via
    /// [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
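    ///
    /// # Example
    ///
    /// A sketch, assuming the re-exported `RecordKey` type:
    ///
    /// ```ignore
    /// use libp2p_kad::RecordKey;
    ///
    /// let query_id = kademlia.get_providers(RecordKey::new(&b"my-content"));
    /// // Providers are reported incrementally via `QueryResult::GetProviders` events.
    /// ```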
    pub fn get_providers(&mut self, key: record_priv::Key) -> QueryId {
        let providers: HashSet<_> = self
            .store
            .providers(&key)
            .into_iter()
            .filter(|p| !p.is_expired(Instant::now()))
            .map(|p| p.provider)
            .collect();

        let step = ProgressStep::first();

        let info = QueryInfo::GetProviders {
            key: key.clone(),
            providers_found: providers.len(),
            step: if providers.is_empty() {
                step.clone()
            } else {
                step.next()
            },
        };

        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        let id = self.queries.add_iter_closest(target.clone(), peers, inner);

        // No queries were actually done for the results yet.
        let stats = QueryStats::empty();

        if !providers.is_empty() {
            self.queued_events
                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
                    id,
                    result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
                        key,
                        providers,
                    })),
                    step,
                    stats,
                }));
        }
        id
    }

    /// Set the [`Mode`] in which we should operate.
    ///
    /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we have a confirmed external address via [`FromSwarm::ExternalAddrConfirmed`].
    ///
    /// Setting a mode via this function disables this automatic behaviour and unconditionally operates in the specified mode.
    /// To reactivate the automatic configuration, pass [`None`] instead.
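    ///
    /// # Example
    ///
    /// A sketch of forcing server mode (e.g. for a node known to be publicly
    /// reachable) and of restoring the automatic behaviour:
    ///
    /// ```ignore
    /// use libp2p_kad::Mode;
    ///
    /// kademlia.set_mode(Some(Mode::Server)); // always act as a DHT server
    /// kademlia.set_mode(None);               // back to automatic client/server switching
    /// ```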
    pub fn set_mode(&mut self, mode: Option<Mode>) {
        match mode {
            Some(mode) => {
                self.mode = mode;
                self.auto_mode = false;
                self.reconfigure_mode();
            }
            None => {
                self.auto_mode = true;
                self.determine_mode_from_external_addresses();
            }
        }

        if let Some(waker) = self.no_events_waker.take() {
            waker.wake();
        }
    }

    fn reconfigure_mode(&mut self) {
        if self.connections.is_empty() {
            return;
        }

        let num_connections = self.connections.len();

        log::debug!(
            "Re-configuring {} established connection{}",
            num_connections,
            if num_connections > 1 { "s" } else { "" }
        );

        self.queued_events
            .extend(
                self.connections
                    .iter()
                    .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
                        peer_id: *peer_id,
                        handler: NotifyHandler::One(*conn_id),
                        event: HandlerIn::ReconfigureMode {
                            new_mode: self.mode,
                        },
                    }),
            );
    }

    fn determine_mode_from_external_addresses(&mut self) {
        self.mode = match (self.external_addresses.as_slice(), self.mode) {
            ([], Mode::Server) => {
                log::debug!("Switching to client-mode because we no longer have any confirmed external addresses");

                Mode::Client
            }
            ([], Mode::Client) => {
                // Previously client-mode, now also client-mode because no external addresses.

                Mode::Client
            }
            (confirmed_external_addresses, Mode::Client) => {
                if log::log_enabled!(log::Level::Debug) {
                    let confirmed_external_addresses =
                        to_comma_separated_list(confirmed_external_addresses);

                    log::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
                }

                Mode::Server
            }
            (confirmed_external_addresses, Mode::Server) => {
                debug_assert!(
                    !confirmed_external_addresses.is_empty(),
                    "Previous match arm handled empty list"
                );

                // Previously server-mode, now also server-mode because we still have at least one external address. Don't log anything to avoid spam.

                Mode::Server
            }
        };

        self.reconfigure_mode();
    }

    /// Processes discovered peers from a successful request in an iterative `Query`.
    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
    where
        I: Iterator<Item = &'a KadPeer> + Clone,
    {
        let local_id = self.kbuckets.local_key().preimage();
        let others_iter = peers.filter(|p| &p.node_id != local_id);
        if let Some(query) = self.queries.get_mut(query_id) {
            log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
            for peer in others_iter.clone() {
                log::trace!(
                    "Peer {:?} reported by {:?} in query {:?}.",
                    peer,
                    source,
                    query_id
                );
                let addrs = peer.multiaddrs.iter().cloned().collect();
                query.inner.addresses.insert(peer.node_id, addrs);
            }
            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
        }
    }

    /// Finds the closest peers to a `target` in the context of a request by
    /// the `source` peer, such that the `source` peer is never included in the
    /// result.
    fn find_closest<T: Clone>(
        &mut self,
        target: &kbucket::Key<T>,
        source: &PeerId,
    ) -> Vec<KadPeer> {
        if target == self.kbuckets.local_key() {
            Vec::new()
        } else {
            self.kbuckets
                .closest(target)
                .filter(|e| e.node.key.preimage() != source)
                .take(self.queries.config().replication_factor.get())
                .map(KadPeer::from)
                .collect()
        }
    }

    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
    fn provider_peers(&mut self, key: &record_priv::Key, source: &PeerId) -> Vec<KadPeer> {
        let kbuckets = &mut self.kbuckets;
        let connected = &mut self.connected_peers;
        let listen_addresses = &self.listen_addresses;
        let external_addresses = &self.external_addresses;

        self.store
            .providers(key)
            .into_iter()
            .filter_map(move |p| {
                if &p.provider != source {
                    let node_id = p.provider;
                    let multiaddrs = p.addresses;
                    let connection_ty = if connected.contains(&node_id) {
                        ConnectionType::Connected
                    } else {
                        ConnectionType::NotConnected
                    };
                    if multiaddrs.is_empty() {
                        // The provider is either the local node and we fill in
                        // the local addresses on demand, or it is a legacy
                        // provider record without addresses, in which case we
                        // try to find addresses in the routing table, as was
                        // done before provider records were stored along with
                        // their addresses.
                        if &node_id == kbuckets.local_key().preimage() {
                            Some(
                                listen_addresses
                                    .iter()
                                    .chain(external_addresses.iter())
                                    .cloned()
                                    .collect::<Vec<_>>(),
                            )
                        } else {
                            let key = kbucket::Key::from(node_id);
                            kbuckets
                                .entry(&key)
                                .view()
                                .map(|e| e.node.value.clone().into_vec())
                        }
                    } else {
                        Some(multiaddrs)
                    }
                    .map(|multiaddrs| KadPeer {
                        node_id,
                        multiaddrs,
                        connection_ty,
                    })
                } else {
                    None
                }
            })
            .take(self.queries.config().replication_factor.get())
            .collect()
    }

    /// Starts an iterative `ADD_PROVIDER` query for the given key.
    fn start_add_provider(&mut self, key: record_priv::Key, context: AddProviderContext) {
        let info = QueryInfo::AddProvider {
            context,
            key: key.clone(),
            phase: AddProviderPhase::GetClosestPeers,
        };
        let target = kbucket::Key::new(key);
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner);
    }

    /// Starts an iterative `PUT_VALUE` query for the given record.
    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let info = QueryInfo::PutRecord {
            record,
            quorum,
            context,
            phase: PutRecordPhase::GetClosestPeers,
        };
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner);
    }

    /// Updates the routing table with a new connection status and address of a peer.
    fn connection_updated(
        &mut self,
        peer: PeerId,
        address: Option<Multiaddr>,
        new_status: NodeStatus,
    ) {
        let key = kbucket::Key::from(peer);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, old_status) => {
                if old_status != new_status {
                    entry.update(new_status)
                }
                if let Some(address) = address {
                    if entry.value().insert(address) {
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer,
                                is_new_peer: false,
                                addresses: entry.value().clone(),
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ))
                    }
                }
            }

            kbucket::Entry::Pending(mut entry, old_status) => {
                if let Some(address) = address {
                    entry.value().insert(address);
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            }

            kbucket::Entry::Absent(entry) => {
                // Only connected nodes with a known address are newly inserted.
                if new_status != NodeStatus::Connected {
                    return;
                }
                match (address, self.kbucket_inserts) {
                    (None, _) => {
                        self.queued_events
                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
                    }
                    (Some(a), BucketInserts::Manual) => {
                        self.queued_events
                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
                                peer,
                                address: a,
                            }));
                    }
                    (Some(a), BucketInserts::OnConnected) => {
                        let addresses = Addresses::new(a);
                        match entry.insert(addresses.clone(), new_status) {
                            kbucket::InsertResult::Inserted => {
                                let event = Event::RoutingUpdated {
                                    peer,
                                    is_new_peer: true,
                                    addresses,
                                    old_peer: None,
                                    bucket_range: self
                                        .kbuckets
                                        .bucket(&key)
                                        .map(|b| b.range())
                                        .expect("Not kbucket::Entry::SelfEntry."),
                                };
                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
                            }
                            kbucket::InsertResult::Full => {
                                debug!("Bucket full. Peer not added to routing table: {}", peer);
                                let address = addresses.first().clone();
                                self.queued_events.push_back(ToSwarm::GenerateEvent(
                                    Event::RoutablePeer { peer, address },
                                ));
                            }
                            kbucket::InsertResult::Pending { disconnected } => {
                                let address = addresses.first().clone();
                                self.queued_events.push_back(ToSwarm::GenerateEvent(
                                    Event::PendingRoutablePeer { peer, address },
                                ));

                                // `disconnected` might already be in the process of re-connecting.
                                // In other words `disconnected` might have already re-connected but
                                // is not yet confirmed to support the Kademlia protocol via
                                // [`HandlerEvent::ProtocolConfirmed`].
                                //
                                // Only try dialing peer if not currently connected.
                                if !self.connected_peers.contains(disconnected.preimage()) {
                                    self.queued_events.push_back(ToSwarm::Dial {
                                        opts: DialOpts::peer_id(disconnected.into_preimage())
                                            .condition(dial_opts::PeerCondition::NotDialing)
                                            .build(),
                                    })
                                }
                            }
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Handles a finished (i.e. successful) query.
    fn query_finished(&mut self, q: Query<QueryInner>) -> Option<Event> {
        let query_id = q.id();
        log::trace!("Query {:?} finished.", query_id);
        let result = q.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap {
                peer,
                remaining,
                mut step,
            } => {
                let local_key = self.kbuckets.local_key().clone();
                let mut remaining = remaining.unwrap_or_else(|| {
                    debug_assert_eq!(&peer, local_key.preimage());
                    // The lookup for the local key finished. To complete the bootstrap process,
                    // a bucket refresh should be performed for every bucket farther away than
                    // the first non-empty bucket (which are most likely no more than the last
                    // few, i.e. farthest, buckets).
                    self.kbuckets
                        .iter()
                        .skip_while(|b| b.is_empty())
                        .skip(1) // Skip the bucket with the closest neighbour.
                        .map(|b| {
                            // Try to find a key that falls into the bucket. While such keys can
                            // be generated fully deterministically, the current libp2p kademlia
                            // wire protocol requires transmission of the preimages of the actual
                            // keys in the DHT keyspace, hence for now this is just a "best effort"
                            // to find a key that hashes into a specific bucket. The probabilities
1353                            // of finding a key in the bucket `b` with at most 16 trials are as
1354                            // follows:
1355                            //
1356                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
1357                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
1358                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
1359                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1360                            // ...
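                            // In general, a uniformly random key falls into bucket-(255 - i)
                            // with probability 2^-(i + 1) per trial (i = 0 being the farthest
                            // bucket), so 16 trials hit it with probability
                            // 1 - (1 - 2^-(i + 1))^16, matching the values above.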
1361                            let mut target = kbucket::Key::from(PeerId::random());
1362                            for _ in 0..16 {
1363                                let d = local_key.distance(&target);
1364                                if b.contains(&d) {
1365                                    break;
1366                                }
1367                                target = kbucket::Key::from(PeerId::random());
1368                            }
1369                            target
1370                        })
1371                        .collect::<Vec<_>>()
1372                        .into_iter()
1373                });
1374
1375                let num_remaining = remaining.len() as u32;
1376
1377                if let Some(target) = remaining.next() {
1378                    let info = QueryInfo::Bootstrap {
1379                        peer: *target.preimage(),
1380                        remaining: Some(remaining),
1381                        step: step.next(),
1382                    };
1383                    let peers = self.kbuckets.closest_keys(&target);
1384                    let inner = QueryInner::new(info);
1385                    self.queries
1386                        .continue_iter_closest(query_id, target.clone(), peers, inner);
1387                } else {
1388                    step.last = true;
1389                };
1390
1391                Some(Event::OutboundQueryProgressed {
1392                    id: query_id,
1393                    stats: result.stats,
1394                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
1395                        peer,
1396                        num_remaining,
1397                    })),
1398                    step,
1399                })
1400            }
1401
1402            QueryInfo::GetClosestPeers { key, mut step } => {
1403                step.last = true;
1404
1405                Some(Event::OutboundQueryProgressed {
1406                    id: query_id,
1407                    stats: result.stats,
1408                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1409                        key,
1410                        peers: result.peers.collect(),
1411                    })),
1412                    step,
1413                })
1414            }
1415
1416            QueryInfo::GetProviders { mut step, .. } => {
1417                step.last = true;
1418
1419                Some(Event::OutboundQueryProgressed {
1420                    id: query_id,
1421                    stats: result.stats,
1422                    result: QueryResult::GetProviders(Ok(
1423                        GetProvidersOk::FinishedWithNoAdditionalRecord {
1424                            closest_peers: result.peers.collect(),
1425                        },
1426                    )),
1427                    step,
1428                })
1429            }
1430
1431            QueryInfo::AddProvider {
1432                context,
1433                key,
1434                phase: AddProviderPhase::GetClosestPeers,
1435            } => {
1436                let provider_id = self.local_peer_id;
1437                let external_addresses = self.external_addresses.iter().cloned().collect();
1438                let inner = QueryInner::new(QueryInfo::AddProvider {
1439                    context,
1440                    key,
1441                    phase: AddProviderPhase::AddProvider {
1442                        provider_id,
1443                        external_addresses,
1444                        get_closest_peers_stats: result.stats,
1445                    },
1446                });
1447                self.queries.continue_fixed(query_id, result.peers, inner);
1448                None
1449            }
1450
1451            QueryInfo::AddProvider {
1452                context,
1453                key,
1454                phase:
1455                    AddProviderPhase::AddProvider {
1456                        get_closest_peers_stats,
1457                        ..
1458                    },
1459            } => match context {
1460                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1461                    id: query_id,
1462                    stats: get_closest_peers_stats.merge(result.stats),
1463                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1464                    step: ProgressStep::first_and_last(),
1465                }),
1466                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1467                    id: query_id,
1468                    stats: get_closest_peers_stats.merge(result.stats),
1469                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1470                    step: ProgressStep::first_and_last(),
1471                }),
1472            },
1473
1474            QueryInfo::GetRecord {
1475                key,
1476                mut step,
1477                found_a_record,
1478                cache_candidates,
1479            } => {
1480                step.last = true;
1481
1482                let results = if found_a_record {
1483                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1484                } else {
1485                    Err(GetRecordError::NotFound {
1486                        key,
1487                        closest_peers: result.peers.collect(),
1488                    })
1489                };
1490                Some(Event::OutboundQueryProgressed {
1491                    id: query_id,
1492                    stats: result.stats,
1493                    result: QueryResult::GetRecord(results),
1494                    step,
1495                })
1496            }
1497
1498            QueryInfo::PutRecord {
1499                context,
1500                record,
1501                quorum,
1502                phase: PutRecordPhase::GetClosestPeers,
1503            } => {
1504                let info = QueryInfo::PutRecord {
1505                    context,
1506                    record,
1507                    quorum,
1508                    phase: PutRecordPhase::PutRecord {
1509                        success: vec![],
1510                        get_closest_peers_stats: result.stats,
1511                    },
1512                };
1513                let inner = QueryInner::new(info);
1514                self.queries.continue_fixed(query_id, result.peers, inner);
1515                None
1516            }
1517
1518            QueryInfo::PutRecord {
1519                context,
1520                record,
1521                quorum,
1522                phase:
1523                    PutRecordPhase::PutRecord {
1524                        success,
1525                        get_closest_peers_stats,
1526                    },
1527            } => {
1528                let mk_result = |key: record_priv::Key| {
1529                    if success.len() >= quorum.get() {
1530                        Ok(PutRecordOk { key })
1531                    } else {
1532                        Err(PutRecordError::QuorumFailed {
1533                            key,
1534                            quorum,
1535                            success,
1536                        })
1537                    }
1538                };
1539                match context {
1540                    PutRecordContext::Publish | PutRecordContext::Custom => {
1541                        Some(Event::OutboundQueryProgressed {
1542                            id: query_id,
1543                            stats: get_closest_peers_stats.merge(result.stats),
1544                            result: QueryResult::PutRecord(mk_result(record.key)),
1545                            step: ProgressStep::first_and_last(),
1546                        })
1547                    }
1548                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1549                        id: query_id,
1550                        stats: get_closest_peers_stats.merge(result.stats),
1551                        result: QueryResult::RepublishRecord(mk_result(record.key)),
1552                        step: ProgressStep::first_and_last(),
1553                    }),
1554                    PutRecordContext::Replicate => {
1555                        debug!("Record replicated: {:?}", record.key);
1556                        None
1557                    }
1558                }
1559            }
1560        }
1561    }
1562
1563    /// Handles a query that timed out.
1564    fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<Event> {
1565        let query_id = query.id();
1566        log::trace!("Query {:?} timed out.", query_id);
1567        let result = query.into_result();
1568        match result.inner.info {
1569            QueryInfo::Bootstrap {
1570                peer,
1571                mut remaining,
1572                mut step,
1573            } => {
1574                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1575
1576                // Continue with the next bootstrap query if `remaining` is not empty.
1577                if let Some((target, remaining)) =
1578                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
1579                {
1580                    let info = QueryInfo::Bootstrap {
1581                        peer: target.clone().into_preimage(),
1582                        remaining: Some(remaining),
1583                        step: step.next(),
1584                    };
1585                    let peers = self.kbuckets.closest_keys(&target);
1586                    let inner = QueryInner::new(info);
1587                    self.queries
1588                        .continue_iter_closest(query_id, target.clone(), peers, inner);
1589                } else {
1590                    step.last = true;
1591                }
1592
1593                Some(Event::OutboundQueryProgressed {
1594                    id: query_id,
1595                    stats: result.stats,
1596                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1597                        peer,
1598                        num_remaining,
1599                    })),
1600                    step,
1601                })
1602            }
1603
1604            QueryInfo::AddProvider { context, key, .. } => Some(match context {
1605                AddProviderContext::Publish => Event::OutboundQueryProgressed {
1606                    id: query_id,
1607                    stats: result.stats,
1608                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1609                    step: ProgressStep::first_and_last(),
1610                },
1611                AddProviderContext::Republish => Event::OutboundQueryProgressed {
1612                    id: query_id,
1613                    stats: result.stats,
1614                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1615                    step: ProgressStep::first_and_last(),
1616                },
1617            }),
1618
1619            QueryInfo::GetClosestPeers { key, mut step } => {
1620                step.last = true;
1621
1622                Some(Event::OutboundQueryProgressed {
1623                    id: query_id,
1624                    stats: result.stats,
1625                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1626                        key,
1627                        peers: result.peers.collect(),
1628                    })),
1629                    step,
1630                })
1631            }
1632
1633            QueryInfo::PutRecord {
1634                record,
1635                quorum,
1636                context,
1637                phase,
1638            } => {
1639                let err = Err(PutRecordError::Timeout {
1640                    key: record.key,
1641                    quorum,
1642                    success: match phase {
1643                        PutRecordPhase::GetClosestPeers => vec![],
1644                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1645                    },
1646                });
1647                match context {
1648                    PutRecordContext::Publish | PutRecordContext::Custom => {
1649                        Some(Event::OutboundQueryProgressed {
1650                            id: query_id,
1651                            stats: result.stats,
1652                            result: QueryResult::PutRecord(err),
1653                            step: ProgressStep::first_and_last(),
1654                        })
1655                    }
1656                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1657                        id: query_id,
1658                        stats: result.stats,
1659                        result: QueryResult::RepublishRecord(err),
1660                        step: ProgressStep::first_and_last(),
1661                    }),
1662                    PutRecordContext::Replicate => match phase {
1663                        PutRecordPhase::GetClosestPeers => {
1664                            warn!("Locating closest peers for replication failed: {:?}", err);
1665                            None
1666                        }
1667                        PutRecordPhase::PutRecord { .. } => {
1668                            debug!("Replicating record failed: {:?}", err);
1669                            None
1670                        }
1671                    },
1672                }
1673            }
1674
1675            QueryInfo::GetRecord { key, mut step, .. } => {
1676                step.last = true;
1677
1678                Some(Event::OutboundQueryProgressed {
1679                    id: query_id,
1680                    stats: result.stats,
1681                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1682                    step,
1683                })
1684            }
1685
1686            QueryInfo::GetProviders { key, mut step, .. } => {
1687                step.last = true;
1688
1689                Some(Event::OutboundQueryProgressed {
1690                    id: query_id,
1691                    stats: result.stats,
1692                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1693                        key,
1694                        closest_peers: result.peers.collect(),
1695                    })),
1696                    step,
1697                })
1698            }
1699        }
1700    }
1701
1702    /// Processes a record received from a peer.
1703    fn record_received(
1704        &mut self,
1705        source: PeerId,
1706        connection: ConnectionId,
1707        request_id: RequestId,
1708        mut record: Record,
1709    ) {
1710        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1711            // If the (alleged) publisher is the local node, do nothing. The record of
1712            // the original publisher should never change as a result of replication
1713            // and the publisher is always assumed to have the "right" value.
1714            self.queued_events.push_back(ToSwarm::NotifyHandler {
1715                peer_id: source,
1716                handler: NotifyHandler::One(connection),
1717                event: HandlerIn::PutRecordRes {
1718                    key: record.key,
1719                    value: record.value,
1720                    request_id,
1721                },
1722            });
1723            return;
1724        }
1725
1726        let now = Instant::now();
1727
1728        // Calculate the expiration time: it decreases exponentially with the
1729        // number of nodes between the local node and the closest node to the key
1730        // (beyond the replication factor). This avoids over-caching outside of
1731        // the k closest nodes to a key.
1732        let target = kbucket::Key::new(record.key.clone());
1733        let num_between = self.kbuckets.count_nodes_between(&target);
1734        let k = self.queries.config().replication_factor.get();
1735        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1736        let expiration = self
1737            .record_ttl
1738            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
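        // For example, with a replication factor `k` of 20 and `num_between` of 23,
        // `num_beyond_k` is 3, so a configured TTL of 36 hours is shortened to
        // 36h >> 3 = 4.5 hours for this node.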
1739        // The smaller TTL prevails. Only if neither TTL is set is the record
1740        // stored "forever".
1741        record.expires = record.expires.or(expiration).min(expiration);
1742
1743        if let Some(job) = self.put_record_job.as_mut() {
1744            // Ignore the record in the next run of the replication
1745            // job, since we can assume the sender replicated the
1746            // record to the k closest peers. Effectively, only
1747            // one of the k closest peers performs a replication
1748            // in the configured interval, assuming a shared interval.
1749            job.skip(record.key.clone())
1750        }
1751
1752        // While records received from a publisher, as well as records that do
1753        // not exist locally, should always be stored (or at least an attempt
1754        // should be made to store them), there is a choice here w.r.t. the
1755        // handling of replicated records whose keys refer to records that already
1756        // exist locally: the value and / or the publisher may either be overridden
1757        // or left unchanged. At the moment, and in the absence of a decisive
1758        // argument for another option, both are always overridden, as doing so
1759        // avoids having to load the existing record in the first place.
1760
1761        if !record.is_expired(now) {
1762            // The record is cloned because of the weird libp2p protocol
1763            // requirement to send back the value in the response, although this
1764            // is a waste of resources.
1765            match self.record_filtering {
1766                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1767                    Ok(()) => {
1768                        debug!(
1769                            "Record stored: {:?}; {} bytes",
1770                            record.key,
1771                            record.value.len()
1772                        );
1773                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1774                            Event::InboundRequest {
1775                                request: InboundRequest::PutRecord {
1776                                    source,
1777                                    connection,
1778                                    record: None,
1779                                },
1780                            },
1781                        ));
1782                    }
1783                    Err(e) => {
1784                        info!("Record not stored: {:?}", e);
1785                        self.queued_events.push_back(ToSwarm::NotifyHandler {
1786                            peer_id: source,
1787                            handler: NotifyHandler::One(connection),
1788                            event: HandlerIn::Reset(request_id),
1789                        });
1790
1791                        return;
1792                    }
1793                },
1794                StoreInserts::FilterBoth => {
1795                    self.queued_events
1796                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1797                            request: InboundRequest::PutRecord {
1798                                source,
1799                                connection,
1800                                record: Some(record.clone()),
1801                            },
1802                        }));
1803                }
1804            }
1805        }
1806
1807        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1808        // case where the record is discarded due to being expired. Given that
1809        // the remote sent the local node a [`HandlerEvent::PutRecord`]
1810        // request, the remote perceives the local node as one node among the k
1811        // closest nodes to the target. In addition, returning
1812        // [`HandlerIn::PutRecordRes`] does not reveal any internal
1813        // information to a possibly malicious remote node.
1814        self.queued_events.push_back(ToSwarm::NotifyHandler {
1815            peer_id: source,
1816            handler: NotifyHandler::One(connection),
1817            event: HandlerIn::PutRecordRes {
1818                key: record.key,
1819                value: record.value,
1820                request_id,
1821            },
1822        })
1823    }
1824
1825    /// Processes a provider record received from a peer.
1826    fn provider_received(&mut self, key: record_priv::Key, provider: KadPeer) {
1827        if &provider.node_id != self.kbuckets.local_key().preimage() {
1828            let record = ProviderRecord {
1829                key,
1830                provider: provider.node_id,
1831                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1832                addresses: provider.multiaddrs,
1833            };
1834            match self.record_filtering {
1835                StoreInserts::Unfiltered => {
1836                    if let Err(e) = self.store.add_provider(record) {
1837                        info!("Provider record not stored: {:?}", e);
1838                        return;
1839                    }
1840
1841                    self.queued_events
1842                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1843                            request: InboundRequest::AddProvider { record: None },
1844                        }));
1845                }
1846                StoreInserts::FilterBoth => {
1847                    self.queued_events
1848                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1849                            request: InboundRequest::AddProvider {
1850                                record: Some(record),
1851                            },
1852                        }));
1853                }
1854            }
1855        }
1856    }
1857
1858    fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1859        let key = kbucket::Key::from(peer_id);
1860
1861        if let Some(addrs) = self.kbuckets.entry(&key).value() {
1862            // TODO: Ideally, the address should only be removed if the error can
1863            // be classified as "permanent" but since `err` is currently a borrowed
1864            // trait object without a `'static` bound, even downcasting for inspection
1865            // of the error is not possible (and also not truly desirable or ergonomic).
1866            // The error passed in should rather be a dedicated enum.
1867            if addrs.remove(address).is_ok() {
1868                debug!(
1869                    "Address '{}' removed from peer '{}' due to error.",
1870                    address, peer_id
1871                );
1872            } else {
1873                // Despite apparently having no reachable address (any longer),
1874                // the peer is kept in the routing table with its last known address, so
1875                // that a (temporary) loss of network connectivity does not "flush" the
1876                // routing table. Once in, a peer is only removed from the routing table
1877                // if it is the least recently connected peer, currently disconnected
1878                // and is unreachable in the context of another peer pending insertion
1879                // into the same bucket. This is handled transparently by the
1880                // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1881                // within `Behaviour::poll`.
1882                debug!(
1883                    "Last remaining address '{}' of peer '{}' is unreachable.",
1884                    address, peer_id,
1885                )
1886            }
1887        }
1888
1889        for query in self.queries.iter_mut() {
1890            if let Some(addrs) = query.inner.addresses.get_mut(&peer_id) {
1891                addrs.retain(|a| a != address);
1892            }
1893        }
1894    }
1895
1896    fn on_connection_established(
1897        &mut self,
1898        ConnectionEstablished {
1899            peer_id,
1900            failed_addresses,
1901            other_established,
1902            ..
1903        }: ConnectionEstablished,
1904    ) {
1905        for addr in failed_addresses {
1906            self.address_failed(peer_id, addr);
1907        }
1908
1909        // Peer's first connection.
1910        if other_established == 0 {
1911            self.connected_peers.insert(peer_id);
1912        }
1913    }
1914
1915    fn on_address_change(
1916        &mut self,
1917        AddressChange {
1918            peer_id: peer,
1919            old,
1920            new,
1921            ..
1922        }: AddressChange,
1923    ) {
1924        let (old, new) = (old.get_remote_address(), new.get_remote_address());
1925
1926        // Update routing table.
1927        if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::from(peer)).value() {
1928            if addrs.replace(old, new) {
1929                debug!(
1930                    "Address '{}' replaced with '{}' for peer '{}'.",
1931                    old, new, peer
1932                );
1933            } else {
1934                debug!(
1935                    "Address '{}' not replaced with '{}' for peer '{}' as old address wasn't \
1936                     present.",
1937                    old, new, peer
1938                );
1939            }
1940        } else {
1941            debug!(
1942                "Address '{}' not replaced with '{}' for peer '{}' as peer is not present in the \
1943                 routing table.",
1944                old, new, peer
1945            );
1946        }
1947
1948        // Update query address cache.
1949        //
1950        // Given two connected nodes: local node A and remote node B. Say node B
1951        // is not in node A's routing table. Additionally node B is part of the
1952        // `QueryInner::addresses` list of an ongoing query on node A. Say node
1953        // B triggers an address change and then disconnects. Later on, the
1954        // earlier-mentioned query on node A would like to connect to node B.
1955        // Without replacing the address in the `QueryInner::addresses` set, node
1956        // A would attempt to dial the old and not the new address.
1957        //
1958        // While upholding correctness, iterating through all discovered
1959        // addresses of a peer in all currently ongoing queries might have a
1960        // large performance impact. If so, the code below might be worth
1961        // revisiting.
1962        for query in self.queries.iter_mut() {
1963            if let Some(addrs) = query.inner.addresses.get_mut(&peer) {
1964                for addr in addrs.iter_mut() {
1965                    if addr == old {
1966                        *addr = new.clone();
1967                    }
1968                }
1969            }
1970        }
1971    }
1972
1973    fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
1974        let peer_id = match peer_id {
1975            Some(id) => id,
1976            // Not interested in dial failures to unknown peers.
1977            None => return,
1978        };
1979
1980        match error {
1981            DialError::LocalPeerId { .. }
1982            | DialError::WrongPeerId { .. }
1983            | DialError::Aborted
1984            | DialError::Denied { .. }
1985            | DialError::Transport(_)
1986            | DialError::NoAddresses => {
1987                if let DialError::Transport(addresses) = error {
1988                    for (addr, _) in addresses {
1989                        self.address_failed(peer_id, addr)
1990                    }
1991                }
1992
1993                for query in self.queries.iter_mut() {
1994                    query.on_failure(&peer_id);
1995                }
1996            }
1997            DialError::DialPeerConditionFalse(
1998                dial_opts::PeerCondition::Disconnected | dial_opts::PeerCondition::NotDialing,
1999            ) => {
2000                // We might (still) be connected, or about to be connected, thus do not report the
2001                // failure to the queries.
2002            }
2003            DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2004                unreachable!("DialPeerCondition::Always cannot trigger DialPeerConditionFalse.");
2005            }
2006        }
2007    }
2008
2009    fn on_connection_closed(
2010        &mut self,
2011        ConnectionClosed {
2012            peer_id,
2013            remaining_established,
2014            connection_id,
2015            ..
2016        }: ConnectionClosed<<Self as NetworkBehaviour>::ConnectionHandler>,
2017    ) {
2018        self.connections.remove(&connection_id);
2019
2020        if remaining_established == 0 {
2021            for query in self.queries.iter_mut() {
2022                query.on_failure(&peer_id);
2023            }
2024            self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2025            self.connected_peers.remove(&peer_id);
2026        }
2027    }
2028
2029    /// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
2030    fn preload_new_handler(
2031        &mut self,
2032        handler: &mut Handler,
2033        connection_id: ConnectionId,
2034        peer: PeerId,
2035    ) {
2036        self.connections.insert(connection_id, peer);
2037        // Queue events for sending pending RPCs to the connected peer.
2038        // There can be only one pending RPC for a particular peer and query per definition.
2039        // There can be only one pending RPC for a particular peer and query, by definition.
2040            q.inner
2041                .pending_rpcs
2042                .iter()
2043                .position(|(p, _)| p == &peer)
2044                .map(|p| q.inner.pending_rpcs.remove(p))
2045        }) {
2046            handler.on_behaviour_event(event)
2047        }
2048    }
2049}
2050
2051/// Exponentially decrease the given duration (base 2).
2052fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
2053    Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2054}
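
// A minimal illustrative test of the decrease above (a sketch only; this module
// is not part of the crate's existing test suite): the TTL is halved once per
// node beyond the k closest, and saturates to zero once the shift exceeds the
// width of the seconds counter.
#[cfg(test)]
mod exp_decrease_sketch {
    use super::exp_decrease;
    use std::time::Duration;

    #[test]
    fn ttl_halves_per_node_beyond_k() {
        // 64s >> 3 == 8s, i.e. three nodes beyond the k closest.
        assert_eq!(
            exp_decrease(Duration::from_secs(64), 3),
            Duration::from_secs(8)
        );
        // Shifts of 64 bits or more saturate to a zero duration instead of panicking.
        assert_eq!(exp_decrease(Duration::from_secs(1), 64), Duration::ZERO);
    }
}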
2055
2056impl<TStore> NetworkBehaviour for Behaviour<TStore>
2057where
2058    TStore: RecordStore + Send + 'static,
2059{
2060    type ConnectionHandler = Handler;
2061    type ToSwarm = Event;
2062
2063    fn handle_established_inbound_connection(
2064        &mut self,
2065        connection_id: ConnectionId,
2066        peer: PeerId,
2067        local_addr: &Multiaddr,
2068        remote_addr: &Multiaddr,
2069    ) -> Result<THandler<Self>, ConnectionDenied> {
2070        let connected_point = ConnectedPoint::Listener {
2071            local_addr: local_addr.clone(),
2072            send_back_addr: remote_addr.clone(),
2073        };
2074        let mut handler = Handler::new(
2075            self.protocol_config.clone(),
2076            self.connection_idle_timeout,
2077            connected_point,
2078            peer,
2079            self.mode,
2080            connection_id,
2081        );
2082        self.preload_new_handler(&mut handler, connection_id, peer);
2083
2084        Ok(handler)
2085    }
2086
2087    fn handle_established_outbound_connection(
2088        &mut self,
2089        connection_id: ConnectionId,
2090        peer: PeerId,
2091        addr: &Multiaddr,
2092        role_override: Endpoint,
2093    ) -> Result<THandler<Self>, ConnectionDenied> {
2094        let connected_point = ConnectedPoint::Dialer {
2095            address: addr.clone(),
2096            role_override,
2097        };
2098        let mut handler = Handler::new(
2099            self.protocol_config.clone(),
2100            self.connection_idle_timeout,
2101            connected_point,
2102            peer,
2103            self.mode,
2104            connection_id,
2105        );
2106        self.preload_new_handler(&mut handler, connection_id, peer);
2107
2108        Ok(handler)
2109    }
2110
2111    fn handle_pending_outbound_connection(
2112        &mut self,
2113        _connection_id: ConnectionId,
2114        maybe_peer: Option<PeerId>,
2115        _addresses: &[Multiaddr],
2116        _effective_role: Endpoint,
2117    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2118        let peer_id = match maybe_peer {
2119            None => return Ok(vec![]),
2120            Some(peer) => peer,
2121        };
2122
2123        // We should order addresses by decreasing likelihood of connectivity, so start with
2124        // the addresses of that peer in the k-buckets.
2125        let key = kbucket::Key::from(peer_id);
2126        let mut peer_addrs =
2127            if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
2128                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2129                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2130                addrs
2131            } else {
2132                Vec::new()
2133            };
2134
2135        // We add to that a temporary list of addresses from the ongoing queries.
2136        for query in self.queries.iter() {
2137            if let Some(addrs) = query.inner.addresses.get(&peer_id) {
2138                peer_addrs.extend(addrs.iter().cloned())
2139            }
2140        }
2141
2142        Ok(peer_addrs)
2143    }
2144
2145    fn on_connection_handler_event(
2146        &mut self,
2147        source: PeerId,
2148        connection: ConnectionId,
2149        event: THandlerOutEvent<Self>,
2150    ) {
2151        match event {
2152            HandlerEvent::ProtocolConfirmed { endpoint } => {
2153                debug_assert!(self.connected_peers.contains(&source));
2154                // The remote's address can only be put into the routing table,
2155                // and thus shared with other nodes, if the local node is the dialer,
2156                // since the remote address on an inbound connection may be specific
2157                // to that connection (e.g. typically the TCP port numbers).
2158                let address = match endpoint {
2159                    ConnectedPoint::Dialer { address, .. } => Some(address),
2160                    ConnectedPoint::Listener { .. } => None,
2161                };
2162
2163                self.connection_updated(source, address, NodeStatus::Connected);
2164            }
2165
2166            HandlerEvent::ProtocolNotSupported { endpoint } => {
2167                let address = match endpoint {
2168                    ConnectedPoint::Dialer { address, .. } => Some(address),
2169                    ConnectedPoint::Listener { .. } => None,
2170                };
2171                self.connection_updated(source, address, NodeStatus::Disconnected);
2172            }
2173
2174            HandlerEvent::FindNodeReq { key, request_id } => {
2175                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2176
2177                self.queued_events
2178                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2179                        request: InboundRequest::FindNode {
2180                            num_closer_peers: closer_peers.len(),
2181                        },
2182                    }));
2183
2184                self.queued_events.push_back(ToSwarm::NotifyHandler {
2185                    peer_id: source,
2186                    handler: NotifyHandler::One(connection),
2187                    event: HandlerIn::FindNodeRes {
2188                        closer_peers,
2189                        request_id,
2190                    },
2191                });
2192            }
2193
2194            HandlerEvent::FindNodeRes {
2195                closer_peers,
2196                query_id,
2197            } => {
2198                self.discovered(&query_id, &source, closer_peers.iter());
2199            }
2200
2201            HandlerEvent::GetProvidersReq { key, request_id } => {
2202                let provider_peers = self.provider_peers(&key, &source);
2203                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2204
2205                self.queued_events
2206                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2207                        request: InboundRequest::GetProvider {
2208                            num_closer_peers: closer_peers.len(),
2209                            num_provider_peers: provider_peers.len(),
2210                        },
2211                    }));
2212
2213                self.queued_events.push_back(ToSwarm::NotifyHandler {
2214                    peer_id: source,
2215                    handler: NotifyHandler::One(connection),
2216                    event: HandlerIn::GetProvidersRes {
2217                        closer_peers,
2218                        provider_peers,
2219                        request_id,
2220                    },
2221                });
2222            }
2223
2224            HandlerEvent::GetProvidersRes {
2225                closer_peers,
2226                provider_peers,
2227                query_id,
2228            } => {
2229                let peers = closer_peers.iter().chain(provider_peers.iter());
2230                self.discovered(&query_id, &source, peers);
2231                if let Some(query) = self.queries.get_mut(&query_id) {
2232                    let stats = query.stats().clone();
2233                    if let QueryInfo::GetProviders {
2234                        ref key,
2235                        ref mut providers_found,
2236                        ref mut step,
2237                        ..
2238                    } = query.inner.info
2239                    {
2240                        *providers_found += provider_peers.len();
2241                        let providers = provider_peers.iter().map(|p| p.node_id).collect();
2242
2243                        self.queued_events.push_back(ToSwarm::GenerateEvent(
2244                            Event::OutboundQueryProgressed {
2245                                id: query_id,
2246                                result: QueryResult::GetProviders(Ok(
2247                                    GetProvidersOk::FoundProviders {
2248                                        key: key.clone(),
2249                                        providers,
2250                                    },
2251                                )),
2252                                step: step.clone(),
2253                                stats,
2254                            },
2255                        ));
2256                        *step = step.next();
2257                    }
2258                }
2259            }
2260
2261            HandlerEvent::QueryError { query_id, error } => {
2262                log::debug!(
2263                    "Request to {:?} in query {:?} failed with {:?}",
2264                    source,
2265                    query_id,
2266                    error
2267                );
2268                // If the query to which the error relates is still active,
2269                // signal the failure w.r.t. `source`.
2270                if let Some(query) = self.queries.get_mut(&query_id) {
2271                    query.on_failure(&source)
2272                }
2273            }
2274
2275            HandlerEvent::AddProvider { key, provider } => {
2276                // Only accept a provider record from a legitimate peer.
2277                if provider.node_id != source {
2278                    return;
2279                }
2280
2281                self.provider_received(key, provider);
2282            }
2283
2284            HandlerEvent::GetRecord { key, request_id } => {
2285                // Lookup the record locally.
2286                let record = match self.store.get(&key) {
2287                    Some(record) => {
2288                        if record.is_expired(Instant::now()) {
2289                            self.store.remove(&key);
2290                            None
2291                        } else {
2292                            Some(record.into_owned())
2293                        }
2294                    }
2295                    None => None,
2296                };
2297
2298                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2299
2300                self.queued_events
2301                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2302                        request: InboundRequest::GetRecord {
2303                            num_closer_peers: closer_peers.len(),
2304                            present_locally: record.is_some(),
2305                        },
2306                    }));
2307
2308                self.queued_events.push_back(ToSwarm::NotifyHandler {
2309                    peer_id: source,
2310                    handler: NotifyHandler::One(connection),
2311                    event: HandlerIn::GetRecordRes {
2312                        record,
2313                        closer_peers,
2314                        request_id,
2315                    },
2316                });
2317            }
2318
2319            HandlerEvent::GetRecordRes {
2320                record,
2321                closer_peers,
2322                query_id,
2323            } => {
2324                if let Some(query) = self.queries.get_mut(&query_id) {
2325                    let stats = query.stats().clone();
2326                    if let QueryInfo::GetRecord {
2327                        key,
2328                        ref mut step,
2329                        ref mut found_a_record,
2330                        cache_candidates,
2331                    } = &mut query.inner.info
2332                    {
2333                        if let Some(record) = record {
2334                            *found_a_record = true;
2335                            let record = PeerRecord {
2336                                peer: Some(source),
2337                                record,
2338                            };
2339
2340                            self.queued_events.push_back(ToSwarm::GenerateEvent(
2341                                Event::OutboundQueryProgressed {
2342                                    id: query_id,
2343                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2344                                        record,
2345                                    ))),
2346                                    step: step.clone(),
2347                                    stats,
2348                                },
2349                            ));
2350
2351                            *step = step.next();
2352                        } else {
2353                            log::trace!("Record with key {:?} not found at {}", key, source);
2354                            if let Caching::Enabled { max_peers } = self.caching {
2355                                let source_key = kbucket::Key::from(source);
2356                                let target_key = kbucket::Key::from(key.clone());
2357                                let distance = source_key.distance(&target_key);
2358                                cache_candidates.insert(distance, source);
2359                                if cache_candidates.len() > max_peers as usize {
2360                                    // TODO: `pop_last()` would be nice once stabilised.
2361                                    // See https://github.com/rust-lang/rust/issues/62924.
2362                                    let last =
2363                                        *cache_candidates.keys().next_back().expect("len > 0");
2364                                    cache_candidates.remove(&last);
2365                                }
2366                            }
2367                        }
2368                    }
2369                }
2370
2371                self.discovered(&query_id, &source, closer_peers.iter());
2372            }
2373
2374            HandlerEvent::PutRecord { record, request_id } => {
2375                self.record_received(source, connection, request_id, record);
2376            }
2377
2378            HandlerEvent::PutRecordRes { query_id, .. } => {
2379                if let Some(query) = self.queries.get_mut(&query_id) {
2380                    query.on_success(&source, vec![]);
2381                    if let QueryInfo::PutRecord {
2382                        phase: PutRecordPhase::PutRecord { success, .. },
2383                        quorum,
2384                        ..
2385                    } = &mut query.inner.info
2386                    {
2387                        success.push(source);
2388
2389                        let quorum = quorum.get();
2390                        if success.len() >= quorum {
2391                            let peers = success.clone();
2392                            let finished = query.try_finish(peers.iter());
2393                            if !finished {
2394                                debug!(
2395                                    "PutRecord query ({:?}) reached quorum ({}/{}) with response \
2396                                     from peer {} but could not yet finish.",
2397                                    query_id,
2398                                    peers.len(),
2399                                    quorum,
2400                                    source,
2401                                );
2402                            }
2403                        }
2404                    }
2405                }
2406            }
2407        };
2408    }
2409
2410    fn poll(
2411        &mut self,
2412        cx: &mut Context<'_>,
2413        _: &mut impl PollParameters,
2414    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2415        let now = Instant::now();
2416
2417        // Calculate the available capacity for queries triggered by background jobs.
2418        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
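        // Each background job below may start at most `JOBS_MAX_NEW_QUERIES` new
        // queries per `poll`, and never more than the remaining
        // `jobs_query_capacity`; e.g. if only 3 query slots remain, the provider
        // announcement job starts at most 3 queries, however large
        // `JOBS_MAX_NEW_QUERIES` is.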
2419
2420        // Run the periodic provider announcement job.
2421        if let Some(mut job) = self.add_provider_job.take() {
2422            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2423            for _ in 0..num {
2424                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2425                    self.start_add_provider(r.key, AddProviderContext::Republish)
2426                } else {
2427                    break;
2428                }
2429            }
2430            jobs_query_capacity -= num;
2431            self.add_provider_job = Some(job);
2432        }
2433
2434        // Run the periodic record replication / publication job.
2435        if let Some(mut job) = self.put_record_job.take() {
2436            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2437            for _ in 0..num {
2438                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2439                    let context =
2440                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2441                            PutRecordContext::Republish
2442                        } else {
2443                            PutRecordContext::Replicate
2444                        };
2445                    self.start_put_record(r, Quorum::All, context)
2446                } else {
2447                    break;
2448                }
2449            }
2450            self.put_record_job = Some(job);
2451        }
2452
2453        loop {
2454            // Drain queued events first.
2455            if let Some(event) = self.queued_events.pop_front() {
2456                return Poll::Ready(event);
2457            }
2458
2459            // Drain applied pending entries from the routing table.
2460            if let Some(entry) = self.kbuckets.take_applied_pending() {
2461                let kbucket::Node { key, value } = entry.inserted;
2462                let event = Event::RoutingUpdated {
2463                    bucket_range: self
2464                        .kbuckets
2465                        .bucket(&key)
2466                        .map(|b| b.range())
2467                        .expect("Self to never be applied from pending."),
2468                    peer: key.into_preimage(),
2469                    is_new_peer: true,
2470                    addresses: value,
2471                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2472                };
2473                return Poll::Ready(ToSwarm::GenerateEvent(event));
2474            }
2475
2476            // Look for a finished query.
2477            loop {
2478                match self.queries.poll(now) {
2479                    QueryPoolState::Finished(q) => {
2480                        if let Some(event) = self.query_finished(q) {
2481                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2482                        }
2483                    }
2484                    QueryPoolState::Timeout(q) => {
2485                        if let Some(event) = self.query_timeout(q) {
2486                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2487                        }
2488                    }
2489                    QueryPoolState::Waiting(Some((query, peer_id))) => {
2490                        let event = query.inner.info.to_request(query.id());
2491                        // TODO: AddProvider requests yield no response, so the query completes
2492                        // as soon as all requests have been sent. However, the handler should
2493                        // ideally emit an event when the request has been sent (and report
2494                        // an error if sending fails), instead of immediately reporting
2495                        // "success" somewhat prematurely here.
2496                        if let QueryInfo::AddProvider {
2497                            phase: AddProviderPhase::AddProvider { .. },
2498                            ..
2499                        } = &query.inner.info
2500                        {
2501                            query.on_success(&peer_id, vec![])
2502                        }
2503
2504                        if self.connected_peers.contains(&peer_id) {
2505                            self.queued_events.push_back(ToSwarm::NotifyHandler {
2506                                peer_id,
2507                                event,
2508                                handler: NotifyHandler::Any,
2509                            });
2510                        } else if &peer_id != self.kbuckets.local_key().preimage() {
2511                            query.inner.pending_rpcs.push((peer_id, event));
2512                            self.queued_events.push_back(ToSwarm::Dial {
2513                                opts: DialOpts::peer_id(peer_id)
2514                                    .condition(dial_opts::PeerCondition::NotDialing)
2515                                    .build(),
2516                            });
2517                        }
2518                    }
2519                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2520                }
2521            }
2522
2523            // No immediate event was produced as a result of a finished query.
2524            // If no new events have been queued either, signal `Poll::Pending` to
2525            // be polled again later.
2526            if self.queued_events.is_empty() {
2527                self.no_events_waker = Some(cx.waker().clone());
2528
2529                return Poll::Pending;
2530            }
2531        }
2532    }
2533
2534    fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
2535        self.listen_addresses.on_swarm_event(&event);
2536        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2537
2538        if self.auto_mode && external_addresses_changed {
2539            self.determine_mode_from_external_addresses();
2540        }
2541
2542        match event {
2543            FromSwarm::ConnectionEstablished(connection_established) => {
2544                self.on_connection_established(connection_established)
2545            }
2546            FromSwarm::ConnectionClosed(connection_closed) => {
2547                self.on_connection_closed(connection_closed)
2548            }
2549            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2550            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2551            FromSwarm::ExpiredListenAddr(_)
2552            | FromSwarm::NewExternalAddrCandidate(_)
2553            | FromSwarm::NewListenAddr(_)
2554            | FromSwarm::ListenFailure(_)
2555            | FromSwarm::NewListener(_)
2556            | FromSwarm::ListenerClosed(_)
2557            | FromSwarm::ListenerError(_)
2558            | FromSwarm::ExternalAddrExpired(_)
2559            | FromSwarm::ExternalAddrConfirmed(_) => {}
2560        }
2561    }
2562}
2563
2564/// A quorum w.r.t. the configured replication factor specifies the minimum
2565/// number of distinct nodes that must be successfully contacted in order
2566/// for a query to succeed.
2567#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2568pub enum Quorum {
2569    One,
2570    Majority,
2571    All,
2572    N(NonZeroUsize),
2573}
2574
2575impl Quorum {
2576    /// Evaluate the quorum w.r.t. a given total (number of peers).
2577    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2578        match self {
2579            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2580            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2581            Quorum::All => total,
2582            Quorum::N(n) => NonZeroUsize::min(total, *n),
2583        }
2584    }
2585}
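
// Editorial sketch (not part of the original file): an illustrative test showing how
// `Quorum::eval` maps each variant to a concrete number of peers for a given total.
// The module name is hypothetical.
#[cfg(test)]
mod quorum_eval_sketch {
    use super::Quorum;
    use std::num::NonZeroUsize;

    #[test]
    fn eval_examples() {
        let total = NonZeroUsize::new(10).expect("10 != 0");
        // `One` always requires a single successful response.
        assert_eq!(Quorum::One.eval(total).get(), 1);
        // `Majority` requires more than half of the peers: 10 / 2 + 1 = 6.
        assert_eq!(Quorum::Majority.eval(total).get(), 6);
        // `All` requires every contacted peer to respond successfully.
        assert_eq!(Quorum::All.eval(total).get(), 10);
        // `N(n)` requires `n` peers, capped at the total number of peers.
        let three = NonZeroUsize::new(3).expect("3 != 0");
        let twenty = NonZeroUsize::new(20).expect("20 != 0");
        assert_eq!(Quorum::N(three).eval(total).get(), 3);
        assert_eq!(Quorum::N(twenty).eval(total).get(), 10);
    }
}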
2586
2587/// A record either received from the given peer or retrieved from the local
2588/// record store.
2589#[derive(Debug, Clone, PartialEq, Eq)]
2590pub struct PeerRecord {
2591    /// The peer from whom the record was received. `None` if the record was
2592    /// retrieved from local storage.
2593    pub peer: Option<PeerId>,
2594    pub record: Record,
2595}
2596
2597//////////////////////////////////////////////////////////////////////////////
2598// Events
2599
2600/// The events produced by the `Kademlia` behaviour.
2601///
2602/// See [`NetworkBehaviour::poll`].
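///
/// A minimal, illustrative sketch (not from the original documentation) of consuming
/// these events from a swarm event loop; `swarm` and the `kad` module alias are
/// assumptions of the example:
///
/// ```ignore
/// use futures::StreamExt;
/// use libp2p_swarm::SwarmEvent;
///
/// while let Some(swarm_event) = swarm.next().await {
///     match swarm_event {
///         SwarmEvent::Behaviour(kad::Event::OutboundQueryProgressed { id, result, step, .. }) => {
///             // A query may report progress several times; `step.last` marks the
///             // final event for the query identified by `id`.
///             println!("query {id:?} progressed (last = {}): {result:?}", step.last);
///         }
///         SwarmEvent::Behaviour(kad::Event::RoutingUpdated { peer, .. }) => {
///             println!("routing table updated with {peer}");
///         }
///         _ => {}
///     }
/// }
/// ```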
2603#[derive(Debug, Clone)]
2604#[allow(clippy::large_enum_variant)]
2605pub enum Event {
2606    /// An inbound request has been received and handled.
2607    //
2608    // Note on the difference between 'request' and 'query': A request is a
2609    // single request-response style exchange with a single remote peer. A query
2610    // is made of multiple requests across multiple remote peers.
2611    InboundRequest { request: InboundRequest },
2612
2613    /// An outbound query has made progress.
2614    OutboundQueryProgressed {
2615        /// The ID of the query that made progress.
2616        id: QueryId,
2617        /// The intermediate result of the query.
2618        result: QueryResult,
2619        /// Execution statistics from the query.
2620        stats: QueryStats,
2621        /// Indicates which event this is, if there are multiple responses for a single query.
2622        step: ProgressStep,
2623    },
2624
2625    /// The routing table has been updated with a new peer and / or
2626    /// address, thereby possibly evicting another peer.
2627    RoutingUpdated {
2628        /// The ID of the peer that was added or updated.
2629        peer: PeerId,
2630        /// Whether this is a new peer and was thus just added to the routing
2631        /// table, or whether it is an existing peer whose addresses changed.
2632        is_new_peer: bool,
2633        /// The full list of known addresses of `peer`.
2634        addresses: Addresses,
2635        /// The minimum inclusive and maximum inclusive distance for
2636        /// the bucket of the peer.
2637        bucket_range: (Distance, Distance),
2638        /// The ID of the peer that was evicted from the routing table to make
2639        /// room for the new peer, if any.
2640        old_peer: Option<PeerId>,
2641    },
2642
2643    /// A peer has connected for whom no listen address is known.
2644    ///
2645    /// If the peer is to be added to the routing table, a known
2646    /// listen address for the peer must be provided via [`Behaviour::add_address`].
2647    UnroutablePeer { peer: PeerId },
2648
2649    /// A connection to a peer has been established for whom a listen address
2650    /// is known but the peer has not been added to the routing table either
2651    /// because [`BucketInserts::Manual`] is configured or because
2652    /// the corresponding bucket is full.
2653    ///
2654    /// If the peer is to be included in the routing table, it must
2655    /// be explicitly added via [`Behaviour::add_address`], possibly after
2656    /// removing another peer.
2657    ///
2658    /// See [`Behaviour::kbucket`] for insight into the contents of
2659    /// the k-bucket of `peer`.
2660    RoutablePeer { peer: PeerId, address: Multiaddr },
2661
2662    /// A connection to a peer has been established for whom a listen address
2663    /// is known but the peer is only pending insertion into the routing table
2664    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2665    /// may not make it into the routing table.
2666    ///
2667    /// If the peer is to be unconditionally included in the routing table,
2668    /// it should be explicitly added via [`Behaviour::add_address`] after
2669    /// removing another peer.
2670    ///
2671    /// See [`Behaviour::kbucket`] for insight into the contents of
2672    /// the k-bucket of `peer`.
2673    PendingRoutablePeer { peer: PeerId, address: Multiaddr },
2674}
2675
2676/// Information about progress events.
2677#[derive(Debug, Clone)]
2678pub struct ProgressStep {
2679    /// The index of this event within the sequence of progress events for a query.
2680    pub count: NonZeroUsize,
2681    /// Is this the final event?
2682    pub last: bool,
2683}
2684
2685impl ProgressStep {
2686    fn first() -> Self {
2687        Self {
2688            count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
2689            last: false,
2690        }
2691    }
2692
2693    fn first_and_last() -> Self {
2694        let mut first = ProgressStep::first();
2695        first.last = true;
2696        first
2697    }
2698
2699    fn next(&self) -> Self {
2700        assert!(!self.last);
2701        let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
2702
2703        Self { count, last: false }
2704    }
2705}
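
// Editorial sketch (not part of the original file): a small test illustrating that
// progress steps are 1-based and that `next` advances the count. The module name is
// hypothetical.
#[cfg(test)]
mod progress_step_sketch {
    use super::ProgressStep;

    #[test]
    fn steps_are_one_based_and_monotonic() {
        let first = ProgressStep::first();
        assert_eq!(first.count.get(), 1);
        assert!(!first.last);

        // A subsequent progress event carries the next count.
        let second = first.next();
        assert_eq!(second.count.get(), 2);

        // A query that yields exactly one event is both first and last.
        let only = ProgressStep::first_and_last();
        assert!(only.last);
    }
}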
2706
2707/// Information about a received and handled inbound request.
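///
/// An illustrative sketch (not from the original documentation): when
/// [`StoreInserts::FilterBoth`] is configured, `PutRecord` and `AddProvider` requests
/// carry the record and are *not* stored automatically, so the caller is expected to
/// validate and store them explicitly. `behaviour` and `event` below are assumptions
/// of the example:
///
/// ```ignore
/// use libp2p_kad::{Event, InboundRequest};
/// use libp2p_kad::store::RecordStore;
///
/// if let Event::InboundRequest { request } = event {
///     match request {
///         InboundRequest::PutRecord { record: Some(record), .. } => {
///             // Apply application-specific validation before persisting.
///             behaviour.store_mut().put(record).expect("store accepts record");
///         }
///         InboundRequest::AddProvider { record: Some(provider_record) } => {
///             behaviour.store_mut().add_provider(provider_record).expect("store accepts provider");
///         }
///         _ => {}
///     }
/// }
/// ```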
2708#[derive(Debug, Clone)]
2709pub enum InboundRequest {
2710    /// Request for the list of nodes whose IDs are the closest to `key`.
2711    FindNode { num_closer_peers: usize },
2712    /// Same as `FindNode`, but should also return the entries of the local
2713    /// providers list for this key.
2714    GetProvider {
2715        num_closer_peers: usize,
2716        num_provider_peers: usize,
2717    },
2718    /// A peer sent an add provider request.
2719    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
2720    /// included.
2721    ///
2722    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
2723    AddProvider { record: Option<ProviderRecord> },
2724    /// Request to retrieve a record.
2725    GetRecord {
2726        num_closer_peers: usize,
2727        present_locally: bool,
2728    },
2729    /// A peer sent a put record request.
2730    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
2731    ///
2732    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
2733    PutRecord {
2734        source: PeerId,
2735        connection: ConnectionId,
2736        record: Option<Record>,
2737    },
2738}
2739
2740/// The results of Kademlia queries.
2741#[derive(Debug, Clone)]
2742pub enum QueryResult {
2743    /// The result of [`Behaviour::bootstrap`].
2744    Bootstrap(BootstrapResult),
2745
2746    /// The result of [`Behaviour::get_closest_peers`].
2747    GetClosestPeers(GetClosestPeersResult),
2748
2749    /// The result of [`Behaviour::get_providers`].
2750    GetProviders(GetProvidersResult),
2751
2752    /// The result of [`Behaviour::start_providing`].
2753    StartProviding(AddProviderResult),
2754
2755    /// The result of an (automatic) republishing of a provider record.
2756    RepublishProvider(AddProviderResult),
2757
2758    /// The result of [`Behaviour::get_record`].
2759    GetRecord(GetRecordResult),
2760
2761    /// The result of [`Behaviour::put_record`].
2762    PutRecord(PutRecordResult),
2763
2764    /// The result of an (automatic) republishing of a (value-)record.
2765    RepublishRecord(PutRecordResult),
2766}
2767
2768/// The result of [`Behaviour::get_record`].
2769pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2770
2771/// The successful result of [`Behaviour::get_record`].
2772#[derive(Debug, Clone)]
2773pub enum GetRecordOk {
2774    FoundRecord(PeerRecord),
2775    FinishedWithNoAdditionalRecord {
2776        /// If caching is enabled, these are the peers closest
2777        /// _to the record key_ (not the local node) that were queried but
2778        /// did not return the record, sorted by distance to the record key
2779        /// from closest to farthest. How many of these are tracked is configured
2780        /// by [`Config::set_caching`].
2781        ///
2782        /// Writing the record back to the cache at these peers is a manual operation,
2783        /// i.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
2784        /// after selecting one of the returned records.
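        ///
        /// An illustrative write-back sketch (not from the original documentation);
        /// `behaviour` and `record` are assumptions of the example:
        ///
        /// ```ignore
        /// // Replicate the chosen record to the peers that missed it, requiring
        /// // only a single successful response.
        /// behaviour.put_record_to(record, cache_candidates.into_values(), Quorum::One);
        /// ```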
2785        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
2786    },
2787}
2788
2789/// The error result of [`Behaviour::get_record`].
2790#[derive(Debug, Clone, Error)]
2791pub enum GetRecordError {
2792    #[error("the record was not found")]
2793    NotFound {
2794        key: record_priv::Key,
2795        closest_peers: Vec<PeerId>,
2796    },
2797    #[error("the quorum failed; needed {quorum} peers")]
2798    QuorumFailed {
2799        key: record_priv::Key,
2800        records: Vec<PeerRecord>,
2801        quorum: NonZeroUsize,
2802    },
2803    #[error("the request timed out")]
2804    Timeout { key: record_priv::Key },
2805}
2806
2807impl GetRecordError {
2808    /// Gets the key of the record for which the operation failed.
2809    pub fn key(&self) -> &record_priv::Key {
2810        match self {
2811            GetRecordError::QuorumFailed { key, .. } => key,
2812            GetRecordError::Timeout { key, .. } => key,
2813            GetRecordError::NotFound { key, .. } => key,
2814        }
2815    }
2816
2817    /// Extracts the key of the record for which the operation failed,
2818    /// consuming the error.
2819    pub fn into_key(self) -> record_priv::Key {
2820        match self {
2821            GetRecordError::QuorumFailed { key, .. } => key,
2822            GetRecordError::Timeout { key, .. } => key,
2823            GetRecordError::NotFound { key, .. } => key,
2824        }
2825    }
2826}
2827
2828/// The result of [`Behaviour::put_record`].
2829pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2830
2831/// The successful result of [`Behaviour::put_record`].
2832#[derive(Debug, Clone)]
2833pub struct PutRecordOk {
2834    pub key: record_priv::Key,
2835}
2836
2837/// The error result of [`Behaviour::put_record`].
2838#[derive(Debug, Clone, Error)]
2839pub enum PutRecordError {
2840    #[error("the quorum failed; needed {quorum} peers")]
2841    QuorumFailed {
2842        key: record_priv::Key,
2843        /// [`PeerId`]s of the peers the record was successfully stored on.
2844        success: Vec<PeerId>,
2845        quorum: NonZeroUsize,
2846    },
2847    #[error("the request timed out")]
2848    Timeout {
2849        key: record_priv::Key,
2850        /// [`PeerId`]s of the peers the record was successfully stored on.
2851        success: Vec<PeerId>,
2852        quorum: NonZeroUsize,
2853    },
2854}
2855
2856impl PutRecordError {
2857    /// Gets the key of the record for which the operation failed.
2858    pub fn key(&self) -> &record_priv::Key {
2859        match self {
2860            PutRecordError::QuorumFailed { key, .. } => key,
2861            PutRecordError::Timeout { key, .. } => key,
2862        }
2863    }
2864
2865    /// Extracts the key of the record for which the operation failed,
2866    /// consuming the error.
2867    pub fn into_key(self) -> record_priv::Key {
2868        match self {
2869            PutRecordError::QuorumFailed { key, .. } => key,
2870            PutRecordError::Timeout { key, .. } => key,
2871        }
2872    }
2873}
2874
2875/// The result of [`Behaviour::bootstrap`].
2876pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
2877
2878/// The successful result of [`Behaviour::bootstrap`].
2879#[derive(Debug, Clone)]
2880pub struct BootstrapOk {
2881    pub peer: PeerId,
2882    pub num_remaining: u32,
2883}
2884
2885/// The error result of [`Behaviour::bootstrap`].
2886#[derive(Debug, Clone, Error)]
2887pub enum BootstrapError {
2888    #[error("the request timed out")]
2889    Timeout {
2890        peer: PeerId,
2891        num_remaining: Option<u32>,
2892    },
2893}
2894
2895/// The result of [`Behaviour::get_closest_peers`].
2896pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
2897
2898/// The successful result of [`Behaviour::get_closest_peers`].
2899#[derive(Debug, Clone)]
2900pub struct GetClosestPeersOk {
2901    pub key: Vec<u8>,
2902    pub peers: Vec<PeerId>,
2903}
2904
2905/// The error result of [`Behaviour::get_closest_peers`].
2906#[derive(Debug, Clone, Error)]
2907pub enum GetClosestPeersError {
2908    #[error("the request timed out")]
2909    Timeout { key: Vec<u8>, peers: Vec<PeerId> },
2910}
2911
2912impl GetClosestPeersError {
2913    /// Gets the key for which the operation failed.
2914    pub fn key(&self) -> &Vec<u8> {
2915        match self {
2916            GetClosestPeersError::Timeout { key, .. } => key,
2917        }
2918    }
2919
2920    /// Extracts the key for which the operation failed,
2921    /// consuming the error.
2922    pub fn into_key(self) -> Vec<u8> {
2923        match self {
2924            GetClosestPeersError::Timeout { key, .. } => key,
2925        }
2926    }
2927}
2928
2929/// The result of [`Behaviour::get_providers`].
2930pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
2931
2932/// The successful result of [`Behaviour::get_providers`].
2933#[derive(Debug, Clone)]
2934pub enum GetProvidersOk {
2935    FoundProviders {
2936        key: record_priv::Key,
2937        /// The new set of providers discovered.
2938        providers: HashSet<PeerId>,
2939    },
2940    FinishedWithNoAdditionalRecord {
2941        closest_peers: Vec<PeerId>,
2942    },
2943}
2944
2945/// The error result of [`Behaviour::get_providers`].
2946#[derive(Debug, Clone, Error)]
2947pub enum GetProvidersError {
2948    #[error("the request timed out")]
2949    Timeout {
2950        key: record_priv::Key,
2951        closest_peers: Vec<PeerId>,
2952    },
2953}
2954
2955impl GetProvidersError {
2956    /// Gets the key for which the operation failed.
2957    pub fn key(&self) -> &record_priv::Key {
2958        match self {
2959            GetProvidersError::Timeout { key, .. } => key,
2960        }
2961    }
2962
2963    /// Extracts the key for which the operation failed,
2964    /// consuming the error.
2965    pub fn into_key(self) -> record_priv::Key {
2966        match self {
2967            GetProvidersError::Timeout { key, .. } => key,
2968        }
2969    }
2970}
2971
2972/// The result of publishing a provider record.
2973pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
2974
2975/// The successful result of publishing a provider record.
2976#[derive(Debug, Clone)]
2977pub struct AddProviderOk {
2978    pub key: record_priv::Key,
2979}
2980
2981/// The possible errors when publishing a provider record.
2982#[derive(Debug, Clone, Error)]
2983pub enum AddProviderError {
2984    #[error("the request timed out")]
2985    Timeout { key: record_priv::Key },
2986}
2987
2988impl AddProviderError {
2989    /// Gets the key for which the operation failed.
2990    pub fn key(&self) -> &record_priv::Key {
2991        match self {
2992            AddProviderError::Timeout { key, .. } => key,
2993        }
2994    }
2995
2996    /// Extracts the key for which the operation failed, consuming the error.
2997    pub fn into_key(self) -> record_priv::Key {
2998        match self {
2999            AddProviderError::Timeout { key, .. } => key,
3000        }
3001    }
3002}
3003
3004impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3005    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3006        KadPeer {
3007            node_id: e.node.key.into_preimage(),
3008            multiaddrs: e.node.value.into_vec(),
3009            connection_ty: match e.status {
3010                NodeStatus::Connected => ConnectionType::Connected,
3011                NodeStatus::Disconnected => ConnectionType::NotConnected,
3012            },
3013        }
3014    }
3015}
3016
3017//////////////////////////////////////////////////////////////////////////////
3018// Internal query state
3019
3020struct QueryInner {
3021    /// The query-specific state.
3022    info: QueryInfo,
3023    /// Addresses of peers discovered during a query.
3024    addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
3025    /// A map of pending requests to peers.
3026    ///
3027    /// A request is pending if the targeted peer is not currently connected;
3028    /// pending requests are sent as soon as a connection to the peer is established.
3029    pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
3030}
3031
3032impl QueryInner {
3033    fn new(info: QueryInfo) -> Self {
3034        QueryInner {
3035            info,
3036            addresses: Default::default(),
3037            pending_rpcs: SmallVec::default(),
3038        }
3039    }
3040}
3041
3042/// The context of a [`QueryInfo::AddProvider`] query.
3043#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3044pub enum AddProviderContext {
3045    /// The context is a [`Behaviour::start_providing`] operation.
3046    Publish,
3047    /// The context is periodic republishing of provider announcements
3048    /// initiated earlier via [`Behaviour::start_providing`].
3049    Republish,
3050}
3051
3052/// The context of a [`QueryInfo::PutRecord`] query.
3053#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3054pub enum PutRecordContext {
3055    /// The context is a [`Behaviour::put_record`] operation.
3056    Publish,
3057    /// The context is periodic republishing of records stored
3058    /// earlier via [`Behaviour::put_record`].
3059    Republish,
3060    /// The context is periodic replication (i.e. without extending
3061    /// the record TTL) of stored records received earlier from another peer.
3062    Replicate,
3063    /// The context is a custom store operation targeting specific
3064    /// peers initiated by [`Behaviour::put_record_to`].
3065    Custom,
3066}
3067
3068/// Information about a running query.
3069#[derive(Debug, Clone)]
3070pub enum QueryInfo {
3071    /// A query initiated by [`Behaviour::bootstrap`].
3072    Bootstrap {
3073        /// The targeted peer ID.
3074        peer: PeerId,
3075        /// The remaining random peer IDs to query, one per
3076        /// bucket that still needs refreshing.
3077        ///
3078        /// This is `None` if the initial self-lookup has not
3079        /// yet completed and `Some` with an exhausted iterator
3080        /// if bootstrapping is complete.
3081        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
3082        step: ProgressStep,
3083    },
3084
3085    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
3086    GetClosestPeers {
3087        /// The key being queried (the preimage).
3088        key: Vec<u8>,
3089        /// Current index of events.
3090        step: ProgressStep,
3091    },
3092
3093    /// A (repeated) query initiated by [`Behaviour::get_providers`].
3094    GetProviders {
3095        /// The key for which to search for providers.
3096        key: record_priv::Key,
3097        /// The number of providers found so far.
3098        providers_found: usize,
3099        /// Current index of events.
3100        step: ProgressStep,
3101    },
3102
3103    /// A (repeated) query initiated by [`Behaviour::start_providing`].
3104    AddProvider {
3105        /// The record key.
3106        key: record_priv::Key,
3107        /// The current phase of the query.
3108        phase: AddProviderPhase,
3109        /// The execution context of the query.
3110        context: AddProviderContext,
3111    },
3112
3113    /// A (repeated) query initiated by [`Behaviour::put_record`].
3114    PutRecord {
3115        record: Record,
3116        /// The expected quorum of responses w.r.t. the replication factor.
3117        quorum: NonZeroUsize,
3118        /// The current phase of the query.
3119        phase: PutRecordPhase,
3120        /// The execution context of the query.
3121        context: PutRecordContext,
3122    },
3123
3124    /// A (repeated) query initiated by [`Behaviour::get_record`].
3125    GetRecord {
3126        /// The key to look for.
3127        key: record_priv::Key,
3128        /// Current index of events.
3129        step: ProgressStep,
3130        /// Did we find at least one record?
3131        found_a_record: bool,
3132        /// The peers closest to the `key` that were queried but did not return a record,
3133        /// i.e. the peers that are candidates for caching the record.
3134        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
3135    },
3136}
3137
3138impl QueryInfo {
3139    /// Creates an event for a handler to issue an outgoing request in the
3140    /// context of a query.
3141    fn to_request(&self, query_id: QueryId) -> HandlerIn {
3142        match &self {
3143            QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3144                key: peer.to_bytes(),
3145                query_id,
3146            },
3147            QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3148                key: key.clone(),
3149                query_id,
3150            },
3151            QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3152                key: key.clone(),
3153                query_id,
3154            },
3155            QueryInfo::AddProvider { key, phase, .. } => match phase {
3156                AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3157                    key: key.to_vec(),
3158                    query_id,
3159                },
3160                AddProviderPhase::AddProvider {
3161                    provider_id,
3162                    external_addresses,
3163                    ..
3164                } => HandlerIn::AddProvider {
3165                    key: key.clone(),
3166                    provider: crate::protocol::KadPeer {
3167                        node_id: *provider_id,
3168                        multiaddrs: external_addresses.clone(),
3169                        connection_ty: crate::protocol::ConnectionType::Connected,
3170                    },
3171                },
3172            },
3173            QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3174                key: key.clone(),
3175                query_id,
3176            },
3177            QueryInfo::PutRecord { record, phase, .. } => match phase {
3178                PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3179                    key: record.key.to_vec(),
3180                    query_id,
3181                },
3182                PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3183                    record: record.clone(),
3184                    query_id,
3185                },
3186            },
3187        }
3188    }
3189}
3190
3191/// The phases of a [`QueryInfo::AddProvider`] query.
3192#[derive(Debug, Clone)]
3193pub enum AddProviderPhase {
3194    /// The query is searching for the closest nodes to the record key.
3195    GetClosestPeers,
3196
3197    /// The query advertises the local node as a provider for the key to
3198    /// the closest nodes to the key.
3199    AddProvider {
3200        /// The local peer ID that is advertised as a provider.
3201        provider_id: PeerId,
3202        /// The external addresses of the provider being advertised.
3203        external_addresses: Vec<Multiaddr>,
3204        /// Query statistics from the finished `GetClosestPeers` phase.
3205        get_closest_peers_stats: QueryStats,
3206    },
3207}
3208
3209/// The phases of a [`QueryInfo::PutRecord`] query.
3210#[derive(Debug, Clone, PartialEq, Eq)]
3211pub enum PutRecordPhase {
3212    /// The query is searching for the closest nodes to the record key.
3213    GetClosestPeers,
3214
3215    /// The query is replicating the record to the closest nodes to the key.
3216    PutRecord {
3217        /// A list of peers the given record has been successfully replicated to.
3218        success: Vec<PeerId>,
3219        /// Query statistics from the finished `GetClosestPeers` phase.
3220        get_closest_peers_stats: QueryStats,
3221    },
3222}
3223
3224/// A mutable reference to a running query.
3225pub struct QueryMut<'a> {
3226    query: &'a mut Query<QueryInner>,
3227}
3228
3229impl<'a> QueryMut<'a> {
3230    pub fn id(&self) -> QueryId {
3231        self.query.id()
3232    }
3233
3234    /// Gets information about the type and state of the query.
3235    pub fn info(&self) -> &QueryInfo {
3236        &self.query.inner.info
3237    }
3238
3239    /// Gets execution statistics about the query.
3240    ///
3241    /// For a multi-phase query such as `put_record`, these are the
3242    /// statistics of the current phase.
3243    pub fn stats(&self) -> &QueryStats {
3244        self.query.stats()
3245    }
3246
3247    /// Finishes the query as soon as possible, without waiting for the
3248    /// regular termination conditions.
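    ///
    /// An illustrative sketch (not from the original documentation) of stopping a
    /// running query early; `behaviour` and `query_id` are assumptions of the example:
    ///
    /// ```ignore
    /// if let Some(mut query) = behaviour.query_mut(&query_id) {
    ///     // Emit the final result now instead of waiting for the query to converge.
    ///     query.finish();
    /// }
    /// ```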
3249    pub fn finish(&mut self) {
3250        self.query.finish()
3251    }
3252}
3253
3254/// An immutable reference to a running query.
3255pub struct QueryRef<'a> {
3256    query: &'a Query<QueryInner>,
3257}
3258
3259impl<'a> QueryRef<'a> {
3260    pub fn id(&self) -> QueryId {
3261        self.query.id()
3262    }
3263
3264    /// Gets information about the type and state of the query.
3265    pub fn info(&self) -> &QueryInfo {
3266        &self.query.inner.info
3267    }
3268
3269    /// Gets execution statistics about the query.
3270    ///
3271    /// For a multi-phase query such as `put_record`, these are the
3272    /// statistics of the current phase.
3273    pub fn stats(&self) -> &QueryStats {
3274        self.query.stats()
3275    }
3276}
3277
3278/// An operation failed due to no known peers in the routing table.
3279#[derive(Debug, Clone)]
3280pub struct NoKnownPeers();
3281
3282impl fmt::Display for NoKnownPeers {
3283    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3284        write!(f, "No known peers.")
3285    }
3286}
3287
3288impl std::error::Error for NoKnownPeers {}
3289
3290/// The possible outcomes of [`Behaviour::add_address`].
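///
/// A hedged usage sketch (not from the original documentation); `behaviour`, `peer_id`
/// and `addr` are assumptions of the example:
///
/// ```ignore
/// use libp2p_kad::RoutingUpdate;
///
/// match behaviour.add_address(&peer_id, addr) {
///     RoutingUpdate::Success => println!("{peer_id} inserted into the routing table"),
///     RoutingUpdate::Pending => println!("{peer_id} pending insertion"),
///     RoutingUpdate::Failed => println!("{peer_id} could not be inserted"),
/// }
/// ```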
3291#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3292pub enum RoutingUpdate {
3293    /// The given peer and address have been added to the routing
3294    /// table.
3295    Success,
3296    /// The peer and address are pending insertion into
3297    /// the routing table, if a disconnected peer fails
3298    /// to respond. If the given peer and address end up
3299    /// in the routing table, [`Event::RoutingUpdated`]
3300    /// is eventually emitted.
3301    Pending,
3302    /// The routing table update failed, either because the
3303    /// corresponding bucket for the peer is full and the
3304    /// pending slot(s) are occupied, or because the given
3305    /// peer ID is deemed invalid (e.g. refers to the local
3306    /// peer ID).
3307    Failed,
3308}
3309
3310#[derive(PartialEq, Copy, Clone, Debug)]
3311pub enum Mode {
3312    Client,
3313    Server,
3314}
3315
3316impl fmt::Display for Mode {
3317    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3318        match self {
3319            Mode::Client => write!(f, "client"),
3320            Mode::Server => write!(f, "server"),
3321        }
3322    }
3323}
3324
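/// Renders a slice of displayable items (e.g. confirmed external addresses) as a
/// comma-separated string, primarily for log messages.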
3325fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
3326where
3327    T: ToString,
3328{
3329    confirmed_external_addresses
3330        .iter()
3331        .map(|addr| addr.to_string())
3332        .collect::<Vec<_>>()
3333        .join(", ")
3334}