libp2p_kad/behaviour.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use crate::addresses::Addresses;
26use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
27use crate::kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus};
28use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
29use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState};
30use crate::record::{
31 self,
32 store::{self, RecordStore},
33 ProviderRecord, Record,
34};
35use crate::{bootstrap, K_VALUE};
36use crate::{jobs::*, protocol};
37use fnv::FnvHashSet;
38use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
39use libp2p_identity::PeerId;
40use libp2p_swarm::behaviour::{
41 AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
42};
43use libp2p_swarm::{
44 dial_opts::{self, DialOpts},
45 ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
46 ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
47 THandlerOutEvent, ToSwarm,
48};
49use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
50use std::fmt;
51use std::num::NonZeroUsize;
52use std::task::{Context, Poll, Waker};
53use std::time::Duration;
54use std::vec;
55use thiserror::Error;
56use tracing::Level;
57use web_time::Instant;
58
59pub use crate::query::QueryStats;
60
61/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
62/// Kademlia protocol.
63pub struct Behaviour<TStore> {
64 /// The Kademlia routing table.
65 kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
66
67 /// The k-bucket insertion strategy.
68 kbucket_inserts: BucketInserts,
69
70 /// Configuration of the wire protocol.
71 protocol_config: ProtocolConfig,
72
73 /// Configuration of [`RecordStore`] filtering.
74 record_filtering: StoreInserts,
75
76 /// The currently active (i.e. in-progress) queries.
77 queries: QueryPool,
78
79 /// The currently connected peers.
80 ///
81 /// This is a superset of the connected peers currently in the routing table.
82 connected_peers: FnvHashSet<PeerId>,
83
84 /// Periodic job for re-publication of provider records for keys
85 /// provided by the local node.
86 add_provider_job: Option<AddProviderJob>,
87
88 /// Periodic job for (re-)replication and (re-)publishing of
89 /// regular (value-)records.
90 put_record_job: Option<PutRecordJob>,
91
92 /// The TTL of regular (value-)records.
93 record_ttl: Option<Duration>,
94
95 /// The TTL of provider records.
96 provider_record_ttl: Option<Duration>,
97
98 /// Queued events to return when the behaviour is being polled.
99 queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,
100
101 listen_addresses: ListenAddresses,
102
103 external_addresses: ExternalAddresses,
104
105 connections: HashMap<ConnectionId, PeerId>,
106
107 /// See [`Config::caching`].
108 caching: Caching,
109
110 local_peer_id: PeerId,
111
112 mode: Mode,
113 auto_mode: bool,
114 no_events_waker: Option<Waker>,
115
116 /// The record storage.
117 store: TStore,
118
119 /// Tracks the status of the current bootstrap.
120 bootstrap_status: bootstrap::Status,
121}
122
123/// The configurable strategies for the insertion of peers
124/// and their addresses into the k-buckets of the Kademlia
125/// routing table.
126#[derive(Copy, Clone, Debug, PartialEq, Eq)]
127pub enum BucketInserts {
128 /// Whenever a connection to a peer is established as a
129 /// result of a dialing attempt and that peer is not yet
130 /// in the routing table, it is inserted as long as there
131 /// is a free slot in the corresponding k-bucket. If the
132 /// k-bucket is full but still has a free pending slot,
133 /// it may be inserted into the routing table at a later time if an unresponsive
134 /// disconnected peer is evicted from the bucket.
135 OnConnected,
136 /// New peers and addresses are only added to the routing table via
137 /// explicit calls to [`Behaviour::add_address`].
138 ///
139 /// > **Note**: Even though peers can only get into the
140 /// > routing table as a result of [`Behaviour::add_address`],
141 /// > routing table entries are still updated as peers
142 /// > connect and disconnect (i.e. the order of the entries
143 /// > as well as the network addresses).
144 Manual,
145}
146
147/// The configurable filtering strategies for the acceptance of
148/// incoming records.
149///
150/// This can be used for e.g. signature verification or validating
151/// the accompanying [`Key`].
152///
153/// [`Key`]: crate::record::Key
154#[derive(Copy, Clone, Debug, PartialEq, Eq)]
155pub enum StoreInserts {
156 /// Whenever a (provider) record is received,
157 /// the record is forwarded immediately to the [`RecordStore`].
158 Unfiltered,
159 /// Whenever a (provider) record is received, an event is emitted.
160 /// Provider records generate a [`InboundRequest::AddProvider`] under [`Event::InboundRequest`],
161 /// normal records generate a [`InboundRequest::PutRecord`] under [`Event::InboundRequest`].
162 ///
163 /// When deemed valid, a (provider) record needs to be explicitly stored in
164 /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
165 /// whichever is applicable. A mutable reference to the [`RecordStore`] can
166 /// be retrieved via [`Behaviour::store_mut`].
167 FilterBoth,
168}
169
170/// The configuration for the `Kademlia` behaviour.
171///
172/// The configuration is consumed by [`Behaviour::new`].
173#[derive(Debug, Clone)]
174pub struct Config {
175 kbucket_config: KBucketConfig,
176 query_config: QueryConfig,
177 protocol_config: ProtocolConfig,
178 record_ttl: Option<Duration>,
179 record_replication_interval: Option<Duration>,
180 record_publication_interval: Option<Duration>,
181 record_filtering: StoreInserts,
182 provider_record_ttl: Option<Duration>,
183 provider_publication_interval: Option<Duration>,
184 kbucket_inserts: BucketInserts,
185 caching: Caching,
186 periodic_bootstrap_interval: Option<Duration>,
187 automatic_bootstrap_throttle: Option<Duration>,
188}
189
190impl Default for Config {
191 /// Returns the default configuration.
192 ///
193 /// Deprecated: use `Config::new` instead.
194 fn default() -> Self {
195 Self::new(protocol::DEFAULT_PROTO_NAME)
196 }
197}
198
199/// The configuration for Kademlia "write-back" caching after successful
200/// lookups via [`Behaviour::get_record`].
201#[derive(Debug, Clone)]
202pub enum Caching {
203 /// Caching is disabled and the peers closest to records being looked up
204 /// that do not return a record are not tracked, i.e.
205 /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
206 Disabled,
207 /// Up to `max_peers` peers not returning a record that are closest to the key
208 /// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`].
209 /// The write-back operation must be performed explicitly, if
210 /// desired and after choosing a record from the results, via [`Behaviour::put_record_to`].
211 Enabled { max_peers: u16 },
212}
213
214impl Config {
215 /// Builds a new `Config` with the given protocol name.
216 pub fn new(protocol_name: StreamProtocol) -> Self {
217 Config {
218 kbucket_config: KBucketConfig::default(),
219 query_config: QueryConfig::default(),
220 protocol_config: ProtocolConfig::new(protocol_name),
221 record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
222 record_replication_interval: Some(Duration::from_secs(60 * 60)),
223 record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
224 record_filtering: StoreInserts::Unfiltered,
225 provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
226 provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
227 kbucket_inserts: BucketInserts::OnConnected,
228 caching: Caching::Enabled { max_peers: 1 },
229 periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
230 automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
231 }
232 }
233
234 /// Returns the default configuration.
235 #[deprecated(note = "Use `Config::new` instead")]
236 #[allow(clippy::should_implement_trait)]
237 pub fn default() -> Self {
238 Default::default()
239 }
240
241 /// Sets custom protocol names.
242 ///
243 /// Kademlia nodes only communicate with other nodes using the same protocol
244 /// name. Using custom name(s) therefore allows to segregate the DHT from
245 /// others, if that is desired.
246 ///
247 /// More than one protocol name can be supplied. In this case the node will
248 /// be able to talk to other nodes supporting any of the provided names.
249 /// Multiple names must be used with caution to avoid network partitioning.
250 #[deprecated(note = "Use `Config::new` instead")]
251 #[allow(deprecated)]
252 pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
253 self.protocol_config.set_protocol_names(names);
254 self
255 }
256
257 /// Sets the timeout for a single query.
258 ///
259 /// > **Note**: A single query usually comprises at least as many requests
260 /// > as the replication factor, i.e. this is not a request timeout.
261 ///
262 /// The default is 60 seconds.
263 pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
264 self.query_config.timeout = timeout;
265 self
266 }
267
268 /// Sets the replication factor to use.
269 ///
270 /// The replication factor determines to how many closest peers
271 /// a record is replicated. The default is [`crate::K_VALUE`].
272 pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
273 self.query_config.replication_factor = replication_factor;
274 self
275 }
276
277 /// Sets the allowed level of parallelism for iterative queries.
278 ///
279 /// The `α` parameter in the Kademlia paper. The maximum number of peers
280 /// that an iterative query is allowed to wait for in parallel while
281 /// iterating towards the closest nodes to a target. Defaults to
282 /// `ALPHA_VALUE`.
283 ///
284 /// This only controls the level of parallelism of an iterative query, not
285 /// the level of parallelism of a query to a fixed set of peers.
286 ///
287 /// When used with [`Config::disjoint_query_paths`] it equals
288 /// the amount of disjoint paths used.
289 pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
290 self.query_config.parallelism = parallelism;
291 self
292 }
293
294 /// Require iterative queries to use disjoint paths for increased resiliency
295 /// in the presence of potentially adversarial nodes.
296 ///
297 /// When enabled the number of disjoint paths used equals the configured
298 /// parallelism.
299 ///
300 /// See the S/Kademlia paper for more information on the high level design
301 /// as well as its security improvements.
302 pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
303 self.query_config.disjoint_query_paths = enabled;
304 self
305 }
306
307 /// Sets the TTL for stored records.
308 ///
309 /// The TTL should be significantly longer than the (re-)publication
310 /// interval, to avoid premature expiration of records. The default is 36
311 /// hours.
312 ///
313 /// `None` means records never expire.
314 ///
315 /// Does not apply to provider records.
316 pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
317 self.record_ttl = record_ttl;
318 self
319 }
320
321 /// Sets whether or not records should be filtered before being stored.
322 ///
323 /// See [`StoreInserts`] for the different values.
324 /// Defaults to [`StoreInserts::Unfiltered`].
325 pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
326 self.record_filtering = filtering;
327 self
328 }
329
330 /// Sets the (re-)replication interval for stored records.
331 ///
332 /// Periodic replication of stored records ensures that the records
333 /// are always replicated to the available nodes closest to the key in the
334 /// context of DHT topology changes (i.e. nodes joining and leaving), thus
335 /// ensuring persistence until the record expires. Replication does not
336 /// prolong the regular lifetime of a record (for otherwise it would live
337 /// forever regardless of the configured TTL). The expiry of a record
338 /// is only extended through re-publication.
339 ///
340 /// This interval should be significantly shorter than the publication
341 /// interval, to ensure persistence between re-publications. The default
342 /// is 1 hour.
343 ///
344 /// `None` means that stored records are never re-replicated.
345 ///
346 /// Does not apply to provider records.
347 pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
348 self.record_replication_interval = interval;
349 self
350 }
351
352 /// Sets the (re-)publication interval of stored records.
353 ///
354 /// Records persist in the DHT until they expire. By default, published
355 /// records are re-published in regular intervals for as long as the record
356 /// exists in the local storage of the original publisher, thereby extending
357 /// the records lifetime.
358 ///
359 /// This interval should be significantly shorter than the record TTL, to
360 /// ensure records do not expire prematurely. The default is 24 hours.
361 ///
362 /// `None` means that stored records are never automatically re-published.
363 ///
364 /// Does not apply to provider records.
365 pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
366 self.record_publication_interval = interval;
367 self
368 }
369
370 /// Sets the TTL for provider records.
371 ///
372 /// `None` means that stored provider records never expire.
373 ///
374 /// Must be significantly larger than the provider publication interval.
375 pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
376 self.provider_record_ttl = ttl;
377 self
378 }
379
380 /// Sets the interval at which provider records for keys provided
381 /// by the local node are re-published.
382 ///
383 /// `None` means that stored provider records are never automatically
384 /// re-published.
385 ///
386 /// Must be significantly less than the provider record TTL.
387 pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
388 self.provider_publication_interval = interval;
389 self
390 }
391
392 /// Modifies the maximum allowed size of individual Kademlia packets.
393 ///
394 /// It might be necessary to increase this value if trying to put large
395 /// records.
396 pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
397 self.protocol_config.set_max_packet_size(size);
398 self
399 }
400
401 /// Sets the k-bucket insertion strategy for the Kademlia routing table.
402 pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
403 self.kbucket_inserts = inserts;
404 self
405 }
406
407 /// Sets the [`Caching`] strategy to use for successful lookups.
408 ///
409 /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
410 /// Hence, with default settings and a lookup quorum of 1, a successful lookup
411 /// will result in the record being cached at the closest node to the key that
412 /// did not return the record, i.e. the standard Kademlia behaviour.
413 pub fn set_caching(&mut self, c: Caching) -> &mut Self {
414 self.caching = c;
415 self
416 }
417
418 /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
419 ///
420 /// * Default to `5` minutes.
421 /// * Set to `None` to disable periodic bootstrap.
422 pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
423 self.periodic_bootstrap_interval = interval;
424 self
425 }
426
427 /// Sets the configuration for the k-buckets.
428 ///
429 /// * Default to K_VALUE.
430 pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
431 self.kbucket_config.set_bucket_size(size);
432 self
433 }
434
435 /// Sets the timeout duration after creation of a pending entry after which
436 /// it becomes eligible for insertion into a full bucket, replacing the
437 /// least-recently (dis)connected node.
438 ///
439 /// * Default to `60` s.
440 pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
441 self.kbucket_config.set_pending_timeout(timeout);
442 self
443 }
444
445 /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted in the routing table.
446 /// This prevent cascading bootstrap requests when multiple peers are inserted into the routing table "at the same time".
447 /// This also allows to wait a little bit for other potential peers to be inserted into the routing table before
448 /// triggering a bootstrap, giving more context to the future bootstrap request.
449 ///
450 /// * Default to `500` ms.
451 /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a new peer
452 /// is inserted in the routing table.
453 /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when a new
454 /// peer is inserted in the routing table).
455 #[cfg(test)]
456 pub(crate) fn set_automatic_bootstrap_throttle(
457 &mut self,
458 duration: Option<Duration>,
459 ) -> &mut Self {
460 self.automatic_bootstrap_throttle = duration;
461 self
462 }
463}
464
465impl<TStore> Behaviour<TStore>
466where
467 TStore: RecordStore + Send + 'static,
468{
469 /// Creates a new `Kademlia` network behaviour with a default configuration.
470 pub fn new(id: PeerId, store: TStore) -> Self {
471 Self::with_config(id, store, Default::default())
472 }
473
474 /// Get the protocol name of this kademlia instance.
475 pub fn protocol_names(&self) -> &[StreamProtocol] {
476 self.protocol_config.protocol_names()
477 }
478
479 /// Creates a new `Kademlia` network behaviour with the given configuration.
480 pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
481 let local_key = kbucket::Key::from(id);
482
483 let put_record_job = config
484 .record_replication_interval
485 .or(config.record_publication_interval)
486 .map(|interval| {
487 PutRecordJob::new(
488 id,
489 interval,
490 config.record_publication_interval,
491 config.record_ttl,
492 )
493 });
494
495 let add_provider_job = config
496 .provider_publication_interval
497 .map(AddProviderJob::new);
498
499 Behaviour {
500 store,
501 caching: config.caching,
502 kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
503 kbucket_inserts: config.kbucket_inserts,
504 protocol_config: config.protocol_config,
505 record_filtering: config.record_filtering,
506 queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
507 listen_addresses: Default::default(),
508 queries: QueryPool::new(config.query_config),
509 connected_peers: Default::default(),
510 add_provider_job,
511 put_record_job,
512 record_ttl: config.record_ttl,
513 provider_record_ttl: config.provider_record_ttl,
514 external_addresses: Default::default(),
515 local_peer_id: id,
516 connections: Default::default(),
517 mode: Mode::Client,
518 auto_mode: true,
519 no_events_waker: None,
520 bootstrap_status: bootstrap::Status::new(
521 config.periodic_bootstrap_interval,
522 config.automatic_bootstrap_throttle,
523 ),
524 }
525 }
526
527 /// Gets an iterator over immutable references to all running queries.
528 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
529 self.queries.iter().filter_map(|query| {
530 if !query.is_finished() {
531 Some(QueryRef { query })
532 } else {
533 None
534 }
535 })
536 }
537
538 /// Gets an iterator over mutable references to all running queries.
539 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
540 self.queries.iter_mut().filter_map(|query| {
541 if !query.is_finished() {
542 Some(QueryMut { query })
543 } else {
544 None
545 }
546 })
547 }
548
549 /// Gets an immutable reference to a running query, if it exists.
550 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
551 self.queries.get(id).and_then(|query| {
552 if !query.is_finished() {
553 Some(QueryRef { query })
554 } else {
555 None
556 }
557 })
558 }
559
560 /// Gets a mutable reference to a running query, if it exists.
561 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
562 self.queries.get_mut(id).and_then(|query| {
563 if !query.is_finished() {
564 Some(QueryMut { query })
565 } else {
566 None
567 }
568 })
569 }
570
571 /// Adds a known listen address of a peer participating in the DHT to the
572 /// routing table.
573 ///
574 /// Explicitly adding addresses of peers serves two purposes:
575 ///
576 /// 1. In order for a node to join the DHT, it must know about at least
577 /// one other node of the DHT.
578 ///
579 /// 2. When a remote peer initiates a connection and that peer is not
580 /// yet in the routing table, the `Kademlia` behaviour must be
581 /// informed of an address on which that peer is listening for
582 /// connections before it can be added to the routing table
583 /// from where it can subsequently be discovered by all peers
584 /// in the DHT.
585 ///
586 /// If the routing table has been updated as a result of this operation,
587 /// a [`Event::RoutingUpdated`] event is emitted.
588 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
589 // ensuring address is a fully-qualified /p2p multiaddr
590 let Ok(address) = address.with_p2p(*peer) else {
591 return RoutingUpdate::Failed;
592 };
593 let key = kbucket::Key::from(*peer);
594 match self.kbuckets.entry(&key) {
595 Some(kbucket::Entry::Present(mut entry, _)) => {
596 if entry.value().insert(address) {
597 self.queued_events
598 .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
599 peer: *peer,
600 is_new_peer: false,
601 addresses: entry.value().clone(),
602 old_peer: None,
603 bucket_range: self
604 .kbuckets
605 .bucket(&key)
606 .map(|b| b.range())
607 .expect("Not kbucket::Entry::SelfEntry."),
608 }))
609 }
610 RoutingUpdate::Success
611 }
612 Some(kbucket::Entry::Pending(mut entry, _)) => {
613 entry.value().insert(address);
614 RoutingUpdate::Pending
615 }
616 Some(kbucket::Entry::Absent(entry)) => {
617 let addresses = Addresses::new(address);
618 let status = if self.connected_peers.contains(peer) {
619 NodeStatus::Connected
620 } else {
621 NodeStatus::Disconnected
622 };
623 match entry.insert(addresses.clone(), status) {
624 kbucket::InsertResult::Inserted => {
625 self.bootstrap_on_low_peers();
626
627 self.queued_events.push_back(ToSwarm::GenerateEvent(
628 Event::RoutingUpdated {
629 peer: *peer,
630 is_new_peer: true,
631 addresses,
632 old_peer: None,
633 bucket_range: self
634 .kbuckets
635 .bucket(&key)
636 .map(|b| b.range())
637 .expect("Not kbucket::Entry::SelfEntry."),
638 },
639 ));
640 RoutingUpdate::Success
641 }
642 kbucket::InsertResult::Full => {
643 tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
644 RoutingUpdate::Failed
645 }
646 kbucket::InsertResult::Pending { disconnected } => {
647 self.queued_events.push_back(ToSwarm::Dial {
648 opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
649 });
650 RoutingUpdate::Pending
651 }
652 }
653 }
654 None => RoutingUpdate::Failed,
655 }
656 }
657
658 /// Removes an address of a peer from the routing table.
659 ///
660 /// If the given address is the last address of the peer in the
661 /// routing table, the peer is removed from the routing table
662 /// and `Some` is returned with a view of the removed entry.
663 /// The same applies if the peer is currently pending insertion
664 /// into the routing table.
665 ///
666 /// If the given peer or address is not in the routing table,
667 /// this is a no-op.
668 pub fn remove_address(
669 &mut self,
670 peer: &PeerId,
671 address: &Multiaddr,
672 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
673 let address = &address.to_owned().with_p2p(*peer).ok()?;
674 let key = kbucket::Key::from(*peer);
675 match self.kbuckets.entry(&key)? {
676 kbucket::Entry::Present(mut entry, _) => {
677 if entry.value().remove(address).is_err() {
678 Some(entry.remove()) // it is the last address, thus remove the peer.
679 } else {
680 None
681 }
682 }
683 kbucket::Entry::Pending(mut entry, _) => {
684 if entry.value().remove(address).is_err() {
685 Some(entry.remove()) // it is the last address, thus remove the peer.
686 } else {
687 None
688 }
689 }
690 kbucket::Entry::Absent(..) => None,
691 }
692 }
693
694 /// Removes a peer from the routing table.
695 ///
696 /// Returns `None` if the peer was not in the routing table,
697 /// not even pending insertion.
698 pub fn remove_peer(
699 &mut self,
700 peer: &PeerId,
701 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
702 let key = kbucket::Key::from(*peer);
703 match self.kbuckets.entry(&key)? {
704 kbucket::Entry::Present(entry, _) => Some(entry.remove()),
705 kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
706 kbucket::Entry::Absent(..) => None,
707 }
708 }
709
710 /// Returns an iterator over all non-empty buckets in the routing table.
711 pub fn kbuckets(
712 &mut self,
713 ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
714 self.kbuckets.iter().filter(|b| !b.is_empty())
715 }
716
717 /// Returns the k-bucket for the distance to the given key.
718 ///
719 /// Returns `None` if the given key refers to the local key.
720 pub fn kbucket<K>(
721 &mut self,
722 key: K,
723 ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
724 where
725 K: Into<kbucket::Key<K>> + Clone,
726 {
727 self.kbuckets.bucket(&key.into())
728 }
729
730 /// Initiates an iterative query for the closest peers to the given key.
731 ///
732 /// The result of the query is delivered in a
733 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
734 pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
735 where
736 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
737 {
738 let target: kbucket::Key<K> = key.clone().into();
739 let key: Vec<u8> = key.into();
740 let info = QueryInfo::GetClosestPeers {
741 key,
742 step: ProgressStep::first(),
743 };
744 let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
745 self.queries.add_iter_closest(target, peer_keys, info)
746 }
747
748 /// Returns closest peers to the given key; takes peers from local routing table only.
749 pub fn get_closest_local_peers<'a, K: Clone>(
750 &'a mut self,
751 key: &'a kbucket::Key<K>,
752 ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
753 self.kbuckets.closest_keys(key)
754 }
755
756 /// Performs a lookup for a record in the DHT.
757 ///
758 /// The result of this operation is delivered in a
759 /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
760 pub fn get_record(&mut self, key: record::Key) -> QueryId {
761 let record = if let Some(record) = self.store.get(&key) {
762 if record.is_expired(Instant::now()) {
763 self.store.remove(&key);
764 None
765 } else {
766 Some(PeerRecord {
767 peer: None,
768 record: record.into_owned(),
769 })
770 }
771 } else {
772 None
773 };
774
775 let step = ProgressStep::first();
776
777 let target = kbucket::Key::new(key.clone());
778 let info = if record.is_some() {
779 QueryInfo::GetRecord {
780 key,
781 step: step.next(),
782 found_a_record: true,
783 cache_candidates: BTreeMap::new(),
784 }
785 } else {
786 QueryInfo::GetRecord {
787 key,
788 step: step.clone(),
789 found_a_record: false,
790 cache_candidates: BTreeMap::new(),
791 }
792 };
793 let peers = self.kbuckets.closest_keys(&target);
794 let id = self.queries.add_iter_closest(target.clone(), peers, info);
795
796 // No queries were actually done for the results yet.
797 let stats = QueryStats::empty();
798
799 if let Some(record) = record {
800 self.queued_events
801 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
802 id,
803 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
804 step,
805 stats,
806 }));
807 }
808
809 id
810 }
811
812 /// Stores a record in the DHT, locally as well as at the nodes
813 /// closest to the key as per the xor distance metric.
814 ///
815 /// Returns `Ok` if a record has been stored locally, providing the
816 /// `QueryId` of the initial query that replicates the record in the DHT.
817 /// The result of the query is eventually reported as a
818 /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
819 ///
820 /// The record is always stored locally with the given expiration. If the record's
821 /// expiration is `None`, the common case, it does not expire in local storage
822 /// but is still replicated with the configured record TTL. To remove the record
823 /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
824 ///
825 /// After the initial publication of the record, it is subject to (re-)replication
826 /// and (re-)publication as per the configured intervals. Periodic (re-)publication
827 /// does not update the record's expiration in local storage, thus a given record
828 /// with an explicit expiration will always expire at that instant and until then
829 /// is subject to regular (re-)replication and (re-)publication.
830 pub fn put_record(
831 &mut self,
832 mut record: Record,
833 quorum: Quorum,
834 ) -> Result<QueryId, store::Error> {
835 record.publisher = Some(*self.kbuckets.local_key().preimage());
836 self.store.put(record.clone())?;
837 record.expires = record
838 .expires
839 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
840 let quorum = quorum.eval(self.queries.config().replication_factor);
841 let target = kbucket::Key::new(record.key.clone());
842 let peers = self.kbuckets.closest_keys(&target);
843 let context = PutRecordContext::Publish;
844 let info = QueryInfo::PutRecord {
845 context,
846 record,
847 quorum,
848 phase: PutRecordPhase::GetClosestPeers,
849 };
850 Ok(self.queries.add_iter_closest(target.clone(), peers, info))
851 }
852
853 /// Stores a record at specific peers, without storing it locally.
854 ///
855 /// The given [`Quorum`] is understood in the context of the total
856 /// number of distinct peers given.
857 ///
858 /// If the record's expiration is `None`, the configured record TTL is used.
859 ///
860 /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
861 /// > used to selectively update or store a record to specific peers
862 /// > for the purpose of e.g. making sure these peers have the latest
863 /// > "version" of a record or to "cache" a record at further peers
864 /// > to increase the lookup success rate on the DHT for other peers.
865 /// >
866 /// > In particular, there is no automatic storing of records performed, and this
867 /// > method must be used to ensure the standard Kademlia
868 /// > procedure of "caching" (i.e. storing) a found record at the closest
869 /// > node to the key that _did not_ return it.
870 pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
871 where
872 I: ExactSizeIterator<Item = PeerId>,
873 {
874 let quorum = if peers.len() > 0 {
875 quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
876 } else {
877 // If no peers are given, we just let the query fail immediately
878 // due to the fact that the quorum must be at least one, instead of
879 // introducing a new kind of error.
880 NonZeroUsize::new(1).expect("1 > 0")
881 };
882 record.expires = record
883 .expires
884 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
885 let context = PutRecordContext::Custom;
886 let info = QueryInfo::PutRecord {
887 context,
888 record,
889 quorum,
890 phase: PutRecordPhase::PutRecord {
891 success: Vec::new(),
892 get_closest_peers_stats: QueryStats::empty(),
893 },
894 };
895 self.queries.add_fixed(peers, info)
896 }
897
898 /// Removes the record with the given key from _local_ storage,
899 /// if the local node is the publisher of the record.
900 ///
901 /// Has no effect if a record for the given key is stored locally but
902 /// the local node is not a publisher of the record.
903 ///
904 /// This is a _local_ operation. However, it also has the effect that
905 /// the record will no longer be periodically re-published, allowing the
906 /// record to eventually expire throughout the DHT.
907 pub fn remove_record(&mut self, key: &record::Key) {
908 if let Some(r) = self.store.get(key) {
909 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
910 self.store.remove(key)
911 }
912 }
913 }
914
915 /// Gets a mutable reference to the record store.
916 pub fn store_mut(&mut self) -> &mut TStore {
917 &mut self.store
918 }
919
920 /// Bootstraps the local node to join the DHT.
921 ///
922 /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
923 /// own ID in the DHT. This introduces the local node to the other nodes
924 /// in the DHT and populates its routing table with the closest neighbours.
925 ///
926 /// Subsequently, all buckets farther from the bucket of the closest neighbour are
927 /// refreshed by initiating an additional bootstrapping query for each such
928 /// bucket with random keys.
929 ///
930 /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
931 /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
932 /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
933 /// with one such event per bootstrapping query.
934 ///
935 /// Returns `Err` if bootstrapping is impossible due an empty routing table.
936 ///
937 /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
938 /// > See [`Behaviour::add_address`].
939 ///
940 /// > **Note**: Bootstrap does not require to be called manually. It is periodically
941 /// > invoked at regular intervals based on the configured `periodic_bootstrap_interval` (see
942 /// > [`Config::set_periodic_bootstrap_interval`] for details) and it is also automatically invoked
943 /// > when a new peer is inserted in the routing table.
944 /// > This parameter is used to call [`Behaviour::bootstrap`] periodically and automatically
945 /// > to ensure a healthy routing table.
946 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
947 let local_key = *self.kbuckets.local_key();
948 let info = QueryInfo::Bootstrap {
949 peer: *local_key.preimage(),
950 remaining: None,
951 step: ProgressStep::first(),
952 };
953 let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
954 if peers.is_empty() {
955 self.bootstrap_status.reset_timers();
956 Err(NoKnownPeers())
957 } else {
958 self.bootstrap_status.on_started();
959 Ok(self.queries.add_iter_closest(local_key, peers, info))
960 }
961 }
962
963 /// Establishes the local node as a provider of a value for the given key.
964 ///
965 /// This operation publishes a provider record with the given key and
966 /// identity of the local node to the peers closest to the key, thus establishing
967 /// the local node as a provider.
968 ///
969 /// Returns `Ok` if a provider record has been stored locally, providing the
970 /// `QueryId` of the initial query that announces the local node as a provider.
971 ///
972 /// The publication of the provider records is periodically repeated as per the
973 /// configured interval, to renew the expiry and account for changes to the DHT
974 /// topology. A provider record may be removed from local storage and
975 /// thus no longer re-published by calling [`Behaviour::stop_providing`].
976 ///
977 /// In contrast to the standard Kademlia push-based model for content distribution
978 /// implemented by [`Behaviour::put_record`], the provider API implements a
979 /// pull-based model that may be used in addition or as an alternative.
980 /// The means by which the actual value is obtained from a provider is out of scope
981 /// of the libp2p Kademlia provider API.
982 ///
983 /// The results of the (repeated) provider announcements sent by this node are
984 /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
985 pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
986 // Note: We store our own provider records locally without local addresses
987 // to avoid redundant storage and outdated addresses. Instead these are
988 // acquired on demand when returning a `ProviderRecord` for the local node.
989 let local_addrs = Vec::new();
990 let record = ProviderRecord::new(
991 key.clone(),
992 *self.kbuckets.local_key().preimage(),
993 local_addrs,
994 );
995 self.store.add_provider(record)?;
996 let target = kbucket::Key::new(key.clone());
997 let peers = self.kbuckets.closest_keys(&target);
998 let context = AddProviderContext::Publish;
999 let info = QueryInfo::AddProvider {
1000 context,
1001 key,
1002 phase: AddProviderPhase::GetClosestPeers,
1003 };
1004 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1005 Ok(id)
1006 }
1007
1008 /// Stops the local node from announcing that it is a provider for the given key.
1009 ///
1010 /// This is a local operation. The local node will still be considered as a
1011 /// provider for the key by other nodes until these provider records expire.
1012 pub fn stop_providing(&mut self, key: &record::Key) {
1013 self.store
1014 .remove_provider(key, self.kbuckets.local_key().preimage());
1015 }
1016
1017 /// Performs a lookup for providers of a value to the given key.
1018 ///
1019 /// The result of this operation is delivered in a
1020 /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
1021 pub fn get_providers(&mut self, key: record::Key) -> QueryId {
1022 let providers: HashSet<_> = self
1023 .store
1024 .providers(&key)
1025 .into_iter()
1026 .filter(|p| !p.is_expired(Instant::now()))
1027 .map(|p| p.provider)
1028 .collect();
1029
1030 let step = ProgressStep::first();
1031
1032 let info = QueryInfo::GetProviders {
1033 key: key.clone(),
1034 providers_found: providers.len(),
1035 step: if providers.is_empty() {
1036 step.clone()
1037 } else {
1038 step.next()
1039 },
1040 };
1041
1042 let target = kbucket::Key::new(key.clone());
1043 let peers = self.kbuckets.closest_keys(&target);
1044 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1045
1046 // No queries were actually done for the results yet.
1047 let stats = QueryStats::empty();
1048
1049 if !providers.is_empty() {
1050 self.queued_events
1051 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
1052 id,
1053 result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
1054 key,
1055 providers,
1056 })),
1057 step,
1058 stats,
1059 }));
1060 }
1061 id
1062 }
1063
1064 /// Set the [`Mode`] in which we should operate.
1065 ///
1066 /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
1067 ///
1068 /// Setting a mode via this function disables this automatic behaviour and unconditionally operates in the specified mode.
1069 /// To reactivate the automatic configuration, pass [`None`] instead.
1070 pub fn set_mode(&mut self, mode: Option<Mode>) {
1071 match mode {
1072 Some(mode) => {
1073 self.mode = mode;
1074 self.auto_mode = false;
1075 self.reconfigure_mode();
1076 }
1077 None => {
1078 self.auto_mode = true;
1079 self.determine_mode_from_external_addresses();
1080 }
1081 }
1082
1083 if let Some(waker) = self.no_events_waker.take() {
1084 waker.wake();
1085 }
1086 }
1087
1088 fn reconfigure_mode(&mut self) {
1089 if self.connections.is_empty() {
1090 return;
1091 }
1092
1093 let num_connections = self.connections.len();
1094
1095 tracing::debug!(
1096 "Re-configuring {} established connection{}",
1097 num_connections,
1098 if num_connections > 1 { "s" } else { "" }
1099 );
1100
1101 self.queued_events
1102 .extend(
1103 self.connections
1104 .iter()
1105 .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1106 peer_id: *peer_id,
1107 handler: NotifyHandler::One(*conn_id),
1108 event: HandlerIn::ReconfigureMode {
1109 new_mode: self.mode,
1110 },
1111 }),
1112 );
1113 }
1114
1115 fn determine_mode_from_external_addresses(&mut self) {
1116 let old_mode = self.mode;
1117
1118 self.mode = match (self.external_addresses.as_slice(), self.mode) {
1119 ([], Mode::Server) => {
1120 tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1121
1122 Mode::Client
1123 }
1124 ([], Mode::Client) => {
1125 // Previously client-mode, now also client-mode because no external addresses.
1126
1127 Mode::Client
1128 }
1129 (confirmed_external_addresses, Mode::Client) => {
1130 if tracing::enabled!(Level::DEBUG) {
1131 let confirmed_external_addresses =
1132 to_comma_separated_list(confirmed_external_addresses);
1133
1134 tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1135 }
1136
1137 Mode::Server
1138 }
1139 (confirmed_external_addresses, Mode::Server) => {
1140 debug_assert!(
1141 !confirmed_external_addresses.is_empty(),
1142 "Previous match arm handled empty list"
1143 );
1144
1145 // Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam.
1146
1147 Mode::Server
1148 }
1149 };
1150
1151 self.reconfigure_mode();
1152
1153 if old_mode != self.mode {
1154 self.queued_events
1155 .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1156 new_mode: self.mode,
1157 }));
1158 }
1159 }
1160
1161 /// Processes discovered peers from a successful request in an iterative `Query`.
1162 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1163 where
1164 I: Iterator<Item = &'a KadPeer> + Clone,
1165 {
1166 let local_id = self.kbuckets.local_key().preimage();
1167 let others_iter = peers.filter(|p| &p.node_id != local_id);
1168 if let Some(query) = self.queries.get_mut(query_id) {
1169 tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1170 for peer in others_iter.clone() {
1171 tracing::trace!(
1172 ?peer,
1173 %source,
1174 query=?query_id,
1175 "Peer reported by source in query"
1176 );
1177 let addrs = peer.multiaddrs.iter().cloned().collect();
1178 query.peers.addresses.insert(peer.node_id, addrs);
1179 }
1180 query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1181 }
1182 }
1183
1184 /// Finds the closest peers to a `target` in the context of a request by
1185 /// the `source` peer, such that the `source` peer is never included in the
1186 /// result.
1187 fn find_closest<T: Clone>(
1188 &mut self,
1189 target: &kbucket::Key<T>,
1190 source: &PeerId,
1191 ) -> Vec<KadPeer> {
1192 self.kbuckets
1193 .closest(target)
1194 .filter(|e| e.node.key.preimage() != source)
1195 .take(self.queries.config().replication_factor.get())
1196 .map(KadPeer::from)
1197 .collect()
1198 }
1199
1200 /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1201 fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1202 let kbuckets = &mut self.kbuckets;
1203 let connected = &mut self.connected_peers;
1204 let listen_addresses = &self.listen_addresses;
1205 let external_addresses = &self.external_addresses;
1206
1207 self.store
1208 .providers(key)
1209 .into_iter()
1210 .filter_map(move |p| {
1211 if &p.provider != source {
1212 let node_id = p.provider;
1213 let multiaddrs = p.addresses;
1214 let connection_ty = if connected.contains(&node_id) {
1215 ConnectionType::Connected
1216 } else {
1217 ConnectionType::NotConnected
1218 };
1219 if multiaddrs.is_empty() {
1220 // The provider is either the local node and we fill in
1221 // the local addresses on demand, or it is a legacy
1222 // provider record without addresses, in which case we
1223 // try to find addresses in the routing table, as was
1224 // done before provider records were stored along with
1225 // their addresses.
1226 if &node_id == kbuckets.local_key().preimage() {
1227 Some(
1228 listen_addresses
1229 .iter()
1230 .chain(external_addresses.iter())
1231 .cloned()
1232 .collect::<Vec<_>>(),
1233 )
1234 } else {
1235 let key = kbucket::Key::from(node_id);
1236 kbuckets
1237 .entry(&key)
1238 .as_mut()
1239 .and_then(|e| e.view())
1240 .map(|e| e.node.value.clone().into_vec())
1241 }
1242 } else {
1243 Some(multiaddrs)
1244 }
1245 .map(|multiaddrs| KadPeer {
1246 node_id,
1247 multiaddrs,
1248 connection_ty,
1249 })
1250 } else {
1251 None
1252 }
1253 })
1254 .take(self.queries.config().replication_factor.get())
1255 .collect()
1256 }
1257
1258 /// Starts an iterative `ADD_PROVIDER` query for the given key.
1259 fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1260 let info = QueryInfo::AddProvider {
1261 context,
1262 key: key.clone(),
1263 phase: AddProviderPhase::GetClosestPeers,
1264 };
1265 let target = kbucket::Key::new(key);
1266 let peers = self.kbuckets.closest_keys(&target);
1267 self.queries.add_iter_closest(target.clone(), peers, info);
1268 }
1269
1270 /// Starts an iterative `PUT_VALUE` query for the given record.
1271 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1272 let quorum = quorum.eval(self.queries.config().replication_factor);
1273 let target = kbucket::Key::new(record.key.clone());
1274 let peers = self.kbuckets.closest_keys(&target);
1275 let info = QueryInfo::PutRecord {
1276 record,
1277 quorum,
1278 context,
1279 phase: PutRecordPhase::GetClosestPeers,
1280 };
1281 self.queries.add_iter_closest(target.clone(), peers, info);
1282 }
1283
1284 /// Updates the routing table with a new connection status and address of a peer.
1285 fn connection_updated(
1286 &mut self,
1287 peer: PeerId,
1288 address: Option<Multiaddr>,
1289 new_status: NodeStatus,
1290 ) {
1291 let key = kbucket::Key::from(peer);
1292 match self.kbuckets.entry(&key) {
1293 Some(kbucket::Entry::Present(mut entry, old_status)) => {
1294 if old_status != new_status {
1295 entry.update(new_status)
1296 }
1297 if let Some(address) = address {
1298 if entry.value().insert(address) {
1299 self.queued_events.push_back(ToSwarm::GenerateEvent(
1300 Event::RoutingUpdated {
1301 peer,
1302 is_new_peer: false,
1303 addresses: entry.value().clone(),
1304 old_peer: None,
1305 bucket_range: self
1306 .kbuckets
1307 .bucket(&key)
1308 .map(|b| b.range())
1309 .expect("Not kbucket::Entry::SelfEntry."),
1310 },
1311 ))
1312 }
1313 }
1314 }
1315
1316 Some(kbucket::Entry::Pending(mut entry, old_status)) => {
1317 if let Some(address) = address {
1318 entry.value().insert(address);
1319 }
1320 if old_status != new_status {
1321 entry.update(new_status);
1322 }
1323 }
1324
1325 Some(kbucket::Entry::Absent(entry)) => {
1326 // Only connected nodes with a known address are newly inserted.
1327 if new_status != NodeStatus::Connected {
1328 return;
1329 }
1330 match (address, self.kbucket_inserts) {
1331 (None, _) => {
1332 self.queued_events
1333 .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
1334 }
1335 (Some(a), BucketInserts::Manual) => {
1336 self.queued_events
1337 .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
1338 peer,
1339 address: a,
1340 }));
1341 }
1342 (Some(a), BucketInserts::OnConnected) => {
1343 let addresses = Addresses::new(a);
1344 match entry.insert(addresses.clone(), new_status) {
1345 kbucket::InsertResult::Inserted => {
1346 self.bootstrap_on_low_peers();
1347
1348 let event = Event::RoutingUpdated {
1349 peer,
1350 is_new_peer: true,
1351 addresses,
1352 old_peer: None,
1353 bucket_range: self
1354 .kbuckets
1355 .bucket(&key)
1356 .map(|b| b.range())
1357 .expect("Not kbucket::Entry::SelfEntry."),
1358 };
1359 self.queued_events.push_back(ToSwarm::GenerateEvent(event));
1360 }
1361 kbucket::InsertResult::Full => {
1362 tracing::debug!(
1363 %peer,
1364 "Bucket full. Peer not added to routing table"
1365 );
1366 let address = addresses.first().clone();
1367 self.queued_events.push_back(ToSwarm::GenerateEvent(
1368 Event::RoutablePeer { peer, address },
1369 ));
1370 }
1371 kbucket::InsertResult::Pending { disconnected } => {
1372 let address = addresses.first().clone();
1373 self.queued_events.push_back(ToSwarm::GenerateEvent(
1374 Event::PendingRoutablePeer { peer, address },
1375 ));
1376
1377 // `disconnected` might already be in the process of re-connecting.
1378 // In other words `disconnected` might have already re-connected but
1379 // is not yet confirmed to support the Kademlia protocol via
1380 // [`HandlerEvent::ProtocolConfirmed`].
1381 //
1382 // Only try dialing peer if not currently connected.
1383 if !self.connected_peers.contains(disconnected.preimage()) {
1384 self.queued_events.push_back(ToSwarm::Dial {
1385 opts: DialOpts::peer_id(disconnected.into_preimage())
1386 .build(),
1387 })
1388 }
1389 }
1390 }
1391 }
1392 }
1393 }
1394 _ => {}
1395 }
1396 }
1397
1398 /// A new peer has been inserted in the routing table but we check if the routing
1399 /// table is currently small (less that `K_VALUE` peers are present) and only
1400 /// trigger a bootstrap in that case
1401 fn bootstrap_on_low_peers(&mut self) {
1402 if self
1403 .kbuckets()
1404 .map(|kbucket| kbucket.num_entries())
1405 .sum::<usize>()
1406 < K_VALUE.get()
1407 {
1408 self.bootstrap_status.trigger();
1409 }
1410 }
1411
1412 /// Handles a finished (i.e. successful) query.
1413 fn query_finished(&mut self, q: Query) -> Option<Event> {
1414 let query_id = q.id();
1415 tracing::trace!(query=?query_id, "Query finished");
1416 match q.info {
1417 QueryInfo::Bootstrap {
1418 peer,
1419 remaining,
1420 mut step,
1421 } => {
1422 let local_key = *self.kbuckets.local_key();
1423 let mut remaining = remaining.unwrap_or_else(|| {
1424 debug_assert_eq!(&peer, local_key.preimage());
1425 // The lookup for the local key finished. To complete the bootstrap process,
1426 // a bucket refresh should be performed for every bucket farther away than
1427 // the first non-empty bucket (which are most likely no more than the last
1428 // few, i.e. farthest, buckets).
1429 self.kbuckets
1430 .iter()
1431 .skip_while(|b| b.is_empty())
1432 .skip(1) // Skip the bucket with the closest neighbour.
1433 .map(|b| {
1434 // Try to find a key that falls into the bucket. While such keys can
1435 // be generated fully deterministically, the current libp2p kademlia
1436 // wire protocol requires transmission of the preimages of the actual
1437 // keys in the DHT keyspace, hence for now this is just a "best effort"
1438 // to find a key that hashes into a specific bucket. The probabilities
1439 // of finding a key in the bucket `b` with as most 16 trials are as
1440 // follows:
1441 //
1442 // Pr(bucket-255) = 1 - (1/2)^16 ~= 1
1443 // Pr(bucket-254) = 1 - (3/4)^16 ~= 1
1444 // Pr(bucket-253) = 1 - (7/8)^16 ~= 0.88
1445 // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
1446 // ...
1447 let mut target = kbucket::Key::from(PeerId::random());
1448 for _ in 0..16 {
1449 let d = local_key.distance(&target);
1450 if b.contains(&d) {
1451 break;
1452 }
1453 target = kbucket::Key::from(PeerId::random());
1454 }
1455 target
1456 })
1457 .collect::<Vec<_>>()
1458 .into_iter()
1459 });
1460
1461 let num_remaining = remaining.len() as u32;
1462
1463 if let Some(target) = remaining.next() {
1464 let info = QueryInfo::Bootstrap {
1465 peer: *target.preimage(),
1466 remaining: Some(remaining),
1467 step: step.next(),
1468 };
1469 let peers = self.kbuckets.closest_keys(&target);
1470 self.queries
1471 .continue_iter_closest(query_id, target, peers, info);
1472 } else {
1473 step.last = true;
1474 self.bootstrap_status.on_finish();
1475 };
1476
1477 Some(Event::OutboundQueryProgressed {
1478 id: query_id,
1479 stats: q.stats,
1480 result: QueryResult::Bootstrap(Ok(BootstrapOk {
1481 peer,
1482 num_remaining,
1483 })),
1484 step,
1485 })
1486 }
1487
1488 QueryInfo::GetClosestPeers { key, mut step } => {
1489 step.last = true;
1490
1491 Some(Event::OutboundQueryProgressed {
1492 id: query_id,
1493 stats: q.stats,
1494 result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
1495 key,
1496 peers: q.peers.into_peerinfos_iter().collect(),
1497 })),
1498 step,
1499 })
1500 }
1501
1502 QueryInfo::GetProviders { mut step, .. } => {
1503 step.last = true;
1504
1505 Some(Event::OutboundQueryProgressed {
1506 id: query_id,
1507 stats: q.stats,
1508 result: QueryResult::GetProviders(Ok(
1509 GetProvidersOk::FinishedWithNoAdditionalRecord {
1510 closest_peers: q.peers.into_peerids_iter().collect(),
1511 },
1512 )),
1513 step,
1514 })
1515 }
1516
1517 QueryInfo::AddProvider {
1518 context,
1519 key,
1520 phase: AddProviderPhase::GetClosestPeers,
1521 } => {
1522 let provider_id = self.local_peer_id;
1523 let external_addresses = self.external_addresses.iter().cloned().collect();
1524 let info = QueryInfo::AddProvider {
1525 context,
1526 key,
1527 phase: AddProviderPhase::AddProvider {
1528 provider_id,
1529 external_addresses,
1530 get_closest_peers_stats: q.stats,
1531 },
1532 };
1533 self.queries
1534 .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1535 None
1536 }
1537
1538 QueryInfo::AddProvider {
1539 context,
1540 key,
1541 phase:
1542 AddProviderPhase::AddProvider {
1543 get_closest_peers_stats,
1544 ..
1545 },
1546 } => match context {
1547 AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
1548 id: query_id,
1549 stats: get_closest_peers_stats.merge(q.stats),
1550 result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
1551 step: ProgressStep::first_and_last(),
1552 }),
1553 AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
1554 id: query_id,
1555 stats: get_closest_peers_stats.merge(q.stats),
1556 result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
1557 step: ProgressStep::first_and_last(),
1558 }),
1559 },
1560
1561 QueryInfo::GetRecord {
1562 key,
1563 mut step,
1564 found_a_record,
1565 cache_candidates,
1566 } => {
1567 step.last = true;
1568
1569 let results = if found_a_record {
1570 Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
1571 } else {
1572 Err(GetRecordError::NotFound {
1573 key,
1574 closest_peers: q.peers.into_peerids_iter().collect(),
1575 })
1576 };
1577 Some(Event::OutboundQueryProgressed {
1578 id: query_id,
1579 stats: q.stats,
1580 result: QueryResult::GetRecord(results),
1581 step,
1582 })
1583 }
1584
1585 QueryInfo::PutRecord {
1586 context,
1587 record,
1588 quorum,
1589 phase: PutRecordPhase::GetClosestPeers,
1590 } => {
1591 let info = QueryInfo::PutRecord {
1592 context,
1593 record,
1594 quorum,
1595 phase: PutRecordPhase::PutRecord {
1596 success: vec![],
1597 get_closest_peers_stats: q.stats,
1598 },
1599 };
1600 self.queries
1601 .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
1602 None
1603 }
1604
1605 QueryInfo::PutRecord {
1606 context,
1607 record,
1608 quorum,
1609 phase:
1610 PutRecordPhase::PutRecord {
1611 success,
1612 get_closest_peers_stats,
1613 },
1614 } => {
1615 let mk_result = |key: record::Key| {
1616 if success.len() >= quorum.get() {
1617 Ok(PutRecordOk { key })
1618 } else {
1619 Err(PutRecordError::QuorumFailed {
1620 key,
1621 quorum,
1622 success,
1623 })
1624 }
1625 };
1626 match context {
1627 PutRecordContext::Publish | PutRecordContext::Custom => {
1628 Some(Event::OutboundQueryProgressed {
1629 id: query_id,
1630 stats: get_closest_peers_stats.merge(q.stats),
1631 result: QueryResult::PutRecord(mk_result(record.key)),
1632 step: ProgressStep::first_and_last(),
1633 })
1634 }
1635 PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1636 id: query_id,
1637 stats: get_closest_peers_stats.merge(q.stats),
1638 result: QueryResult::RepublishRecord(mk_result(record.key)),
1639 step: ProgressStep::first_and_last(),
1640 }),
1641 PutRecordContext::Replicate => {
1642 tracing::debug!(record=?record.key, "Record replicated");
1643 None
1644 }
1645 }
1646 }
1647 }
1648 }
1649
1650 /// Handles a query that timed out.
1651 fn query_timeout(&mut self, query: Query) -> Option<Event> {
1652 let query_id = query.id();
1653 tracing::trace!(query=?query_id, "Query timed out");
1654 match query.info {
1655 QueryInfo::Bootstrap {
1656 peer,
1657 mut remaining,
1658 mut step,
1659 } => {
1660 let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1661
1662 // Continue with the next bootstrap query if `remaining` is not empty.
1663 if let Some((target, remaining)) =
1664 remaining.take().and_then(|mut r| Some((r.next()?, r)))
1665 {
1666 let info = QueryInfo::Bootstrap {
1667 peer: target.into_preimage(),
1668 remaining: Some(remaining),
1669 step: step.next(),
1670 };
1671 let peers = self.kbuckets.closest_keys(&target);
1672 self.queries
1673 .continue_iter_closest(query_id, target, peers, info);
1674 } else {
1675 step.last = true;
1676 self.bootstrap_status.on_finish();
1677 }
1678
1679 Some(Event::OutboundQueryProgressed {
1680 id: query_id,
1681 stats: query.stats,
1682 result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1683 peer,
1684 num_remaining,
1685 })),
1686 step,
1687 })
1688 }
1689
1690 QueryInfo::AddProvider { context, key, .. } => Some(match context {
1691 AddProviderContext::Publish => Event::OutboundQueryProgressed {
1692 id: query_id,
1693 stats: query.stats,
1694 result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1695 step: ProgressStep::first_and_last(),
1696 },
1697 AddProviderContext::Republish => Event::OutboundQueryProgressed {
1698 id: query_id,
1699 stats: query.stats,
1700 result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1701 step: ProgressStep::first_and_last(),
1702 },
1703 }),
1704
1705 QueryInfo::GetClosestPeers { key, mut step } => {
1706 step.last = true;
1707 Some(Event::OutboundQueryProgressed {
1708 id: query_id,
1709 stats: query.stats,
1710 result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1711 key,
1712 peers: query.peers.into_peerinfos_iter().collect(),
1713 })),
1714 step,
1715 })
1716 }
1717
1718 QueryInfo::PutRecord {
1719 record,
1720 quorum,
1721 context,
1722 phase,
1723 } => {
1724 let err = Err(PutRecordError::Timeout {
1725 key: record.key,
1726 quorum,
1727 success: match phase {
1728 PutRecordPhase::GetClosestPeers => vec![],
1729 PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1730 },
1731 });
1732 match context {
1733 PutRecordContext::Publish | PutRecordContext::Custom => {
1734 Some(Event::OutboundQueryProgressed {
1735 id: query_id,
1736 stats: query.stats,
1737 result: QueryResult::PutRecord(err),
1738 step: ProgressStep::first_and_last(),
1739 })
1740 }
1741 PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1742 id: query_id,
1743 stats: query.stats,
1744 result: QueryResult::RepublishRecord(err),
1745 step: ProgressStep::first_and_last(),
1746 }),
1747 PutRecordContext::Replicate => match phase {
1748 PutRecordPhase::GetClosestPeers => {
1749 tracing::warn!(
1750 "Locating closest peers for replication failed: {:?}",
1751 err
1752 );
1753 None
1754 }
1755 PutRecordPhase::PutRecord { .. } => {
1756 tracing::debug!("Replicating record failed: {:?}", err);
1757 None
1758 }
1759 },
1760 }
1761 }
1762
1763 QueryInfo::GetRecord { key, mut step, .. } => {
1764 step.last = true;
1765
1766 Some(Event::OutboundQueryProgressed {
1767 id: query_id,
1768 stats: query.stats,
1769 result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1770 step,
1771 })
1772 }
1773
1774 QueryInfo::GetProviders { key, mut step, .. } => {
1775 step.last = true;
1776
1777 Some(Event::OutboundQueryProgressed {
1778 id: query_id,
1779 stats: query.stats,
1780 result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1781 key,
1782 closest_peers: query.peers.into_peerids_iter().collect(),
1783 })),
1784 step,
1785 })
1786 }
1787 }
1788 }
1789
1790 /// Processes a record received from a peer.
1791 fn record_received(
1792 &mut self,
1793 source: PeerId,
1794 connection: ConnectionId,
1795 request_id: RequestId,
1796 mut record: Record,
1797 ) {
1798 if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1799 // If the (alleged) publisher is the local node, do nothing. The record of
1800 // the original publisher should never change as a result of replication
1801 // and the publisher is always assumed to have the "right" value.
1802 self.queued_events.push_back(ToSwarm::NotifyHandler {
1803 peer_id: source,
1804 handler: NotifyHandler::One(connection),
1805 event: HandlerIn::PutRecordRes {
1806 key: record.key,
1807 value: record.value,
1808 request_id,
1809 },
1810 });
1811 return;
1812 }
1813
1814 let now = Instant::now();
1815
1816 // Calculate the expiration exponentially inversely proportional to the
1817 // number of nodes between the local node and the closest node to the key
1818 // (beyond the replication factor). This ensures avoiding over-caching
1819 // outside of the k closest nodes to a key.
1820 let target = kbucket::Key::new(record.key.clone());
1821 let num_between = self.kbuckets.count_nodes_between(&target);
1822 let k = self.queries.config().replication_factor.get();
1823 let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1824 let expiration = self
1825 .record_ttl
1826 .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1827 // The smaller TTL prevails. Only if neither TTL is set is the record
1828 // stored "forever".
1829 record.expires = record.expires.or(expiration).min(expiration);
1830
1831 if let Some(job) = self.put_record_job.as_mut() {
1832 // Ignore the record in the next run of the replication
1833 // job, since we can assume the sender replicated the
1834 // record to the k closest peers. Effectively, only
1835 // one of the k closest peers performs a replication
1836 // in the configured interval, assuming a shared interval.
1837 job.skip(record.key.clone())
1838 }
1839
1840 // While records received from a publisher, as well as records that do
1841 // not exist locally should always (attempted to) be stored, there is a
1842 // choice here w.r.t. the handling of replicated records whose keys refer
1843 // to records that exist locally: The value and / or the publisher may
1844 // either be overridden or left unchanged. At the moment and in the
1845 // absence of a decisive argument for another option, both are always
1846 // overridden as it avoids having to load the existing record in the
1847 // first place.
1848
1849 if !record.is_expired(now) {
1850 // The record is cloned because of the weird libp2p protocol
1851 // requirement to send back the value in the response, although this
1852 // is a waste of resources.
1853 match self.record_filtering {
1854 StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1855 Ok(()) => {
1856 tracing::debug!(
1857 record=?record.key,
1858 "Record stored: {} bytes",
1859 record.value.len()
1860 );
1861 self.queued_events.push_back(ToSwarm::GenerateEvent(
1862 Event::InboundRequest {
1863 request: InboundRequest::PutRecord {
1864 source,
1865 connection,
1866 record: None,
1867 },
1868 },
1869 ));
1870 }
1871 Err(e) => {
1872 tracing::info!("Record not stored: {:?}", e);
1873 self.queued_events.push_back(ToSwarm::NotifyHandler {
1874 peer_id: source,
1875 handler: NotifyHandler::One(connection),
1876 event: HandlerIn::Reset(request_id),
1877 });
1878
1879 return;
1880 }
1881 },
1882 StoreInserts::FilterBoth => {
1883 self.queued_events
1884 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1885 request: InboundRequest::PutRecord {
1886 source,
1887 connection,
1888 record: Some(record.clone()),
1889 },
1890 }));
1891 }
1892 }
1893 }
1894
1895 // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1896 // case where the record is discarded due to being expired. Given that
1897 // the remote sent the local node a [`HandlerEvent::PutRecord`]
1898 // request, the remote perceives the local node as one node among the k
1899 // closest nodes to the target. In addition returning
1900 // [`HandlerIn::PutRecordRes`] does not reveal any internal
1901 // information to a possibly malicious remote node.
1902 self.queued_events.push_back(ToSwarm::NotifyHandler {
1903 peer_id: source,
1904 handler: NotifyHandler::One(connection),
1905 event: HandlerIn::PutRecordRes {
1906 key: record.key,
1907 value: record.value,
1908 request_id,
1909 },
1910 })
1911 }
1912
1913 /// Processes a provider record received from a peer.
1914 fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1915 if &provider.node_id != self.kbuckets.local_key().preimage() {
1916 let record = ProviderRecord {
1917 key,
1918 provider: provider.node_id,
1919 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1920 addresses: provider.multiaddrs,
1921 };
1922 match self.record_filtering {
1923 StoreInserts::Unfiltered => {
1924 if let Err(e) = self.store.add_provider(record) {
1925 tracing::info!("Provider record not stored: {:?}", e);
1926 return;
1927 }
1928
1929 self.queued_events
1930 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1931 request: InboundRequest::AddProvider { record: None },
1932 }));
1933 }
1934 StoreInserts::FilterBoth => {
1935 self.queued_events
1936 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1937 request: InboundRequest::AddProvider {
1938 record: Some(record),
1939 },
1940 }));
1941 }
1942 }
1943 }
1944 }
1945
1946 fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1947 let key = kbucket::Key::from(peer_id);
1948
1949 if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1950 // TODO: Ideally, the address should only be removed if the error can
1951 // be classified as "permanent" but since `err` is currently a borrowed
1952 // trait object without a `'static` bound, even downcasting for inspection
1953 // of the error is not possible (and also not truly desirable or ergonomic).
1954 // The error passed in should rather be a dedicated enum.
1955 if addrs.remove(address).is_ok() {
1956 tracing::debug!(
1957 peer=%peer_id,
1958 %address,
1959 "Address removed from peer due to error."
1960 );
1961 } else {
1962 // Despite apparently having no reachable address (any longer),
1963 // the peer is kept in the routing table with the last address to avoid
1964 // (temporary) loss of network connectivity to "flush" the routing
1965 // table. Once in, a peer is only removed from the routing table
1966 // if it is the least recently connected peer, currently disconnected
1967 // and is unreachable in the context of another peer pending insertion
1968 // into the same bucket. This is handled transparently by the
1969 // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1970 // within `Behaviour::poll`.
1971 tracing::debug!(
1972 peer=%peer_id,
1973 %address,
1974 "Last remaining address of peer is unreachable."
1975 );
1976 }
1977 }
1978
1979 for query in self.queries.iter_mut() {
1980 if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
1981 addrs.retain(|a| a != address);
1982 }
1983 }
1984 }
1985
1986 fn on_connection_established(
1987 &mut self,
1988 ConnectionEstablished {
1989 peer_id,
1990 failed_addresses,
1991 other_established,
1992 ..
1993 }: ConnectionEstablished,
1994 ) {
1995 for addr in failed_addresses {
1996 self.address_failed(peer_id, addr);
1997 }
1998
1999 // Peer's first connection.
2000 if other_established == 0 {
2001 self.connected_peers.insert(peer_id);
2002 }
2003 }
2004
2005 fn on_address_change(
2006 &mut self,
2007 AddressChange {
2008 peer_id: peer,
2009 old,
2010 new,
2011 ..
2012 }: AddressChange,
2013 ) {
2014 let (old, new) = (old.get_remote_address(), new.get_remote_address());
2015
2016 // Update routing table.
2017 if let Some(addrs) = self
2018 .kbuckets
2019 .entry(&kbucket::Key::from(peer))
2020 .as_mut()
2021 .and_then(|e| e.value())
2022 {
2023 if addrs.replace(old, new) {
2024 tracing::debug!(
2025 %peer,
2026 old_address=%old,
2027 new_address=%new,
2028 "Old address replaced with new address for peer."
2029 );
2030 } else {
2031 tracing::debug!(
2032 %peer,
2033 old_address=%old,
2034 new_address=%new,
2035 "Old address not replaced with new address for peer as old address wasn't present.",
2036 );
2037 }
2038 } else {
2039 tracing::debug!(
2040 %peer,
2041 old_address=%old,
2042 new_address=%new,
2043 "Old address not replaced with new address for peer as peer is not present in the \
2044 routing table."
2045 );
2046 }
2047
2048 // Update query address cache.
2049 //
2050 // Given two connected nodes: local node A and remote node B. Say node B
2051 // is not in node A's routing table. Additionally node B is part of the
2052 // `Query::addresses` list of an ongoing query on node A. Say Node
2053 // B triggers an address change and then disconnects. Later on the
2054 // earlier mentioned query on node A would like to connect to node B.
2055 // Without replacing the address in the `Query::addresses` set node
2056 // A would attempt to dial the old and not the new address.
2057 //
2058 // While upholding correctness, iterating through all discovered
2059 // addresses of a peer in all currently ongoing queries might have a
2060 // large performance impact. If so, the code below might be worth
2061 // revisiting.
2062 for query in self.queries.iter_mut() {
2063 if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2064 for addr in addrs.iter_mut() {
2065 if addr == old {
2066 *addr = new.clone();
2067 }
2068 }
2069 }
2070 }
2071 }
2072
2073 fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2074 let Some(peer_id) = peer_id else { return };
2075
2076 match error {
2077 DialError::LocalPeerId { .. }
2078 | DialError::WrongPeerId { .. }
2079 | DialError::Aborted
2080 | DialError::Denied { .. }
2081 | DialError::Transport(_)
2082 | DialError::NoAddresses => {
2083 if let DialError::Transport(addresses) = error {
2084 for (addr, _) in addresses {
2085 self.address_failed(peer_id, addr)
2086 }
2087 }
2088
2089 for query in self.queries.iter_mut() {
2090 query.on_failure(&peer_id);
2091 }
2092 }
2093 DialError::DialPeerConditionFalse(
2094 dial_opts::PeerCondition::Disconnected
2095 | dial_opts::PeerCondition::NotDialing
2096 | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2097 ) => {
2098 // We might (still) be connected, or about to be connected, thus do not report the
2099 // failure to the queries.
2100 }
2101 DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2102 unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2103 }
2104 }
2105 }
2106
2107 fn on_connection_closed(
2108 &mut self,
2109 ConnectionClosed {
2110 peer_id,
2111 remaining_established,
2112 connection_id,
2113 ..
2114 }: ConnectionClosed,
2115 ) {
2116 self.connections.remove(&connection_id);
2117
2118 if remaining_established == 0 {
2119 for query in self.queries.iter_mut() {
2120 query.on_failure(&peer_id);
2121 }
2122 self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2123 self.connected_peers.remove(&peer_id);
2124 }
2125 }
2126
2127 /// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
2128 fn preload_new_handler(
2129 &mut self,
2130 handler: &mut Handler,
2131 connection_id: ConnectionId,
2132 peer: PeerId,
2133 ) {
2134 self.connections.insert(connection_id, peer);
2135 // Queue events for sending pending RPCs to the connected peer.
2136 // There can be only one pending RPC for a particular peer and query per definition.
2137 for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2138 q.pending_rpcs
2139 .iter()
2140 .position(|(p, _)| p == &peer)
2141 .map(|p| q.pending_rpcs.remove(p))
2142 }) {
2143 handler.on_behaviour_event(event)
2144 }
2145 }
2146}
2147
2148/// Exponentially decrease the given duration (base 2).
2149fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
2150 Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
2151}
2152
2153impl<TStore> NetworkBehaviour for Behaviour<TStore>
2154where
2155 TStore: RecordStore + Send + 'static,
2156{
2157 type ConnectionHandler = Handler;
2158 type ToSwarm = Event;
2159
2160 fn handle_established_inbound_connection(
2161 &mut self,
2162 connection_id: ConnectionId,
2163 peer: PeerId,
2164 local_addr: &Multiaddr,
2165 remote_addr: &Multiaddr,
2166 ) -> Result<THandler<Self>, ConnectionDenied> {
2167 let connected_point = ConnectedPoint::Listener {
2168 local_addr: local_addr.clone(),
2169 send_back_addr: remote_addr.clone(),
2170 };
2171
2172 let mut handler = Handler::new(
2173 self.protocol_config.clone(),
2174 connected_point,
2175 peer,
2176 self.mode,
2177 );
2178 self.preload_new_handler(&mut handler, connection_id, peer);
2179
2180 Ok(handler)
2181 }
2182
2183 fn handle_established_outbound_connection(
2184 &mut self,
2185 connection_id: ConnectionId,
2186 peer: PeerId,
2187 addr: &Multiaddr,
2188 role_override: Endpoint,
2189 port_use: PortUse,
2190 ) -> Result<THandler<Self>, ConnectionDenied> {
2191 let connected_point = ConnectedPoint::Dialer {
2192 address: addr.clone(),
2193 role_override,
2194 port_use,
2195 };
2196
2197 let mut handler = Handler::new(
2198 self.protocol_config.clone(),
2199 connected_point,
2200 peer,
2201 self.mode,
2202 );
2203 self.preload_new_handler(&mut handler, connection_id, peer);
2204
2205 Ok(handler)
2206 }
2207
2208 fn handle_pending_outbound_connection(
2209 &mut self,
2210 _connection_id: ConnectionId,
2211 maybe_peer: Option<PeerId>,
2212 _addresses: &[Multiaddr],
2213 _effective_role: Endpoint,
2214 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2215 let peer_id = match maybe_peer {
2216 None => return Ok(vec![]),
2217 Some(peer) => peer,
2218 };
2219
2220 // We should order addresses from decreasing likelihood of connectivity, so start with
2221 // the addresses of that peer in the k-buckets.
2222 let key = kbucket::Key::from(peer_id);
2223 let mut peer_addrs =
2224 if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2225 let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2226 debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2227 addrs
2228 } else {
2229 Vec::new()
2230 };
2231
2232 // We add to that a temporary list of addresses from the ongoing queries.
2233 for query in self.queries.iter() {
2234 if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2235 peer_addrs.extend(addrs.iter().cloned())
2236 }
2237 }
2238
2239 Ok(peer_addrs)
2240 }
2241
2242 fn on_connection_handler_event(
2243 &mut self,
2244 source: PeerId,
2245 connection: ConnectionId,
2246 event: THandlerOutEvent<Self>,
2247 ) {
2248 match event {
2249 HandlerEvent::ProtocolConfirmed { endpoint } => {
2250 debug_assert!(self.connected_peers.contains(&source));
2251 // The remote's address can only be put into the routing table,
2252 // and thus shared with other nodes, if the local node is the dialer,
2253 // since the remote address on an inbound connection may be specific
2254 // to that connection (e.g. typically the TCP port numbers).
2255 let address = match endpoint {
2256 ConnectedPoint::Dialer { address, .. } => Some(address),
2257 ConnectedPoint::Listener { .. } => None,
2258 };
2259
2260 self.connection_updated(source, address, NodeStatus::Connected);
2261 }
2262
2263 HandlerEvent::ProtocolNotSupported { endpoint } => {
2264 let address = match endpoint {
2265 ConnectedPoint::Dialer { address, .. } => Some(address),
2266 ConnectedPoint::Listener { .. } => None,
2267 };
2268 self.connection_updated(source, address, NodeStatus::Disconnected);
2269 }
2270
2271 HandlerEvent::FindNodeReq { key, request_id } => {
2272 let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2273
2274 self.queued_events
2275 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2276 request: InboundRequest::FindNode {
2277 num_closer_peers: closer_peers.len(),
2278 },
2279 }));
2280
2281 self.queued_events.push_back(ToSwarm::NotifyHandler {
2282 peer_id: source,
2283 handler: NotifyHandler::One(connection),
2284 event: HandlerIn::FindNodeRes {
2285 closer_peers,
2286 request_id,
2287 },
2288 });
2289 }
2290
2291 HandlerEvent::FindNodeRes {
2292 closer_peers,
2293 query_id,
2294 } => {
2295 self.discovered(&query_id, &source, closer_peers.iter());
2296 }
2297
2298 HandlerEvent::GetProvidersReq { key, request_id } => {
2299 let provider_peers = self.provider_peers(&key, &source);
2300 let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2301
2302 self.queued_events
2303 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2304 request: InboundRequest::GetProvider {
2305 num_closer_peers: closer_peers.len(),
2306 num_provider_peers: provider_peers.len(),
2307 },
2308 }));
2309
2310 self.queued_events.push_back(ToSwarm::NotifyHandler {
2311 peer_id: source,
2312 handler: NotifyHandler::One(connection),
2313 event: HandlerIn::GetProvidersRes {
2314 closer_peers,
2315 provider_peers,
2316 request_id,
2317 },
2318 });
2319 }
2320
2321 HandlerEvent::GetProvidersRes {
2322 closer_peers,
2323 provider_peers,
2324 query_id,
2325 } => {
2326 let peers = closer_peers.iter().chain(provider_peers.iter());
2327 self.discovered(&query_id, &source, peers);
2328 if let Some(query) = self.queries.get_mut(&query_id) {
2329 let stats = query.stats().clone();
2330 if let QueryInfo::GetProviders {
2331 ref key,
2332 ref mut providers_found,
2333 ref mut step,
2334 ..
2335 } = query.info
2336 {
2337 *providers_found += provider_peers.len();
2338 let providers = provider_peers.iter().map(|p| p.node_id).collect();
2339
2340 self.queued_events.push_back(ToSwarm::GenerateEvent(
2341 Event::OutboundQueryProgressed {
2342 id: query_id,
2343 result: QueryResult::GetProviders(Ok(
2344 GetProvidersOk::FoundProviders {
2345 key: key.clone(),
2346 providers,
2347 },
2348 )),
2349 step: step.clone(),
2350 stats,
2351 },
2352 ));
2353 *step = step.next();
2354 }
2355 }
2356 }
2357 HandlerEvent::QueryError { query_id, error } => {
2358 tracing::debug!(
2359 peer=%source,
2360 query=?query_id,
2361 "Request to peer in query failed with {:?}",
2362 error
2363 );
2364 // If the query to which the error relates is still active,
2365 // signal the failure w.r.t. `source`.
2366 if let Some(query) = self.queries.get_mut(&query_id) {
2367 query.on_failure(&source)
2368 }
2369 }
2370
2371 HandlerEvent::AddProvider { key, provider } => {
2372 // Only accept a provider record from a legitimate peer.
2373 if provider.node_id != source {
2374 return;
2375 }
2376
2377 self.provider_received(key, provider);
2378 }
2379
2380 HandlerEvent::GetRecord { key, request_id } => {
2381 // Lookup the record locally.
2382 let record = match self.store.get(&key) {
2383 Some(record) => {
2384 if record.is_expired(Instant::now()) {
2385 self.store.remove(&key);
2386 None
2387 } else {
2388 Some(record.into_owned())
2389 }
2390 }
2391 None => None,
2392 };
2393
2394 let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2395
2396 self.queued_events
2397 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2398 request: InboundRequest::GetRecord {
2399 num_closer_peers: closer_peers.len(),
2400 present_locally: record.is_some(),
2401 },
2402 }));
2403
2404 self.queued_events.push_back(ToSwarm::NotifyHandler {
2405 peer_id: source,
2406 handler: NotifyHandler::One(connection),
2407 event: HandlerIn::GetRecordRes {
2408 record,
2409 closer_peers,
2410 request_id,
2411 },
2412 });
2413 }
2414
2415 HandlerEvent::GetRecordRes {
2416 record,
2417 closer_peers,
2418 query_id,
2419 } => {
2420 if let Some(query) = self.queries.get_mut(&query_id) {
2421 let stats = query.stats().clone();
2422 if let QueryInfo::GetRecord {
2423 key,
2424 ref mut step,
2425 ref mut found_a_record,
2426 cache_candidates,
2427 } = &mut query.info
2428 {
2429 if let Some(record) = record {
2430 *found_a_record = true;
2431 let record = PeerRecord {
2432 peer: Some(source),
2433 record,
2434 };
2435
2436 self.queued_events.push_back(ToSwarm::GenerateEvent(
2437 Event::OutboundQueryProgressed {
2438 id: query_id,
2439 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2440 record,
2441 ))),
2442 step: step.clone(),
2443 stats,
2444 },
2445 ));
2446
2447 *step = step.next();
2448 } else {
2449 tracing::trace!(record=?key, %source, "Record not found at source");
2450 if let Caching::Enabled { max_peers } = self.caching {
2451 let source_key = kbucket::Key::from(source);
2452 let target_key = kbucket::Key::from(key.clone());
2453 let distance = source_key.distance(&target_key);
2454 cache_candidates.insert(distance, source);
2455 if cache_candidates.len() > max_peers as usize {
2456 // TODO: `pop_last()` would be nice once stabilised.
2457 // See https://github.com/rust-lang/rust/issues/62924.
2458 let last =
2459 *cache_candidates.keys().next_back().expect("len > 0");
2460 cache_candidates.remove(&last);
2461 }
2462 }
2463 }
2464 }
2465 }
2466
2467 self.discovered(&query_id, &source, closer_peers.iter());
2468 }
2469
2470 HandlerEvent::PutRecord { record, request_id } => {
2471 self.record_received(source, connection, request_id, record);
2472 }
2473
2474 HandlerEvent::PutRecordRes { query_id, .. } => {
2475 if let Some(query) = self.queries.get_mut(&query_id) {
2476 query.on_success(&source, vec![]);
2477 if let QueryInfo::PutRecord {
2478 phase: PutRecordPhase::PutRecord { success, .. },
2479 quorum,
2480 ..
2481 } = &mut query.info
2482 {
2483 success.push(source);
2484
2485 let quorum = quorum.get();
2486 if success.len() >= quorum {
2487 let peers = success.clone();
2488 let finished = query.try_finish(peers.iter());
2489 if !finished {
2490 tracing::debug!(
2491 peer=%source,
2492 query=?query_id,
2493 "PutRecord query reached quorum ({}/{}) with response \
2494 from peer but could not yet finish.",
2495 peers.len(),
2496 quorum,
2497 );
2498 }
2499 }
2500 }
2501 }
2502 }
2503 };
2504 }
2505
2506 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
2507 fn poll(
2508 &mut self,
2509 cx: &mut Context<'_>,
2510 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2511 let now = Instant::now();
2512
2513 // Calculate the available capacity for queries triggered by background jobs.
2514 let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2515
2516 // Run the periodic provider announcement job.
2517 if let Some(mut job) = self.add_provider_job.take() {
2518 let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2519 for i in 0..num {
2520 if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2521 self.start_add_provider(r.key, AddProviderContext::Republish)
2522 } else {
2523 jobs_query_capacity -= i;
2524 break;
2525 }
2526 }
2527 self.add_provider_job = Some(job);
2528 }
2529
2530 // Run the periodic record replication / publication job.
2531 if let Some(mut job) = self.put_record_job.take() {
2532 let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2533 for _ in 0..num {
2534 if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2535 let context =
2536 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2537 PutRecordContext::Republish
2538 } else {
2539 PutRecordContext::Replicate
2540 };
2541 self.start_put_record(r, Quorum::All, context)
2542 } else {
2543 break;
2544 }
2545 }
2546 self.put_record_job = Some(job);
2547 }
2548
2549 // Poll bootstrap periodically and automatically.
2550 if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
2551 if let Err(e) = self.bootstrap() {
2552 tracing::warn!("Failed to trigger bootstrap: {e}");
2553 }
2554 }
2555
2556 loop {
2557 // Drain queued events first.
2558 if let Some(event) = self.queued_events.pop_front() {
2559 return Poll::Ready(event);
2560 }
2561
2562 // Drain applied pending entries from the routing table.
2563 if let Some(entry) = self.kbuckets.take_applied_pending() {
2564 let kbucket::Node { key, value } = entry.inserted;
2565 let peer_id = key.into_preimage();
2566 self.queued_events
2567 .push_back(ToSwarm::NewExternalAddrOfPeer {
2568 peer_id,
2569 address: value.first().clone(),
2570 });
2571 let event = Event::RoutingUpdated {
2572 bucket_range: self
2573 .kbuckets
2574 .bucket(&key)
2575 .map(|b| b.range())
2576 .expect("Self to never be applied from pending."),
2577 peer: peer_id,
2578 is_new_peer: true,
2579 addresses: value,
2580 old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2581 };
2582 return Poll::Ready(ToSwarm::GenerateEvent(event));
2583 }
2584
2585 // Look for a finished query.
2586 loop {
2587 match self.queries.poll(now) {
2588 QueryPoolState::Finished(q) => {
2589 if let Some(event) = self.query_finished(q) {
2590 return Poll::Ready(ToSwarm::GenerateEvent(event));
2591 }
2592 }
2593 QueryPoolState::Timeout(q) => {
2594 if let Some(event) = self.query_timeout(q) {
2595 return Poll::Ready(ToSwarm::GenerateEvent(event));
2596 }
2597 }
2598 QueryPoolState::Waiting(Some((query, peer_id))) => {
2599 let event = query.info.to_request(query.id());
2600 // TODO: AddProvider requests yield no response, so the query completes
2601 // as soon as all requests have been sent. However, the handler should
2602 // better emit an event when the request has been sent (and report
2603 // an error if sending fails), instead of immediately reporting
2604 // "success" somewhat prematurely here.
2605 if let QueryInfo::AddProvider {
2606 phase: AddProviderPhase::AddProvider { .. },
2607 ..
2608 } = &query.info
2609 {
2610 query.on_success(&peer_id, vec![])
2611 }
2612
2613 if self.connected_peers.contains(&peer_id) {
2614 self.queued_events.push_back(ToSwarm::NotifyHandler {
2615 peer_id,
2616 event,
2617 handler: NotifyHandler::Any,
2618 });
2619 } else if &peer_id != self.kbuckets.local_key().preimage() {
2620 query.pending_rpcs.push((peer_id, event));
2621 self.queued_events.push_back(ToSwarm::Dial {
2622 opts: DialOpts::peer_id(peer_id).build(),
2623 });
2624 }
2625 }
2626 QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2627 }
2628 }
2629
2630 // No immediate event was produced as a result of a finished query.
2631 // If no new events have been queued either, signal `NotReady` to
2632 // be polled again later.
2633 if self.queued_events.is_empty() {
2634 self.no_events_waker = Some(cx.waker().clone());
2635
2636 return Poll::Pending;
2637 }
2638 }
2639 }
2640
2641 fn on_swarm_event(&mut self, event: FromSwarm) {
2642 self.listen_addresses.on_swarm_event(&event);
2643 let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2644
2645 if self.auto_mode && external_addresses_changed {
2646 self.determine_mode_from_external_addresses();
2647 }
2648
2649 match event {
2650 FromSwarm::ConnectionEstablished(connection_established) => {
2651 self.on_connection_established(connection_established)
2652 }
2653 FromSwarm::ConnectionClosed(connection_closed) => {
2654 self.on_connection_closed(connection_closed)
2655 }
2656 FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2657 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2658 FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
2659 // A new listen addr was just discovered and we have no connected peers,
2660 // it can mean that our network interfaces were not up but they are now
2661 // so it might be a good idea to trigger a bootstrap.
2662 self.bootstrap_status.trigger();
2663 }
2664 _ => {}
2665 }
2666 }
2667}
2668
2669/// Peer Info combines a Peer ID with a set of multiaddrs that the peer is listening on.
2670#[derive(Debug, Clone, PartialEq, Eq)]
2671pub struct PeerInfo {
2672 pub peer_id: PeerId,
2673 pub addrs: Vec<Multiaddr>,
2674}
2675
2676/// A quorum w.r.t. the configured replication factor specifies the minimum
2677/// number of distinct nodes that must be successfully contacted in order
2678/// for a query to succeed.
2679#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2680pub enum Quorum {
2681 One,
2682 Majority,
2683 All,
2684 N(NonZeroUsize),
2685}
2686
2687impl Quorum {
2688 /// Evaluate the quorum w.r.t a given total (number of peers).
2689 fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2690 match self {
2691 Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2692 Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2693 Quorum::All => total,
2694 Quorum::N(n) => NonZeroUsize::min(total, *n),
2695 }
2696 }
2697}
2698
2699/// A record either received by the given peer or retrieved from the local
2700/// record store.
2701#[derive(Debug, Clone, PartialEq, Eq)]
2702pub struct PeerRecord {
2703 /// The peer from whom the record was received. `None` if the record was
2704 /// retrieved from local storage.
2705 pub peer: Option<PeerId>,
2706 pub record: Record,
2707}
2708
2709//////////////////////////////////////////////////////////////////////////////
2710// Events
2711
2712/// The events produced by the `Kademlia` behaviour.
2713///
2714/// See [`NetworkBehaviour::poll`].
2715#[derive(Debug, Clone)]
2716#[allow(clippy::large_enum_variant)]
2717pub enum Event {
2718 /// An inbound request has been received and handled.
2719 //
2720 // Note on the difference between 'request' and 'query': A request is a
2721 // single request-response style exchange with a single remote peer. A query
2722 // is made of multiple requests across multiple remote peers.
2723 InboundRequest { request: InboundRequest },
2724
2725 /// An outbound query has made progress.
2726 OutboundQueryProgressed {
2727 /// The ID of the query that finished.
2728 id: QueryId,
2729 /// The intermediate result of the query.
2730 result: QueryResult,
2731 /// Execution statistics from the query.
2732 stats: QueryStats,
2733 /// Indicates which event this is, if therer are multiple responses for a single query.
2734 step: ProgressStep,
2735 },
2736
2737 /// The routing table has been updated with a new peer and / or
2738 /// address, thereby possibly evicting another peer.
2739 RoutingUpdated {
2740 /// The ID of the peer that was added or updated.
2741 peer: PeerId,
2742 /// Whether this is a new peer and was thus just added to the routing
2743 /// table, or whether it is an existing peer who's addresses changed.
2744 is_new_peer: bool,
2745 /// The full list of known addresses of `peer`.
2746 addresses: Addresses,
2747 /// Returns the minimum inclusive and maximum inclusive distance for
2748 /// the bucket of the peer.
2749 bucket_range: (Distance, Distance),
2750 /// The ID of the peer that was evicted from the routing table to make
2751 /// room for the new peer, if any.
2752 old_peer: Option<PeerId>,
2753 },
2754
2755 /// A peer has connected for whom no listen address is known.
2756 ///
2757 /// If the peer is to be added to the routing table, a known
2758 /// listen address for the peer must be provided via [`Behaviour::add_address`].
2759 UnroutablePeer { peer: PeerId },
2760
2761 /// A connection to a peer has been established for whom a listen address
2762 /// is known but the peer has not been added to the routing table either
2763 /// because [`BucketInserts::Manual`] is configured or because
2764 /// the corresponding bucket is full.
2765 ///
2766 /// If the peer is to be included in the routing table, it must
2767 /// must be explicitly added via [`Behaviour::add_address`], possibly after
2768 /// removing another peer.
2769 ///
2770 /// See [`Behaviour::kbucket`] for insight into the contents of
2771 /// the k-bucket of `peer`.
2772 RoutablePeer { peer: PeerId, address: Multiaddr },
2773
2774 /// A connection to a peer has been established for whom a listen address
2775 /// is known but the peer is only pending insertion into the routing table
2776 /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2777 /// may not make it into the routing table.
2778 ///
2779 /// If the peer is to be unconditionally included in the routing table,
2780 /// it should be explicitly added via [`Behaviour::add_address`] after
2781 /// removing another peer.
2782 ///
2783 /// See [`Behaviour::kbucket`] for insight into the contents of
2784 /// the k-bucket of `peer`.
2785 PendingRoutablePeer { peer: PeerId, address: Multiaddr },
2786
2787 /// This peer's mode has been updated automatically.
2788 ///
2789 /// This happens in response to an external
2790 /// address being added or removed.
2791 ModeChanged { new_mode: Mode },
2792}
2793
2794/// Information about progress events.
2795#[derive(Debug, Clone)]
2796pub struct ProgressStep {
2797 /// The index into the event
2798 pub count: NonZeroUsize,
2799 /// Is this the final event?
2800 pub last: bool,
2801}
2802
2803impl ProgressStep {
2804 fn first() -> Self {
2805 Self {
2806 count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
2807 last: false,
2808 }
2809 }
2810
2811 fn first_and_last() -> Self {
2812 let mut first = ProgressStep::first();
2813 first.last = true;
2814 first
2815 }
2816
2817 fn next(&self) -> Self {
2818 assert!(!self.last);
2819 let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
2820
2821 Self { count, last: false }
2822 }
2823}
2824
2825/// Information about a received and handled inbound request.
2826#[derive(Debug, Clone)]
2827pub enum InboundRequest {
2828 /// Request for the list of nodes whose IDs are the closest to `key`.
2829 FindNode { num_closer_peers: usize },
2830 /// Same as `FindNode`, but should also return the entries of the local
2831 /// providers list for this key.
2832 GetProvider {
2833 num_closer_peers: usize,
2834 num_provider_peers: usize,
2835 },
2836 /// A peer sent an add provider request.
2837 /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
2838 /// included.
2839 ///
2840 /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details..
2841 AddProvider { record: Option<ProviderRecord> },
2842 /// Request to retrieve a record.
2843 GetRecord {
2844 num_closer_peers: usize,
2845 present_locally: bool,
2846 },
2847 /// A peer sent a put record request.
2848 /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
2849 ///
2850 /// See [`StoreInserts`] and [`Config::set_record_filtering`].
2851 PutRecord {
2852 source: PeerId,
2853 connection: ConnectionId,
2854 record: Option<Record>,
2855 },
2856}
2857
2858/// The results of Kademlia queries.
2859#[derive(Debug, Clone)]
2860pub enum QueryResult {
2861 /// The result of [`Behaviour::bootstrap`].
2862 Bootstrap(BootstrapResult),
2863
2864 /// The result of [`Behaviour::get_closest_peers`].
2865 GetClosestPeers(GetClosestPeersResult),
2866
2867 /// The result of [`Behaviour::get_providers`].
2868 GetProviders(GetProvidersResult),
2869
2870 /// The result of [`Behaviour::start_providing`].
2871 StartProviding(AddProviderResult),
2872
2873 /// The result of a (automatic) republishing of a provider record.
2874 RepublishProvider(AddProviderResult),
2875
2876 /// The result of [`Behaviour::get_record`].
2877 GetRecord(GetRecordResult),
2878
2879 /// The result of [`Behaviour::put_record`].
2880 PutRecord(PutRecordResult),
2881
2882 /// The result of a (automatic) republishing of a (value-)record.
2883 RepublishRecord(PutRecordResult),
2884}
2885
2886/// The result of [`Behaviour::get_record`].
2887pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2888
2889/// The successful result of [`Behaviour::get_record`].
2890#[derive(Debug, Clone)]
2891pub enum GetRecordOk {
2892 FoundRecord(PeerRecord),
2893 FinishedWithNoAdditionalRecord {
2894 /// If caching is enabled, these are the peers closest
2895 /// _to the record key_ (not the local node) that were queried but
2896 /// did not return the record, sorted by distance to the record key
2897 /// from closest to farthest. How many of these are tracked is configured
2898 /// by [`Config::set_caching`].
2899 ///
2900 /// Writing back the cache at these peers is a manual operation.
2901 /// ie. you may wish to use these candidates with [`Behaviour::put_record_to`]
2902 /// after selecting one of the returned records.
2903 cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
2904 },
2905}
2906
2907/// The error result of [`Behaviour::get_record`].
2908#[derive(Debug, Clone, Error)]
2909pub enum GetRecordError {
2910 #[error("the record was not found")]
2911 NotFound {
2912 key: record::Key,
2913 closest_peers: Vec<PeerId>,
2914 },
2915 #[error("the quorum failed; needed {quorum} peers")]
2916 QuorumFailed {
2917 key: record::Key,
2918 records: Vec<PeerRecord>,
2919 quorum: NonZeroUsize,
2920 },
2921 #[error("the request timed out")]
2922 Timeout { key: record::Key },
2923}
2924
2925impl GetRecordError {
2926 /// Gets the key of the record for which the operation failed.
2927 pub fn key(&self) -> &record::Key {
2928 match self {
2929 GetRecordError::QuorumFailed { key, .. } => key,
2930 GetRecordError::Timeout { key, .. } => key,
2931 GetRecordError::NotFound { key, .. } => key,
2932 }
2933 }
2934
2935 /// Extracts the key of the record for which the operation failed,
2936 /// consuming the error.
2937 pub fn into_key(self) -> record::Key {
2938 match self {
2939 GetRecordError::QuorumFailed { key, .. } => key,
2940 GetRecordError::Timeout { key, .. } => key,
2941 GetRecordError::NotFound { key, .. } => key,
2942 }
2943 }
2944}
2945
2946/// The result of [`Behaviour::put_record`].
2947pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2948
2949/// The successful result of [`Behaviour::put_record`].
2950#[derive(Debug, Clone)]
2951pub struct PutRecordOk {
2952 pub key: record::Key,
2953}
2954
2955/// The error result of [`Behaviour::put_record`].
2956#[derive(Debug, Clone, Error)]
2957pub enum PutRecordError {
2958 #[error("the quorum failed; needed {quorum} peers")]
2959 QuorumFailed {
2960 key: record::Key,
2961 /// [`PeerId`]s of the peers the record was successfully stored on.
2962 success: Vec<PeerId>,
2963 quorum: NonZeroUsize,
2964 },
2965 #[error("the request timed out")]
2966 Timeout {
2967 key: record::Key,
2968 /// [`PeerId`]s of the peers the record was successfully stored on.
2969 success: Vec<PeerId>,
2970 quorum: NonZeroUsize,
2971 },
2972}
2973
2974impl PutRecordError {
2975 /// Gets the key of the record for which the operation failed.
2976 pub fn key(&self) -> &record::Key {
2977 match self {
2978 PutRecordError::QuorumFailed { key, .. } => key,
2979 PutRecordError::Timeout { key, .. } => key,
2980 }
2981 }
2982
2983 /// Extracts the key of the record for which the operation failed,
2984 /// consuming the error.
2985 pub fn into_key(self) -> record::Key {
2986 match self {
2987 PutRecordError::QuorumFailed { key, .. } => key,
2988 PutRecordError::Timeout { key, .. } => key,
2989 }
2990 }
2991}
2992
2993/// The result of [`Behaviour::bootstrap`].
2994pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
2995
2996/// The successful result of [`Behaviour::bootstrap`].
2997#[derive(Debug, Clone)]
2998pub struct BootstrapOk {
2999 pub peer: PeerId,
3000 pub num_remaining: u32,
3001}
3002
3003/// The error result of [`Behaviour::bootstrap`].
3004#[derive(Debug, Clone, Error)]
3005pub enum BootstrapError {
3006 #[error("the request timed out")]
3007 Timeout {
3008 peer: PeerId,
3009 num_remaining: Option<u32>,
3010 },
3011}
3012
3013/// The result of [`Behaviour::get_closest_peers`].
3014pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
3015
3016/// The successful result of [`Behaviour::get_closest_peers`].
3017#[derive(Debug, Clone)]
3018pub struct GetClosestPeersOk {
3019 pub key: Vec<u8>,
3020 pub peers: Vec<PeerInfo>,
3021}
3022
3023/// The error result of [`Behaviour::get_closest_peers`].
3024#[derive(Debug, Clone, Error)]
3025pub enum GetClosestPeersError {
3026 #[error("the request timed out")]
3027 Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
3028}
3029
3030impl GetClosestPeersError {
3031 /// Gets the key for which the operation failed.
3032 pub fn key(&self) -> &Vec<u8> {
3033 match self {
3034 GetClosestPeersError::Timeout { key, .. } => key,
3035 }
3036 }
3037
3038 /// Extracts the key for which the operation failed,
3039 /// consuming the error.
3040 pub fn into_key(self) -> Vec<u8> {
3041 match self {
3042 GetClosestPeersError::Timeout { key, .. } => key,
3043 }
3044 }
3045}
3046
3047/// The result of [`Behaviour::get_providers`].
3048pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
3049
3050/// The successful result of [`Behaviour::get_providers`].
3051#[derive(Debug, Clone)]
3052pub enum GetProvidersOk {
3053 FoundProviders {
3054 key: record::Key,
3055 /// The new set of providers discovered.
3056 providers: HashSet<PeerId>,
3057 },
3058 FinishedWithNoAdditionalRecord {
3059 closest_peers: Vec<PeerId>,
3060 },
3061}
3062
3063/// The error result of [`Behaviour::get_providers`].
3064#[derive(Debug, Clone, Error)]
3065pub enum GetProvidersError {
3066 #[error("the request timed out")]
3067 Timeout {
3068 key: record::Key,
3069 closest_peers: Vec<PeerId>,
3070 },
3071}
3072
3073impl GetProvidersError {
3074 /// Gets the key for which the operation failed.
3075 pub fn key(&self) -> &record::Key {
3076 match self {
3077 GetProvidersError::Timeout { key, .. } => key,
3078 }
3079 }
3080
3081 /// Extracts the key for which the operation failed,
3082 /// consuming the error.
3083 pub fn into_key(self) -> record::Key {
3084 match self {
3085 GetProvidersError::Timeout { key, .. } => key,
3086 }
3087 }
3088}
3089
3090/// The result of publishing a provider record.
3091pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
3092
3093/// The successful result of publishing a provider record.
3094#[derive(Debug, Clone)]
3095pub struct AddProviderOk {
3096 pub key: record::Key,
3097}
3098
3099/// The possible errors when publishing a provider record.
3100#[derive(Debug, Clone, Error)]
3101pub enum AddProviderError {
3102 #[error("the request timed out")]
3103 Timeout { key: record::Key },
3104}
3105
3106impl AddProviderError {
3107 /// Gets the key for which the operation failed.
3108 pub fn key(&self) -> &record::Key {
3109 match self {
3110 AddProviderError::Timeout { key, .. } => key,
3111 }
3112 }
3113
3114 /// Extracts the key for which the operation failed,
3115 pub fn into_key(self) -> record::Key {
3116 match self {
3117 AddProviderError::Timeout { key, .. } => key,
3118 }
3119 }
3120}
3121
3122impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3123 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3124 KadPeer {
3125 node_id: e.node.key.into_preimage(),
3126 multiaddrs: e.node.value.into_vec(),
3127 connection_ty: match e.status {
3128 NodeStatus::Connected => ConnectionType::Connected,
3129 NodeStatus::Disconnected => ConnectionType::NotConnected,
3130 },
3131 }
3132 }
3133}
3134
3135/// The context of a [`QueryInfo::AddProvider`] query.
3136#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3137pub enum AddProviderContext {
3138 /// The context is a [`Behaviour::start_providing`] operation.
3139 Publish,
3140 /// The context is periodic republishing of provider announcements
3141 /// initiated earlier via [`Behaviour::start_providing`].
3142 Republish,
3143}
3144
3145/// The context of a [`QueryInfo::PutRecord`] query.
3146#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3147pub enum PutRecordContext {
3148 /// The context is a [`Behaviour::put_record`] operation.
3149 Publish,
3150 /// The context is periodic republishing of records stored
3151 /// earlier via [`Behaviour::put_record`].
3152 Republish,
3153 /// The context is periodic replication (i.e. without extending
3154 /// the record TTL) of stored records received earlier from another peer.
3155 Replicate,
3156 /// The context is a custom store operation targeting specific
3157 /// peers initiated by [`Behaviour::put_record_to`].
3158 Custom,
3159}
3160
3161/// Information about a running query.
3162#[derive(Debug, Clone)]
3163pub enum QueryInfo {
3164 /// A query initiated by [`Behaviour::bootstrap`].
3165 Bootstrap {
3166 /// The targeted peer ID.
3167 peer: PeerId,
3168 /// The remaining random peer IDs to query, one per
3169 /// bucket that still needs refreshing.
3170 ///
3171 /// This is `None` if the initial self-lookup has not
3172 /// yet completed and `Some` with an exhausted iterator
3173 /// if bootstrapping is complete.
3174 remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
3175 step: ProgressStep,
3176 },
3177
3178 /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
3179 GetClosestPeers {
3180 /// The key being queried (the preimage).
3181 key: Vec<u8>,
3182 /// Current index of events.
3183 step: ProgressStep,
3184 },
3185
3186 /// A (repeated) query initiated by [`Behaviour::get_providers`].
3187 GetProviders {
3188 /// The key for which to search for providers.
3189 key: record::Key,
3190 /// The number of providers found so far.
3191 providers_found: usize,
3192 /// Current index of events.
3193 step: ProgressStep,
3194 },
3195
3196 /// A (repeated) query initiated by [`Behaviour::start_providing`].
3197 AddProvider {
3198 /// The record key.
3199 key: record::Key,
3200 /// The current phase of the query.
3201 phase: AddProviderPhase,
3202 /// The execution context of the query.
3203 context: AddProviderContext,
3204 },
3205
3206 /// A (repeated) query initiated by [`Behaviour::put_record`].
3207 PutRecord {
3208 record: Record,
3209 /// The expected quorum of responses w.r.t. the replication factor.
3210 quorum: NonZeroUsize,
3211 /// The current phase of the query.
3212 phase: PutRecordPhase,
3213 /// The execution context of the query.
3214 context: PutRecordContext,
3215 },
3216
3217 /// A (repeated) query initiated by [`Behaviour::get_record`].
3218 GetRecord {
3219 /// The key to look for.
3220 key: record::Key,
3221 /// Current index of events.
3222 step: ProgressStep,
3223 /// Did we find at least one record?
3224 found_a_record: bool,
3225 /// The peers closest to the `key` that were queried but did not return a record,
3226 /// i.e. the peers that are candidates for caching the record.
3227 cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
3228 },
3229}
3230
3231impl QueryInfo {
3232 /// Creates an event for a handler to issue an outgoing request in the
3233 /// context of a query.
3234 fn to_request(&self, query_id: QueryId) -> HandlerIn {
3235 match &self {
3236 QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3237 key: peer.to_bytes(),
3238 query_id,
3239 },
3240 QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3241 key: key.clone(),
3242 query_id,
3243 },
3244 QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3245 key: key.clone(),
3246 query_id,
3247 },
3248 QueryInfo::AddProvider { key, phase, .. } => match phase {
3249 AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3250 key: key.to_vec(),
3251 query_id,
3252 },
3253 AddProviderPhase::AddProvider {
3254 provider_id,
3255 external_addresses,
3256 ..
3257 } => HandlerIn::AddProvider {
3258 key: key.clone(),
3259 provider: crate::protocol::KadPeer {
3260 node_id: *provider_id,
3261 multiaddrs: external_addresses.clone(),
3262 connection_ty: crate::protocol::ConnectionType::Connected,
3263 },
3264 query_id,
3265 },
3266 },
3267 QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3268 key: key.clone(),
3269 query_id,
3270 },
3271 QueryInfo::PutRecord { record, phase, .. } => match phase {
3272 PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3273 key: record.key.to_vec(),
3274 query_id,
3275 },
3276 PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3277 record: record.clone(),
3278 query_id,
3279 },
3280 },
3281 }
3282 }
3283}
3284
3285/// The phases of a [`QueryInfo::AddProvider`] query.
3286#[derive(Debug, Clone)]
3287pub enum AddProviderPhase {
3288 /// The query is searching for the closest nodes to the record key.
3289 GetClosestPeers,
3290
3291 /// The query advertises the local node as a provider for the key to
3292 /// the closest nodes to the key.
3293 AddProvider {
3294 /// The local peer ID that is advertised as a provider.
3295 provider_id: PeerId,
3296 /// The external addresses of the provider being advertised.
3297 external_addresses: Vec<Multiaddr>,
3298 /// Query statistics from the finished `GetClosestPeers` phase.
3299 get_closest_peers_stats: QueryStats,
3300 },
3301}
3302
3303/// The phases of a [`QueryInfo::PutRecord`] query.
3304#[derive(Debug, Clone, PartialEq, Eq)]
3305pub enum PutRecordPhase {
3306 /// The query is searching for the closest nodes to the record key.
3307 GetClosestPeers,
3308
3309 /// The query is replicating the record to the closest nodes to the key.
3310 PutRecord {
3311 /// A list of peers the given record has been successfully replicated to.
3312 success: Vec<PeerId>,
3313 /// Query statistics from the finished `GetClosestPeers` phase.
3314 get_closest_peers_stats: QueryStats,
3315 },
3316}
3317
3318/// A mutable reference to a running query.
3319pub struct QueryMut<'a> {
3320 query: &'a mut Query,
3321}
3322
3323impl<'a> QueryMut<'a> {
3324 pub fn id(&self) -> QueryId {
3325 self.query.id()
3326 }
3327
3328 /// Gets information about the type and state of the query.
3329 pub fn info(&self) -> &QueryInfo {
3330 &self.query.info
3331 }
3332
3333 /// Gets execution statistics about the query.
3334 ///
3335 /// For a multi-phase query such as `put_record`, these are the
3336 /// statistics of the current phase.
3337 pub fn stats(&self) -> &QueryStats {
3338 self.query.stats()
3339 }
3340
3341 /// Finishes the query asap, without waiting for the
3342 /// regular termination conditions.
3343 pub fn finish(&mut self) {
3344 self.query.finish()
3345 }
3346}
3347
3348/// An immutable reference to a running query.
3349pub struct QueryRef<'a> {
3350 query: &'a Query,
3351}
3352
3353impl<'a> QueryRef<'a> {
3354 pub fn id(&self) -> QueryId {
3355 self.query.id()
3356 }
3357
3358 /// Gets information about the type and state of the query.
3359 pub fn info(&self) -> &QueryInfo {
3360 &self.query.info
3361 }
3362
3363 /// Gets execution statistics about the query.
3364 ///
3365 /// For a multi-phase query such as `put_record`, these are the
3366 /// statistics of the current phase.
3367 pub fn stats(&self) -> &QueryStats {
3368 self.query.stats()
3369 }
3370}
3371
3372/// An operation failed to due no known peers in the routing table.
3373#[derive(Debug, Clone)]
3374pub struct NoKnownPeers();
3375
3376impl fmt::Display for NoKnownPeers {
3377 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3378 write!(f, "No known peers.")
3379 }
3380}
3381
3382impl std::error::Error for NoKnownPeers {}
3383
3384/// The possible outcomes of [`Behaviour::add_address`].
3385#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3386pub enum RoutingUpdate {
3387 /// The given peer and address has been added to the routing
3388 /// table.
3389 Success,
3390 /// The peer and address is pending insertion into
3391 /// the routing table, if a disconnected peer fails
3392 /// to respond. If the given peer and address ends up
3393 /// in the routing table, [`Event::RoutingUpdated`]
3394 /// is eventually emitted.
3395 Pending,
3396 /// The routing table update failed, either because the
3397 /// corresponding bucket for the peer is full and the
3398 /// pending slot(s) are occupied, or because the given
3399 /// peer ID is deemed invalid (e.g. refers to the local
3400 /// peer ID).
3401 Failed,
3402}
3403
3404#[derive(PartialEq, Copy, Clone, Debug)]
3405pub enum Mode {
3406 Client,
3407 Server,
3408}
3409
3410impl fmt::Display for Mode {
3411 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3412 match self {
3413 Mode::Client => write!(f, "client"),
3414 Mode::Server => write!(f, "server"),
3415 }
3416 }
3417}
3418
3419fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
3420where
3421 T: ToString,
3422{
3423 confirmed_external_addresses
3424 .iter()
3425 .map(|addr| addr.to_string())
3426 .collect::<Vec<_>>()
3427 .join(", ")
3428}