libp2p_kad/behaviour.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use crate::addresses::Addresses;
26use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
27use crate::jobs::*;
28use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
29use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
30use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState};
31use crate::record_priv::{
32 self,
33 store::{self, RecordStore},
34 ProviderRecord, Record,
35};
36use crate::K_VALUE;
37use fnv::{FnvHashMap, FnvHashSet};
38use instant::Instant;
39use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
40use libp2p_identity::PeerId;
41use libp2p_swarm::behaviour::{
42 AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
43};
44use libp2p_swarm::{
45 dial_opts::{self, DialOpts},
46 ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
47 ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, THandler,
48 THandlerInEvent, THandlerOutEvent, ToSwarm,
49};
50use log::{debug, info, warn};
51use smallvec::SmallVec;
52use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::num::NonZeroUsize;
55use std::task::{Context, Poll, Waker};
56use std::time::Duration;
57use std::vec;
58use thiserror::Error;
59
60pub use crate::query::QueryStats;
61
/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
///
/// The type parameter `TStore` is the type of the record storage kept in
/// the `store` field.
pub struct Behaviour<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: BucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: ProtocolConfig,

    /// Configuration of [`RecordStore`] filtering.
    record_filtering: StoreInserts,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool<QueryInner>,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    ///
    /// `None` if no provider publication interval is configured.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    ///
    /// `None` if neither a replication nor a publication interval is configured.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// How long to keep connections alive when they're idle.
    connection_idle_timeout: Duration,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// Listen addresses tracked by this behaviour.
    // NOTE(review): presumably kept in sync via `FromSwarm` events — confirm in the event handler.
    listen_addresses: ListenAddresses,

    /// External addresses tracked by this behaviour.
    // NOTE(review): presumably kept in sync via `FromSwarm` events — confirm in the event handler.
    external_addresses: ExternalAddresses,

    /// Maps a connection ID to a peer ID.
    // NOTE(review): presumably the remote peer of each established connection — confirm.
    connections: HashMap<ConnectionId, PeerId>,

    /// See [`Config::caching`].
    caching: Caching,

    /// The peer ID of the local node, as given to [`Behaviour::with_config`].
    local_peer_id: PeerId,

    /// The current [`Mode`]; initialised to [`Mode::Client`] by
    /// [`Behaviour::with_config`].
    mode: Mode,
    /// Initialised to `true` by [`Behaviour::with_config`].
    // NOTE(review): presumably signals that `mode` is determined automatically — confirm.
    auto_mode: bool,
    /// An optionally stored [`Waker`]; initialised to `None`.
    // NOTE(review): presumably used to wake the polling task once new events are queued — confirm.
    no_events_waker: Option<Waker>,

    /// The record storage.
    store: TStore,
}
123
/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
///
/// The strategy is selected via [`Config::set_kbucket_inserts`]; the default
/// configuration uses [`BucketInserts::OnConnected`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum BucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Behaviour::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Behaviour::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}
147
/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// The strategy is selected via [`Config::set_record_filtering`]; the default
/// configuration uses [`StoreInserts::Unfiltered`].
///
/// [`Key`]: crate::record_priv::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StoreInserts {
    /// Whenever a (provider) record is received,
    /// the record is forwarded immediately to the [`RecordStore`].
    Unfiltered,
    /// Whenever a (provider) record is received, an event is emitted.
    /// Provider records generate an [`InboundRequest::AddProvider`] under [`Event::InboundRequest`],
    /// normal records generate an [`InboundRequest::PutRecord`] under [`Event::InboundRequest`].
    ///
    /// When deemed valid, a (provider) record needs to be explicitly stored in
    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
    /// be retrieved via [`Behaviour::store_mut`].
    FilterBoth,
}
170
/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Behaviour::with_config`];
/// [`Behaviour::new`] uses [`Config::default`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Pending timeout handed to the k-buckets table on construction.
    /// Defaults to 60 seconds.
    kbucket_pending_timeout: Duration,
    /// Query settings (timeout, replication factor, parallelism,
    /// disjoint query paths).
    query_config: QueryConfig,
    /// Configuration of the wire protocol (protocol names, max packet size).
    protocol_config: ProtocolConfig,
    /// TTL of regular records; see [`Config::set_record_ttl`].
    record_ttl: Option<Duration>,
    /// See [`Config::set_replication_interval`].
    record_replication_interval: Option<Duration>,
    /// See [`Config::set_publication_interval`].
    record_publication_interval: Option<Duration>,
    /// See [`Config::set_record_filtering`].
    record_filtering: StoreInserts,
    /// TTL of provider records; see [`Config::set_provider_record_ttl`].
    provider_record_ttl: Option<Duration>,
    /// See [`Config::set_provider_publication_interval`].
    provider_publication_interval: Option<Duration>,
    /// How long to keep idle connections alive. Defaults to 10 seconds.
    connection_idle_timeout: Duration,
    /// See [`Config::set_kbucket_inserts`].
    kbucket_inserts: BucketInserts,
    /// See [`Config::set_caching`].
    caching: Caching,
}
189
190impl Default for Config {
191 fn default() -> Self {
192 Config {
193 kbucket_pending_timeout: Duration::from_secs(60),
194 query_config: QueryConfig::default(),
195 protocol_config: Default::default(),
196 record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
197 record_replication_interval: Some(Duration::from_secs(60 * 60)),
198 record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
199 record_filtering: StoreInserts::Unfiltered,
200 provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
201 provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
202 connection_idle_timeout: Duration::from_secs(10),
203 kbucket_inserts: BucketInserts::OnConnected,
204 caching: Caching::Enabled { max_peers: 1 },
205 }
206 }
207}
208
/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Behaviour::get_record`].
///
/// Selected via [`Config::set_caching`]; the default configuration uses
/// [`Caching::Enabled`] with a `max_peers` of 1.
#[derive(Debug, Clone)]
pub enum Caching {
    /// Caching is disabled and the peers closest to records being looked up
    /// that do not return a record are not tracked, i.e.
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
    Disabled,
    /// Up to `max_peers` peers not returning a record that are closest to the key
    /// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`].
    /// The write-back operation must be performed explicitly, if
    /// desired and after choosing a record from the results, via [`Behaviour::put_record_to`].
    Enabled {
        /// The maximum number of peers to track as cache candidates.
        max_peers: u16,
    },
}
223
// All setters below take `&mut self` and return `&mut Self`, so calls can be chained.
impl Config {
    /// Sets custom protocol names.
    ///
    /// Kademlia nodes only communicate with other nodes using the same protocol
    /// name. Using custom name(s) therefore allows segregating the DHT from
    /// others, if that is desired.
    ///
    /// More than one protocol name can be supplied. In this case the node will
    /// be able to talk to other nodes supporting any of the provided names.
    /// Multiple names must be used with caution to avoid network partitioning.
    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
        self.protocol_config.set_protocol_names(names);
        self
    }

    /// Sets the timeout for a single query.
    ///
    /// > **Note**: A single query usually comprises at least as many requests
    /// > as the replication factor, i.e. this is not a request timeout.
    ///
    /// The default is 60 seconds.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }

    /// Sets the replication factor to use.
    ///
    /// The replication factor determines to how many closest peers
    /// a record is replicated. The default is [`K_VALUE`].
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }

    /// Sets the allowed level of parallelism for iterative queries.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers
    /// that an iterative query is allowed to wait for in parallel while
    /// iterating towards the closest nodes to a target. Defaults to
    /// `ALPHA_VALUE`.
    ///
    /// This only controls the level of parallelism of an iterative query, not
    /// the level of parallelism of a query to a fixed set of peers.
    ///
    /// When used with [`Config::disjoint_query_paths`] it equals
    /// the number of disjoint paths used.
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }

    /// Require iterative queries to use disjoint paths for increased resiliency
    /// in the presence of potentially adversarial nodes.
    ///
    /// When enabled the number of disjoint paths used equals the configured
    /// parallelism.
    ///
    /// See the S/Kademlia paper for more information on the high level design
    /// as well as its security improvements.
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        self.query_config.disjoint_query_paths = enabled;
        self
    }

    /// Sets the TTL for stored records.
    ///
    /// The TTL should be significantly longer than the (re-)publication
    /// interval, to avoid premature expiration of records. The default is 36
    /// hours.
    ///
    /// `None` means records never expire.
    ///
    /// Does not apply to provider records.
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }

    /// Sets whether or not records should be filtered before being stored.
    ///
    /// See [`StoreInserts`] for the different values.
    /// Defaults to [`StoreInserts::Unfiltered`].
    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
        self.record_filtering = filtering;
        self
    }

    /// Sets the (re-)replication interval for stored records.
    ///
    /// Periodic replication of stored records ensures that the records
    /// are always replicated to the available nodes closest to the key in the
    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
    /// ensuring persistence until the record expires. Replication does not
    /// prolong the regular lifetime of a record (for otherwise it would live
    /// forever regardless of the configured TTL). The expiry of a record
    /// is only extended through re-publication.
    ///
    /// This interval should be significantly shorter than the publication
    /// interval, to ensure persistence between re-publications. The default
    /// is 1 hour.
    ///
    /// `None` means that stored records are never re-replicated.
    ///
    /// Does not apply to provider records.
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }

    /// Sets the (re-)publication interval of stored records.
    ///
    /// Records persist in the DHT until they expire. By default, published
    /// records are re-published in regular intervals for as long as the record
    /// exists in the local storage of the original publisher, thereby extending
    /// the record's lifetime.
    ///
    /// This interval should be significantly shorter than the record TTL, to
    /// ensure records do not expire prematurely. The default is 24 hours.
    ///
    /// `None` means that stored records are never automatically re-published.
    ///
    /// Does not apply to provider records.
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }

    /// Sets the TTL for provider records.
    ///
    /// `None` means that stored provider records never expire.
    /// The default is 24 hours.
    ///
    /// Must be significantly larger than the provider publication interval.
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }

    /// Sets the interval at which provider records for keys provided
    /// by the local node are re-published.
    ///
    /// `None` means that stored provider records are never automatically
    /// re-published. The default is 12 hours.
    ///
    /// Must be significantly less than the provider record TTL.
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }

    /// Sets the amount of time to keep connections alive when they're idle.
    ///
    /// The default is 10 seconds.
    #[deprecated(
        note = "Set a global idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead."
    )]
    pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
        self.connection_idle_timeout = duration;
        self
    }

    /// Modifies the maximum allowed size of individual Kademlia packets.
    ///
    /// It might be necessary to increase this value if trying to put large
    /// records.
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }

    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
    ///
    /// See [`BucketInserts`] for the different values.
    /// Defaults to [`BucketInserts::OnConnected`].
    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
        self.kbucket_inserts = inserts;
        self
    }

    /// Sets the [`Caching`] strategy to use for successful lookups.
    ///
    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
    /// will result in the record being cached at the closest node to the key that
    /// did not return the record, i.e. the standard Kademlia behaviour.
    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
        self.caching = c;
        self
    }
}
409
410impl<TStore> Behaviour<TStore>
411where
412 TStore: RecordStore + Send + 'static,
413{
414 /// Creates a new `Kademlia` network behaviour with a default configuration.
415 pub fn new(id: PeerId, store: TStore) -> Self {
416 Self::with_config(id, store, Default::default())
417 }
418
419 /// Get the protocol name of this kademlia instance.
420 pub fn protocol_names(&self) -> &[StreamProtocol] {
421 self.protocol_config.protocol_names()
422 }
423
424 /// Creates a new `Kademlia` network behaviour with the given configuration.
425 pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
426 let local_key = kbucket::Key::from(id);
427
428 let put_record_job = config
429 .record_replication_interval
430 .or(config.record_publication_interval)
431 .map(|interval| {
432 PutRecordJob::new(
433 id,
434 interval,
435 config.record_publication_interval,
436 config.record_ttl,
437 )
438 });
439
440 let add_provider_job = config
441 .provider_publication_interval
442 .map(AddProviderJob::new);
443
444 Behaviour {
445 store,
446 caching: config.caching,
447 kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
448 kbucket_inserts: config.kbucket_inserts,
449 protocol_config: config.protocol_config,
450 record_filtering: config.record_filtering,
451 queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
452 listen_addresses: Default::default(),
453 queries: QueryPool::new(config.query_config),
454 connected_peers: Default::default(),
455 add_provider_job,
456 put_record_job,
457 record_ttl: config.record_ttl,
458 provider_record_ttl: config.provider_record_ttl,
459 connection_idle_timeout: config.connection_idle_timeout,
460 external_addresses: Default::default(),
461 local_peer_id: id,
462 connections: Default::default(),
463 mode: Mode::Client,
464 auto_mode: true,
465 no_events_waker: None,
466 }
467 }
468
469 /// Gets an iterator over immutable references to all running queries.
470 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
471 self.queries.iter().filter_map(|query| {
472 if !query.is_finished() {
473 Some(QueryRef { query })
474 } else {
475 None
476 }
477 })
478 }
479
480 /// Gets an iterator over mutable references to all running queries.
481 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
482 self.queries.iter_mut().filter_map(|query| {
483 if !query.is_finished() {
484 Some(QueryMut { query })
485 } else {
486 None
487 }
488 })
489 }
490
491 /// Gets an immutable reference to a running query, if it exists.
492 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
493 self.queries.get(id).and_then(|query| {
494 if !query.is_finished() {
495 Some(QueryRef { query })
496 } else {
497 None
498 }
499 })
500 }
501
502 /// Gets a mutable reference to a running query, if it exists.
503 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
504 self.queries.get_mut(id).and_then(|query| {
505 if !query.is_finished() {
506 Some(QueryMut { query })
507 } else {
508 None
509 }
510 })
511 }
512
513 /// Adds a known listen address of a peer participating in the DHT to the
514 /// routing table.
515 ///
516 /// Explicitly adding addresses of peers serves two purposes:
517 ///
518 /// 1. In order for a node to join the DHT, it must know about at least
519 /// one other node of the DHT.
520 ///
521 /// 2. When a remote peer initiates a connection and that peer is not
522 /// yet in the routing table, the `Kademlia` behaviour must be
523 /// informed of an address on which that peer is listening for
524 /// connections before it can be added to the routing table
525 /// from where it can subsequently be discovered by all peers
526 /// in the DHT.
527 ///
528 /// If the routing table has been updated as a result of this operation,
529 /// a [`Event::RoutingUpdated`] event is emitted.
530 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
531 let key = kbucket::Key::from(*peer);
532 match self.kbuckets.entry(&key) {
533 kbucket::Entry::Present(mut entry, _) => {
534 if entry.value().insert(address) {
535 self.queued_events
536 .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
537 peer: *peer,
538 is_new_peer: false,
539 addresses: entry.value().clone(),
540 old_peer: None,
541 bucket_range: self
542 .kbuckets
543 .bucket(&key)
544 .map(|b| b.range())
545 .expect("Not kbucket::Entry::SelfEntry."),
546 }))
547 }
548 RoutingUpdate::Success
549 }
550 kbucket::Entry::Pending(mut entry, _) => {
551 entry.value().insert(address);
552 RoutingUpdate::Pending
553 }
554 kbucket::Entry::Absent(entry) => {
555 let addresses = Addresses::new(address);
556 let status = if self.connected_peers.contains(peer) {
557 NodeStatus::Connected
558 } else {
559 NodeStatus::Disconnected
560 };
561 match entry.insert(addresses.clone(), status) {
562 kbucket::InsertResult::Inserted => {
563 self.queued_events.push_back(ToSwarm::GenerateEvent(
564 Event::RoutingUpdated {
565 peer: *peer,
566 is_new_peer: true,
567 addresses,
568 old_peer: None,
569 bucket_range: self
570 .kbuckets
571 .bucket(&key)
572 .map(|b| b.range())
573 .expect("Not kbucket::Entry::SelfEntry."),
574 },
575 ));
576 RoutingUpdate::Success
577 }
578 kbucket::InsertResult::Full => {
579 debug!("Bucket full. Peer not added to routing table: {}", peer);
580 RoutingUpdate::Failed
581 }
582 kbucket::InsertResult::Pending { disconnected } => {
583 self.queued_events.push_back(ToSwarm::Dial {
584 opts: DialOpts::peer_id(disconnected.into_preimage())
585 .condition(dial_opts::PeerCondition::NotDialing)
586 .build(),
587 });
588 RoutingUpdate::Pending
589 }
590 }
591 }
592 kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
593 }
594 }
595
596 /// Removes an address of a peer from the routing table.
597 ///
598 /// If the given address is the last address of the peer in the
599 /// routing table, the peer is removed from the routing table
600 /// and `Some` is returned with a view of the removed entry.
601 /// The same applies if the peer is currently pending insertion
602 /// into the routing table.
603 ///
604 /// If the given peer or address is not in the routing table,
605 /// this is a no-op.
606 pub fn remove_address(
607 &mut self,
608 peer: &PeerId,
609 address: &Multiaddr,
610 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
611 let key = kbucket::Key::from(*peer);
612 match self.kbuckets.entry(&key) {
613 kbucket::Entry::Present(mut entry, _) => {
614 if entry.value().remove(address).is_err() {
615 Some(entry.remove()) // it is the last address, thus remove the peer.
616 } else {
617 None
618 }
619 }
620 kbucket::Entry::Pending(mut entry, _) => {
621 if entry.value().remove(address).is_err() {
622 Some(entry.remove()) // it is the last address, thus remove the peer.
623 } else {
624 None
625 }
626 }
627 kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
628 }
629 }
630
631 /// Removes a peer from the routing table.
632 ///
633 /// Returns `None` if the peer was not in the routing table,
634 /// not even pending insertion.
635 pub fn remove_peer(
636 &mut self,
637 peer: &PeerId,
638 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
639 let key = kbucket::Key::from(*peer);
640 match self.kbuckets.entry(&key) {
641 kbucket::Entry::Present(entry, _) => Some(entry.remove()),
642 kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
643 kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
644 }
645 }
646
647 /// Returns an iterator over all non-empty buckets in the routing table.
648 pub fn kbuckets(
649 &mut self,
650 ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
651 self.kbuckets.iter().filter(|b| !b.is_empty())
652 }
653
654 /// Returns the k-bucket for the distance to the given key.
655 ///
656 /// Returns `None` if the given key refers to the local key.
657 pub fn kbucket<K>(
658 &mut self,
659 key: K,
660 ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
661 where
662 K: Into<kbucket::Key<K>> + Clone,
663 {
664 self.kbuckets.bucket(&key.into())
665 }
666
667 /// Initiates an iterative query for the closest peers to the given key.
668 ///
669 /// The result of the query is delivered in a
670 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
671 pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
672 where
673 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
674 {
675 let target: kbucket::Key<K> = key.clone().into();
676 let key: Vec<u8> = key.into();
677 let info = QueryInfo::GetClosestPeers {
678 key,
679 step: ProgressStep::first(),
680 };
681 let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
682 let inner = QueryInner::new(info);
683 self.queries.add_iter_closest(target, peer_keys, inner)
684 }
685
    /// Returns the keys of the peers closest to the given key, taken from
    /// the local routing table only.
    ///
    /// No query is started; this only delegates to the routing table's
    /// `closest_keys`. The returned iterator borrows both `self` and `key`.
    pub fn get_closest_local_peers<'a, K: Clone>(
        &'a mut self,
        key: &'a kbucket::Key<K>,
    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
        self.kbuckets.closest_keys(key)
    }
693
694 /// Performs a lookup for a record in the DHT.
695 ///
696 /// The result of this operation is delivered in a
697 /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
698 pub fn get_record(&mut self, key: record_priv::Key) -> QueryId {
699 let record = if let Some(record) = self.store.get(&key) {
700 if record.is_expired(Instant::now()) {
701 self.store.remove(&key);
702 None
703 } else {
704 Some(PeerRecord {
705 peer: None,
706 record: record.into_owned(),
707 })
708 }
709 } else {
710 None
711 };
712
713 let step = ProgressStep::first();
714
715 let target = kbucket::Key::new(key.clone());
716 let info = if record.is_some() {
717 QueryInfo::GetRecord {
718 key,
719 step: step.next(),
720 found_a_record: true,
721 cache_candidates: BTreeMap::new(),
722 }
723 } else {
724 QueryInfo::GetRecord {
725 key,
726 step: step.clone(),
727 found_a_record: false,
728 cache_candidates: BTreeMap::new(),
729 }
730 };
731 let peers = self.kbuckets.closest_keys(&target);
732 let inner = QueryInner::new(info);
733 let id = self.queries.add_iter_closest(target.clone(), peers, inner);
734
735 // No queries were actually done for the results yet.
736 let stats = QueryStats::empty();
737
738 if let Some(record) = record {
739 self.queued_events
740 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
741 id,
742 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
743 step,
744 stats,
745 }));
746 }
747
748 id
749 }
750
751 /// Stores a record in the DHT, locally as well as at the nodes
752 /// closest to the key as per the xor distance metric.
753 ///
754 /// Returns `Ok` if a record has been stored locally, providing the
755 /// `QueryId` of the initial query that replicates the record in the DHT.
756 /// The result of the query is eventually reported as a
757 /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
758 ///
759 /// The record is always stored locally with the given expiration. If the record's
760 /// expiration is `None`, the common case, it does not expire in local storage
761 /// but is still replicated with the configured record TTL. To remove the record
762 /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
763 ///
764 /// After the initial publication of the record, it is subject to (re-)replication
765 /// and (re-)publication as per the configured intervals. Periodic (re-)publication
766 /// does not update the record's expiration in local storage, thus a given record
767 /// with an explicit expiration will always expire at that instant and until then
768 /// is subject to regular (re-)replication and (re-)publication.
769 pub fn put_record(
770 &mut self,
771 mut record: Record,
772 quorum: Quorum,
773 ) -> Result<QueryId, store::Error> {
774 record.publisher = Some(*self.kbuckets.local_key().preimage());
775 self.store.put(record.clone())?;
776 record.expires = record
777 .expires
778 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
779 let quorum = quorum.eval(self.queries.config().replication_factor);
780 let target = kbucket::Key::new(record.key.clone());
781 let peers = self.kbuckets.closest_keys(&target);
782 let context = PutRecordContext::Publish;
783 let info = QueryInfo::PutRecord {
784 context,
785 record,
786 quorum,
787 phase: PutRecordPhase::GetClosestPeers,
788 };
789 let inner = QueryInner::new(info);
790 Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
791 }
792
793 /// Stores a record at specific peers, without storing it locally.
794 ///
795 /// The given [`Quorum`] is understood in the context of the total
796 /// number of distinct peers given.
797 ///
798 /// If the record's expiration is `None`, the configured record TTL is used.
799 ///
800 /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
801 /// > used to selectively update or store a record to specific peers
802 /// > for the purpose of e.g. making sure these peers have the latest
803 /// > "version" of a record or to "cache" a record at further peers
804 /// > to increase the lookup success rate on the DHT for other peers.
805 /// >
806 /// > In particular, there is no automatic storing of records performed, and this
807 /// > method must be used to ensure the standard Kademlia
808 /// > procedure of "caching" (i.e. storing) a found record at the closest
809 /// > node to the key that _did not_ return it.
810 pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
811 where
812 I: ExactSizeIterator<Item = PeerId>,
813 {
814 let quorum = if peers.len() > 0 {
815 quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
816 } else {
817 // If no peers are given, we just let the query fail immediately
818 // due to the fact that the quorum must be at least one, instead of
819 // introducing a new kind of error.
820 NonZeroUsize::new(1).expect("1 > 0")
821 };
822 record.expires = record
823 .expires
824 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
825 let context = PutRecordContext::Custom;
826 let info = QueryInfo::PutRecord {
827 context,
828 record,
829 quorum,
830 phase: PutRecordPhase::PutRecord {
831 success: Vec::new(),
832 get_closest_peers_stats: QueryStats::empty(),
833 },
834 };
835 let inner = QueryInner::new(info);
836 self.queries.add_fixed(peers, inner)
837 }
838
839 /// Removes the record with the given key from _local_ storage,
840 /// if the local node is the publisher of the record.
841 ///
842 /// Has no effect if a record for the given key is stored locally but
843 /// the local node is not a publisher of the record.
844 ///
845 /// This is a _local_ operation. However, it also has the effect that
846 /// the record will no longer be periodically re-published, allowing the
847 /// record to eventually expire throughout the DHT.
848 pub fn remove_record(&mut self, key: &record_priv::Key) {
849 if let Some(r) = self.store.get(key) {
850 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
851 self.store.remove(key)
852 }
853 }
854 }
855
    /// Gets a mutable reference to the record store.
    ///
    /// This gives direct access to the `store` field, e.g. for storing
    /// records explicitly when [`StoreInserts::FilterBoth`] is configured.
    pub fn store_mut(&mut self) -> &mut TStore {
        &mut self.store
    }
860
861 /// Bootstraps the local node to join the DHT.
862 ///
863 /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
864 /// own ID in the DHT. This introduces the local node to the other nodes
865 /// in the DHT and populates its routing table with the closest neighbours.
866 ///
867 /// Subsequently, all buckets farther from the bucket of the closest neighbour are
868 /// refreshed by initiating an additional bootstrapping query for each such
869 /// bucket with random keys.
870 ///
871 /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
872 /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
873 /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
874 /// with one such event per bootstrapping query.
875 ///
876 /// Returns `Err` if bootstrapping is impossible due an empty routing table.
877 ///
878 /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
879 /// > See [`Behaviour::add_address`].
880 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
881 let local_key = self.kbuckets.local_key().clone();
882 let info = QueryInfo::Bootstrap {
883 peer: *local_key.preimage(),
884 remaining: None,
885 step: ProgressStep::first(),
886 };
887 let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
888 if peers.is_empty() {
889 Err(NoKnownPeers())
890 } else {
891 let inner = QueryInner::new(info);
892 Ok(self.queries.add_iter_closest(local_key, peers, inner))
893 }
894 }
895
896 /// Establishes the local node as a provider of a value for the given key.
897 ///
898 /// This operation publishes a provider record with the given key and
899 /// identity of the local node to the peers closest to the key, thus establishing
900 /// the local node as a provider.
901 ///
902 /// Returns `Ok` if a provider record has been stored locally, providing the
903 /// `QueryId` of the initial query that announces the local node as a provider.
904 ///
905 /// The publication of the provider records is periodically repeated as per the
906 /// configured interval, to renew the expiry and account for changes to the DHT
907 /// topology. A provider record may be removed from local storage and
908 /// thus no longer re-published by calling [`Behaviour::stop_providing`].
909 ///
910 /// In contrast to the standard Kademlia push-based model for content distribution
911 /// implemented by [`Behaviour::put_record`], the provider API implements a
912 /// pull-based model that may be used in addition or as an alternative.
913 /// The means by which the actual value is obtained from a provider is out of scope
914 /// of the libp2p Kademlia provider API.
915 ///
916 /// The results of the (repeated) provider announcements sent by this node are
917 /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
918 pub fn start_providing(&mut self, key: record_priv::Key) -> Result<QueryId, store::Error> {
919 // Note: We store our own provider records locally without local addresses
920 // to avoid redundant storage and outdated addresses. Instead these are
921 // acquired on demand when returning a `ProviderRecord` for the local node.
922 let local_addrs = Vec::new();
923 let record = ProviderRecord::new(
924 key.clone(),
925 *self.kbuckets.local_key().preimage(),
926 local_addrs,
927 );
928 self.store.add_provider(record)?;
929 let target = kbucket::Key::new(key.clone());
930 let peers = self.kbuckets.closest_keys(&target);
931 let context = AddProviderContext::Publish;
932 let info = QueryInfo::AddProvider {
933 context,
934 key,
935 phase: AddProviderPhase::GetClosestPeers,
936 };
937 let inner = QueryInner::new(info);
938 let id = self.queries.add_iter_closest(target.clone(), peers, inner);
939 Ok(id)
940 }
941
942 /// Stops the local node from announcing that it is a provider for the given key.
943 ///
944 /// This is a local operation. The local node will still be considered as a
945 /// provider for the key by other nodes until these provider records expire.
946 pub fn stop_providing(&mut self, key: &record_priv::Key) {
947 self.store
948 .remove_provider(key, self.kbuckets.local_key().preimage());
949 }
950
951 /// Performs a lookup for providers of a value to the given key.
952 ///
953 /// The result of this operation is delivered in a
954 /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
955 pub fn get_providers(&mut self, key: record_priv::Key) -> QueryId {
956 let providers: HashSet<_> = self
957 .store
958 .providers(&key)
959 .into_iter()
960 .filter(|p| !p.is_expired(Instant::now()))
961 .map(|p| p.provider)
962 .collect();
963
964 let step = ProgressStep::first();
965
966 let info = QueryInfo::GetProviders {
967 key: key.clone(),
968 providers_found: providers.len(),
969 step: if providers.is_empty() {
970 step.clone()
971 } else {
972 step.next()
973 },
974 };
975
976 let target = kbucket::Key::new(key.clone());
977 let peers = self.kbuckets.closest_keys(&target);
978 let inner = QueryInner::new(info);
979 let id = self.queries.add_iter_closest(target.clone(), peers, inner);
980
981 // No queries were actually done for the results yet.
982 let stats = QueryStats::empty();
983
984 if !providers.is_empty() {
985 self.queued_events
986 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
987 id,
988 result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
989 key,
990 providers,
991 })),
992 step,
993 stats,
994 }));
995 }
996 id
997 }
998
999 /// Set the [`Mode`] in which we should operate.
1000 ///
1001 /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
1002 ///
1003 /// Setting a mode via this function disables this automatic behaviour and unconditionally operates in the specified mode.
1004 /// To reactivate the automatic configuration, pass [`None`] instead.
1005 pub fn set_mode(&mut self, mode: Option<Mode>) {
1006 match mode {
1007 Some(mode) => {
1008 self.mode = mode;
1009 self.auto_mode = false;
1010 self.reconfigure_mode();
1011 }
1012 None => {
1013 self.auto_mode = true;
1014 self.determine_mode_from_external_addresses();
1015 }
1016 }
1017
1018 if let Some(waker) = self.no_events_waker.take() {
1019 waker.wake();
1020 }
1021 }
1022
1023 fn reconfigure_mode(&mut self) {
1024 if self.connections.is_empty() {
1025 return;
1026 }
1027
1028 let num_connections = self.connections.len();
1029
1030 log::debug!(
1031 "Re-configuring {} established connection{}",
1032 num_connections,
1033 if num_connections > 1 { "s" } else { "" }
1034 );
1035
1036 self.queued_events
1037 .extend(
1038 self.connections
1039 .iter()
1040 .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1041 peer_id: *peer_id,
1042 handler: NotifyHandler::One(*conn_id),
1043 event: HandlerIn::ReconfigureMode {
1044 new_mode: self.mode,
1045 },
1046 }),
1047 );
1048 }
1049
1050 fn determine_mode_from_external_addresses(&mut self) {
1051 self.mode = match (self.external_addresses.as_slice(), self.mode) {
1052 ([], Mode::Server) => {
1053 log::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1054
1055 Mode::Client
1056 }
1057 ([], Mode::Client) => {
1058 // Previously client-mode, now also client-mode because no external addresses.
1059
1060 Mode::Client
1061 }
1062 (confirmed_external_addresses, Mode::Client) => {
1063 if log::log_enabled!(log::Level::Debug) {
1064 let confirmed_external_addresses =
1065 to_comma_separated_list(confirmed_external_addresses);
1066
1067 log::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1068 }
1069
1070 Mode::Server
1071 }
1072 (confirmed_external_addresses, Mode::Server) => {
1073 debug_assert!(
1074 !confirmed_external_addresses.is_empty(),
1075 "Previous match arm handled empty list"
1076 );
1077
1078 // Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam.
1079
1080 Mode::Server
1081 }
1082 };
1083
1084 self.reconfigure_mode();
1085 }
1086
1087 /// Processes discovered peers from a successful request in an iterative `Query`.
1088 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1089 where
1090 I: Iterator<Item = &'a KadPeer> + Clone,
1091 {
1092 let local_id = self.kbuckets.local_key().preimage();
1093 let others_iter = peers.filter(|p| &p.node_id != local_id);
1094 if let Some(query) = self.queries.get_mut(query_id) {
1095 log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
1096 for peer in others_iter.clone() {
1097 log::trace!(
1098 "Peer {:?} reported by {:?} in query {:?}.",
1099 peer,
1100 source,
1101 query_id
1102 );
1103 let addrs = peer.multiaddrs.iter().cloned().collect();
1104 query.inner.addresses.insert(peer.node_id, addrs);
1105 }
1106 query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1107 }
1108 }
1109
1110 /// Finds the closest peers to a `target` in the context of a request by
1111 /// the `source` peer, such that the `source` peer is never included in the
1112 /// result.
1113 fn find_closest<T: Clone>(
1114 &mut self,
1115 target: &kbucket::Key<T>,
1116 source: &PeerId,
1117 ) -> Vec<KadPeer> {
1118 if target == self.kbuckets.local_key() {
1119 Vec::new()
1120 } else {
1121 self.kbuckets
1122 .closest(target)
1123 .filter(|e| e.node.key.preimage() != source)
1124 .take(self.queries.config().replication_factor.get())
1125 .map(KadPeer::from)
1126 .collect()
1127 }
1128 }
1129
1130 /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1131 fn provider_peers(&mut self, key: &record_priv::Key, source: &PeerId) -> Vec<KadPeer> {
1132 let kbuckets = &mut self.kbuckets;
1133 let connected = &mut self.connected_peers;
1134 let listen_addresses = &self.listen_addresses;
1135 let external_addresses = &self.external_addresses;
1136
1137 self.store
1138 .providers(key)
1139 .into_iter()
1140 .filter_map(move |p| {
1141 if &p.provider != source {
1142 let node_id = p.provider;
1143 let multiaddrs = p.addresses;
1144 let connection_ty = if connected.contains(&node_id) {
1145 ConnectionType::Connected
1146 } else {
1147 ConnectionType::NotConnected
1148 };
1149 if multiaddrs.is_empty() {
1150 // The provider is either the local node and we fill in
1151 // the local addresses on demand, or it is a legacy
1152 // provider record without addresses, in which case we
1153 // try to find addresses in the routing table, as was
1154 // done before provider records were stored along with
1155 // their addresses.
1156 if &node_id == kbuckets.local_key().preimage() {
1157 Some(
1158 listen_addresses
1159 .iter()
1160 .chain(external_addresses.iter())
1161 .cloned()
1162 .collect::<Vec<_>>(),
1163 )
1164 } else {
1165 let key = kbucket::Key::from(node_id);
1166 kbuckets
1167 .entry(&key)
1168 .view()
1169 .map(|e| e.node.value.clone().into_vec())
1170 }
1171 } else {
1172 Some(multiaddrs)
1173 }
1174 .map(|multiaddrs| KadPeer {
1175 node_id,
1176 multiaddrs,
1177 connection_ty,
1178 })
1179 } else {
1180 None
1181 }
1182 })
1183 .take(self.queries.config().replication_factor.get())
1184 .collect()
1185 }
1186
1187 /// Starts an iterative `ADD_PROVIDER` query for the given key.
1188 fn start_add_provider(&mut self, key: record_priv::Key, context: AddProviderContext) {
1189 let info = QueryInfo::AddProvider {
1190 context,
1191 key: key.clone(),
1192 phase: AddProviderPhase::GetClosestPeers,
1193 };
1194 let target = kbucket::Key::new(key);
1195 let peers = self.kbuckets.closest_keys(&target);
1196 let inner = QueryInner::new(info);
1197 self.queries.add_iter_closest(target.clone(), peers, inner);
1198 }
1199
1200 /// Starts an iterative `PUT_VALUE` query for the given record.
1201 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1202 let quorum = quorum.eval(self.queries.config().replication_factor);
1203 let target = kbucket::Key::new(record.key.clone());
1204 let peers = self.kbuckets.closest_keys(&target);
1205 let info = QueryInfo::PutRecord {
1206 record,
1207 quorum,
1208 context,
1209 phase: PutRecordPhase::GetClosestPeers,
1210 };
1211 let inner = QueryInner::new(info);
1212 self.queries.add_iter_closest(target.clone(), peers, inner);
1213 }
1214
/// Updates the routing table with a new connection status and address of a peer.
///
/// What happens depends on the routing table entry of `peer`:
///
/// * `Present`: the status is updated if it changed. If an `address` is
///   given and inserting it reports `true`, an [`Event::RoutingUpdated`]
///   with `is_new_peer: false` is queued.
/// * `Pending`: the `address` (if any) is inserted and the status is updated
///   if it changed. No event is queued.
/// * `Absent`: nothing happens unless `new_status` is
///   [`NodeStatus::Connected`]. Without an address an
///   [`Event::UnroutablePeer`] is queued. With an address the outcome depends
///   on `self.kbucket_inserts`: with [`BucketInserts::Manual`] only an
///   [`Event::RoutablePeer`] is queued, with [`BucketInserts::OnConnected`]
///   an insertion into the routing table is attempted.
/// * Any other kind of entry is ignored.
fn connection_updated(
    &mut self,
    peer: PeerId,
    address: Option<Multiaddr>,
    new_status: NodeStatus,
) {
    let key = kbucket::Key::from(peer);
    match self.kbuckets.entry(&key) {
        kbucket::Entry::Present(mut entry, old_status) => {
            if old_status != new_status {
                entry.update(new_status)
            }
            if let Some(address) = address {
                // An event is only emitted if `insert` reports `true`.
                if entry.value().insert(address) {
                    self.queued_events.push_back(ToSwarm::GenerateEvent(
                        Event::RoutingUpdated {
                            peer,
                            is_new_peer: false,
                            // The last use of `entry`; it must come before
                            // `self.kbuckets` is accessed again below.
                            addresses: entry.value().clone(),
                            old_peer: None,
                            bucket_range: self
                                .kbuckets
                                .bucket(&key)
                                .map(|b| b.range())
                                .expect("Not kbucket::Entry::SelfEntry."),
                        },
                    ))
                }
            }
        }

        kbucket::Entry::Pending(mut entry, old_status) => {
            // A pending entry is updated silently, no event is emitted.
            if let Some(address) = address {
                entry.value().insert(address);
            }
            if old_status != new_status {
                entry.update(new_status);
            }
        }

        kbucket::Entry::Absent(entry) => {
            // Only connected nodes with a known address are newly inserted.
            if new_status != NodeStatus::Connected {
                return;
            }
            match (address, self.kbucket_inserts) {
                (None, _) => {
                    // Connected, but there is no address to route to.
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
                }
                (Some(a), BucketInserts::Manual) => {
                    // With manual inserts the peer is only reported, not inserted.
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
                            peer,
                            address: a,
                        }));
                }
                (Some(a), BucketInserts::OnConnected) => {
                    let addresses = Addresses::new(a);
                    match entry.insert(addresses.clone(), new_status) {
                        kbucket::InsertResult::Inserted => {
                            let event = Event::RoutingUpdated {
                                peer,
                                is_new_peer: true,
                                addresses,
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            };
                            self.queued_events.push_back(ToSwarm::GenerateEvent(event));
                        }
                        kbucket::InsertResult::Full => {
                            // The peer is reported as routable although it
                            // was not added to the routing table.
                            debug!("Bucket full. Peer not added to routing table: {}", peer);
                            let address = addresses.first().clone();
                            self.queued_events.push_back(ToSwarm::GenerateEvent(
                                Event::RoutablePeer { peer, address },
                            ));
                        }
                        kbucket::InsertResult::Pending { disconnected } => {
                            let address = addresses.first().clone();
                            self.queued_events.push_back(ToSwarm::GenerateEvent(
                                Event::PendingRoutablePeer { peer, address },
                            ));

                            // `disconnected` might already be in the process of re-connecting.
                            // In other words `disconnected` might have already re-connected but
                            // is not yet confirmed to support the Kademlia protocol via
                            // [`HandlerEvent::ProtocolConfirmed`].
                            //
                            // Only try dialing peer if not currently connected.
                            if !self.connected_peers.contains(disconnected.preimage()) {
                                self.queued_events.push_back(ToSwarm::Dial {
                                    opts: DialOpts::peer_id(disconnected.into_preimage())
                                        .condition(dial_opts::PeerCondition::NotDialing)
                                        .build(),
                                })
                            }
                        }
                    }
                }
            }
        }
        // Every other kind of entry is left untouched.
        _ => {}
    }
}
1324
/// Handles a finished (i.e. successful) query.
///
/// Consumes the query and dispatches on its [`QueryInfo`]:
///
/// * Queries that have a follow-up phase (`AddProvider` and `PutRecord` in
///   their `GetClosestPeers` phase) are continued as a fixed query against
///   the peers found, and `None` is returned.
/// * A finished `Bootstrap` query is continued with the next remaining
///   bucket refresh target, if there is one, and reports its own result.
/// * A finished record replication (`PutRecordContext::Replicate`) is only
///   logged and `None` is returned.
/// * In all other cases the [`Event::OutboundQueryProgressed`] to report is
///   returned.
fn query_finished(&mut self, q: Query<QueryInner>) -> Option<Event> {
    let query_id = q.id();
    log::trace!("Query {:?} finished.", query_id);
    let result = q.into_result();
    match result.inner.info {
        QueryInfo::Bootstrap {
            peer,
            remaining,
            mut step,
        } => {
            let local_key = self.kbuckets.local_key().clone();
            // `remaining` is `None` only for the initial self-lookup, in
            // which case the list of bucket refresh targets is computed here.
            let mut remaining = remaining.unwrap_or_else(|| {
                debug_assert_eq!(&peer, local_key.preimage());
                // The lookup for the local key finished. To complete the bootstrap process,
                // a bucket refresh should be performed for every bucket farther away than
                // the first non-empty bucket (which are most likely no more than the last
                // few, i.e. farthest, buckets).
                self.kbuckets
                    .iter()
                    .skip_while(|b| b.is_empty())
                    .skip(1) // Skip the bucket with the closest neighbour.
                    .map(|b| {
                        // Try to find a key that falls into the bucket. While such keys can
                        // be generated fully deterministically, the current libp2p kademlia
                        // wire protocol requires transmission of the preimages of the actual
                        // keys in the DHT keyspace, hence for now this is just a "best effort"
                        // to find a key that hashes into a specific bucket. The probabilities
                        // of finding a key in the bucket `b` with at most 16 trials are as
                        // follows:
                        //
                        // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
                        // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
                        // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
                        // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
                        // ...
                        let mut target = kbucket::Key::from(PeerId::random());
                        for _ in 0..16 {
                            let d = local_key.distance(&target);
                            if b.contains(&d) {
                                break;
                            }
                            target = kbucket::Key::from(PeerId::random());
                        }
                        // If no trial hit the bucket, the last random key is
                        // used regardless.
                        target
                    })
                    .collect::<Vec<_>>()
                    .into_iter()
            });

            // Read before the next target is taken from `remaining`, so the
            // count includes the target of the follow-up query started below.
            let num_remaining = remaining.len() as u32;

            if let Some(target) = remaining.next() {
                // Continue under the same query ID with the next target.
                let info = QueryInfo::Bootstrap {
                    peer: *target.preimage(),
                    remaining: Some(remaining),
                    step: step.next(),
                };
                let peers = self.kbuckets.closest_keys(&target);
                let inner = QueryInner::new(info);
                self.queries
                    .continue_iter_closest(query_id, target.clone(), peers, inner);
            } else {
                // No targets are left: this is the last bootstrap event.
                step.last = true;
            };

            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: result.stats,
                result: QueryResult::Bootstrap(Ok(BootstrapOk {
                    peer,
                    num_remaining,
                })),
                step,
            })
        }

        QueryInfo::GetClosestPeers { key, mut step } => {
            step.last = true;

            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: result.stats,
                result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
                    key,
                    peers: result.peers.collect(),
                })),
                step,
            })
        }

        QueryInfo::GetProviders { mut step, .. } => {
            step.last = true;

            // This final event only carries the closest peers of the query.
            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: result.stats,
                result: QueryResult::GetProviders(Ok(
                    GetProvidersOk::FinishedWithNoAdditionalRecord {
                        closest_peers: result.peers.collect(),
                    },
                )),
                step,
            })
        }

        QueryInfo::AddProvider {
            context,
            key,
            phase: AddProviderPhase::GetClosestPeers,
        } => {
            // First phase done: continue the same query as a fixed query
            // against the peers found. Nothing is reported yet.
            let provider_id = self.local_peer_id;
            let external_addresses = self.external_addresses.iter().cloned().collect();
            let inner = QueryInner::new(QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::AddProvider {
                    provider_id,
                    external_addresses,
                    get_closest_peers_stats: result.stats,
                },
            });
            self.queries.continue_fixed(query_id, result.peers, inner);
            None
        }

        QueryInfo::AddProvider {
            context,
            key,
            phase:
                AddProviderPhase::AddProvider {
                    get_closest_peers_stats,
                    ..
                },
        } => match context {
            // The reported statistics merge those of both phases.
            AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: get_closest_peers_stats.merge(result.stats),
                result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
                step: ProgressStep::first_and_last(),
            }),
            AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: get_closest_peers_stats.merge(result.stats),
                result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
                step: ProgressStep::first_and_last(),
            }),
        },

        QueryInfo::GetRecord {
            key,
            mut step,
            found_a_record,
            cache_candidates,
        } => {
            step.last = true;

            // A query that finished without `found_a_record` being set is
            // reported as an error.
            let results = if found_a_record {
                Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
            } else {
                Err(GetRecordError::NotFound {
                    key,
                    closest_peers: result.peers.collect(),
                })
            };
            Some(Event::OutboundQueryProgressed {
                id: query_id,
                stats: result.stats,
                result: QueryResult::GetRecord(results),
                step,
            })
        }

        QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::GetClosestPeers,
        } => {
            // First phase done: continue the same query as a fixed query
            // against the peers found. Nothing is reported yet.
            let info = QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::PutRecord {
                    success: vec![],
                    get_closest_peers_stats: result.stats,
                },
            };
            let inner = QueryInner::new(info);
            self.queries.continue_fixed(query_id, result.peers, inner);
            None
        }

        QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase:
                PutRecordPhase::PutRecord {
                    success,
                    get_closest_peers_stats,
                },
        } => {
            // Reports success only if the number of entries in `success`
            // reaches the quorum. The closure takes ownership of `success`
            // and is invoked in at most one of the arms below.
            let mk_result = |key: record_priv::Key| {
                if success.len() >= quorum.get() {
                    Ok(PutRecordOk { key })
                } else {
                    Err(PutRecordError::QuorumFailed {
                        key,
                        quorum,
                        success,
                    })
                }
            };
            match context {
                PutRecordContext::Publish | PutRecordContext::Custom => {
                    Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: get_closest_peers_stats.merge(result.stats),
                        result: QueryResult::PutRecord(mk_result(record.key)),
                        step: ProgressStep::first_and_last(),
                    })
                }
                PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: get_closest_peers_stats.merge(result.stats),
                    result: QueryResult::RepublishRecord(mk_result(record.key)),
                    step: ProgressStep::first_and_last(),
                }),
                PutRecordContext::Replicate => {
                    // Replication is only logged; the quorum is not
                    // evaluated and no event is emitted.
                    debug!("Record replicated: {:?}", record.key);
                    None
                }
            }
        }
    }
}
1562
1563 /// Handles a query that timed out.
1564 fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<Event> {
1565 let query_id = query.id();
1566 log::trace!("Query {:?} timed out.", query_id);
1567 let result = query.into_result();
1568 match result.inner.info {
1569 QueryInfo::Bootstrap {
1570 peer,
1571 mut remaining,
1572 mut step,
1573 } => {
1574 let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1575
1576 // Continue with the next bootstrap query if `remaining` is not empty.
1577 if let Some((target, remaining)) =
1578 remaining.take().and_then(|mut r| Some((r.next()?, r)))
1579 {
1580 let info = QueryInfo::Bootstrap {
1581 peer: target.clone().into_preimage(),
1582 remaining: Some(remaining),
1583 step: step.next(),
1584 };
1585 let peers = self.kbuckets.closest_keys(&target);
1586 let inner = QueryInner::new(info);
1587 self.queries
1588 .continue_iter_closest(query_id, target.clone(), peers, inner);
1589 } else {
1590 step.last = true;
1591 }
1592
1593 Some(Event::OutboundQueryProgressed {
1594 id: query_id,
1595 stats: result.stats,
1596 result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
1597 peer,
1598 num_remaining,
1599 })),
1600 step,
1601 })
1602 }
1603
1604 QueryInfo::AddProvider { context, key, .. } => Some(match context {
1605 AddProviderContext::Publish => Event::OutboundQueryProgressed {
1606 id: query_id,
1607 stats: result.stats,
1608 result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
1609 step: ProgressStep::first_and_last(),
1610 },
1611 AddProviderContext::Republish => Event::OutboundQueryProgressed {
1612 id: query_id,
1613 stats: result.stats,
1614 result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
1615 step: ProgressStep::first_and_last(),
1616 },
1617 }),
1618
1619 QueryInfo::GetClosestPeers { key, mut step } => {
1620 step.last = true;
1621
1622 Some(Event::OutboundQueryProgressed {
1623 id: query_id,
1624 stats: result.stats,
1625 result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
1626 key,
1627 peers: result.peers.collect(),
1628 })),
1629 step,
1630 })
1631 }
1632
1633 QueryInfo::PutRecord {
1634 record,
1635 quorum,
1636 context,
1637 phase,
1638 } => {
1639 let err = Err(PutRecordError::Timeout {
1640 key: record.key,
1641 quorum,
1642 success: match phase {
1643 PutRecordPhase::GetClosestPeers => vec![],
1644 PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1645 },
1646 });
1647 match context {
1648 PutRecordContext::Publish | PutRecordContext::Custom => {
1649 Some(Event::OutboundQueryProgressed {
1650 id: query_id,
1651 stats: result.stats,
1652 result: QueryResult::PutRecord(err),
1653 step: ProgressStep::first_and_last(),
1654 })
1655 }
1656 PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
1657 id: query_id,
1658 stats: result.stats,
1659 result: QueryResult::RepublishRecord(err),
1660 step: ProgressStep::first_and_last(),
1661 }),
1662 PutRecordContext::Replicate => match phase {
1663 PutRecordPhase::GetClosestPeers => {
1664 warn!("Locating closest peers for replication failed: {:?}", err);
1665 None
1666 }
1667 PutRecordPhase::PutRecord { .. } => {
1668 debug!("Replicating record failed: {:?}", err);
1669 None
1670 }
1671 },
1672 }
1673 }
1674
1675 QueryInfo::GetRecord { key, mut step, .. } => {
1676 step.last = true;
1677
1678 Some(Event::OutboundQueryProgressed {
1679 id: query_id,
1680 stats: result.stats,
1681 result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
1682 step,
1683 })
1684 }
1685
1686 QueryInfo::GetProviders { key, mut step, .. } => {
1687 step.last = true;
1688
1689 Some(Event::OutboundQueryProgressed {
1690 id: query_id,
1691 stats: result.stats,
1692 result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
1693 key,
1694 closest_peers: result.peers.collect(),
1695 })),
1696 step,
1697 })
1698 }
1699 }
1700 }
1701
1702 /// Processes a record received from a peer.
1703 fn record_received(
1704 &mut self,
1705 source: PeerId,
1706 connection: ConnectionId,
1707 request_id: RequestId,
1708 mut record: Record,
1709 ) {
1710 if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1711 // If the (alleged) publisher is the local node, do nothing. The record of
1712 // the original publisher should never change as a result of replication
1713 // and the publisher is always assumed to have the "right" value.
1714 self.queued_events.push_back(ToSwarm::NotifyHandler {
1715 peer_id: source,
1716 handler: NotifyHandler::One(connection),
1717 event: HandlerIn::PutRecordRes {
1718 key: record.key,
1719 value: record.value,
1720 request_id,
1721 },
1722 });
1723 return;
1724 }
1725
1726 let now = Instant::now();
1727
1728 // Calculate the expiration exponentially inversely proportional to the
1729 // number of nodes between the local node and the closest node to the key
1730 // (beyond the replication factor). This ensures avoiding over-caching
1731 // outside of the k closest nodes to a key.
1732 let target = kbucket::Key::new(record.key.clone());
1733 let num_between = self.kbuckets.count_nodes_between(&target);
1734 let k = self.queries.config().replication_factor.get();
1735 let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1736 let expiration = self
1737 .record_ttl
1738 .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1739 // The smaller TTL prevails. Only if neither TTL is set is the record
1740 // stored "forever".
1741 record.expires = record.expires.or(expiration).min(expiration);
1742
1743 if let Some(job) = self.put_record_job.as_mut() {
1744 // Ignore the record in the next run of the replication
1745 // job, since we can assume the sender replicated the
1746 // record to the k closest peers. Effectively, only
1747 // one of the k closest peers performs a replication
1748 // in the configured interval, assuming a shared interval.
1749 job.skip(record.key.clone())
1750 }
1751
1752 // While records received from a publisher, as well as records that do
1753 // not exist locally should always (attempted to) be stored, there is a
1754 // choice here w.r.t. the handling of replicated records whose keys refer
1755 // to records that exist locally: The value and / or the publisher may
1756 // either be overridden or left unchanged. At the moment and in the
1757 // absence of a decisive argument for another option, both are always
1758 // overridden as it avoids having to load the existing record in the
1759 // first place.
1760
1761 if !record.is_expired(now) {
1762 // The record is cloned because of the weird libp2p protocol
1763 // requirement to send back the value in the response, although this
1764 // is a waste of resources.
1765 match self.record_filtering {
1766 StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1767 Ok(()) => {
1768 debug!(
1769 "Record stored: {:?}; {} bytes",
1770 record.key,
1771 record.value.len()
1772 );
1773 self.queued_events.push_back(ToSwarm::GenerateEvent(
1774 Event::InboundRequest {
1775 request: InboundRequest::PutRecord {
1776 source,
1777 connection,
1778 record: None,
1779 },
1780 },
1781 ));
1782 }
1783 Err(e) => {
1784 info!("Record not stored: {:?}", e);
1785 self.queued_events.push_back(ToSwarm::NotifyHandler {
1786 peer_id: source,
1787 handler: NotifyHandler::One(connection),
1788 event: HandlerIn::Reset(request_id),
1789 });
1790
1791 return;
1792 }
1793 },
1794 StoreInserts::FilterBoth => {
1795 self.queued_events
1796 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1797 request: InboundRequest::PutRecord {
1798 source,
1799 connection,
1800 record: Some(record.clone()),
1801 },
1802 }));
1803 }
1804 }
1805 }
1806
1807 // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1808 // case where the record is discarded due to being expired. Given that
1809 // the remote sent the local node a [`HandlerEvent::PutRecord`]
1810 // request, the remote perceives the local node as one node among the k
1811 // closest nodes to the target. In addition returning
1812 // [`HandlerIn::PutRecordRes`] does not reveal any internal
1813 // information to a possibly malicious remote node.
1814 self.queued_events.push_back(ToSwarm::NotifyHandler {
1815 peer_id: source,
1816 handler: NotifyHandler::One(connection),
1817 event: HandlerIn::PutRecordRes {
1818 key: record.key,
1819 value: record.value,
1820 request_id,
1821 },
1822 })
1823 }
1824
1825 /// Processes a provider record received from a peer.
1826 fn provider_received(&mut self, key: record_priv::Key, provider: KadPeer) {
1827 if &provider.node_id != self.kbuckets.local_key().preimage() {
1828 let record = ProviderRecord {
1829 key,
1830 provider: provider.node_id,
1831 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1832 addresses: provider.multiaddrs,
1833 };
1834 match self.record_filtering {
1835 StoreInserts::Unfiltered => {
1836 if let Err(e) = self.store.add_provider(record) {
1837 info!("Provider record not stored: {:?}", e);
1838 return;
1839 }
1840
1841 self.queued_events
1842 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1843 request: InboundRequest::AddProvider { record: None },
1844 }));
1845 }
1846 StoreInserts::FilterBoth => {
1847 self.queued_events
1848 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1849 request: InboundRequest::AddProvider {
1850 record: Some(record),
1851 },
1852 }));
1853 }
1854 }
1855 }
1856 }
1857
1858 fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1859 let key = kbucket::Key::from(peer_id);
1860
1861 if let Some(addrs) = self.kbuckets.entry(&key).value() {
1862 // TODO: Ideally, the address should only be removed if the error can
1863 // be classified as "permanent" but since `err` is currently a borrowed
1864 // trait object without a `'static` bound, even downcasting for inspection
1865 // of the error is not possible (and also not truly desirable or ergonomic).
1866 // The error passed in should rather be a dedicated enum.
1867 if addrs.remove(address).is_ok() {
1868 debug!(
1869 "Address '{}' removed from peer '{}' due to error.",
1870 address, peer_id
1871 );
1872 } else {
1873 // Despite apparently having no reachable address (any longer),
1874 // the peer is kept in the routing table with the last address to avoid
1875 // (temporary) loss of network connectivity to "flush" the routing
1876 // table. Once in, a peer is only removed from the routing table
1877 // if it is the least recently connected peer, currently disconnected
1878 // and is unreachable in the context of another peer pending insertion
1879 // into the same bucket. This is handled transparently by the
1880 // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1881 // within `Behaviour::poll`.
1882 debug!(
1883 "Last remaining address '{}' of peer '{}' is unreachable.",
1884 address, peer_id,
1885 )
1886 }
1887 }
1888
1889 for query in self.queries.iter_mut() {
1890 if let Some(addrs) = query.inner.addresses.get_mut(&peer_id) {
1891 addrs.retain(|a| a != address);
1892 }
1893 }
1894 }
1895
1896 fn on_connection_established(
1897 &mut self,
1898 ConnectionEstablished {
1899 peer_id,
1900 failed_addresses,
1901 other_established,
1902 ..
1903 }: ConnectionEstablished,
1904 ) {
1905 for addr in failed_addresses {
1906 self.address_failed(peer_id, addr);
1907 }
1908
1909 // Peer's first connection.
1910 if other_established == 0 {
1911 self.connected_peers.insert(peer_id);
1912 }
1913 }
1914
1915 fn on_address_change(
1916 &mut self,
1917 AddressChange {
1918 peer_id: peer,
1919 old,
1920 new,
1921 ..
1922 }: AddressChange,
1923 ) {
1924 let (old, new) = (old.get_remote_address(), new.get_remote_address());
1925
1926 // Update routing table.
1927 if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::from(peer)).value() {
1928 if addrs.replace(old, new) {
1929 debug!(
1930 "Address '{}' replaced with '{}' for peer '{}'.",
1931 old, new, peer
1932 );
1933 } else {
1934 debug!(
1935 "Address '{}' not replaced with '{}' for peer '{}' as old address wasn't \
1936 present.",
1937 old, new, peer
1938 );
1939 }
1940 } else {
1941 debug!(
1942 "Address '{}' not replaced with '{}' for peer '{}' as peer is not present in the \
1943 routing table.",
1944 old, new, peer
1945 );
1946 }
1947
1948 // Update query address cache.
1949 //
1950 // Given two connected nodes: local node A and remote node B. Say node B
1951 // is not in node A's routing table. Additionally node B is part of the
1952 // `QueryInner::addresses` list of an ongoing query on node A. Say Node
1953 // B triggers an address change and then disconnects. Later on the
1954 // earlier mentioned query on node A would like to connect to node B.
1955 // Without replacing the address in the `QueryInner::addresses` set node
1956 // A would attempt to dial the old and not the new address.
1957 //
1958 // While upholding correctness, iterating through all discovered
1959 // addresses of a peer in all currently ongoing queries might have a
1960 // large performance impact. If so, the code below might be worth
1961 // revisiting.
1962 for query in self.queries.iter_mut() {
1963 if let Some(addrs) = query.inner.addresses.get_mut(&peer) {
1964 for addr in addrs.iter_mut() {
1965 if addr == old {
1966 *addr = new.clone();
1967 }
1968 }
1969 }
1970 }
1971 }
1972
1973 fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
1974 let peer_id = match peer_id {
1975 Some(id) => id,
1976 // Not interested in dial failures to unknown peers.
1977 None => return,
1978 };
1979
1980 match error {
1981 DialError::LocalPeerId { .. }
1982 | DialError::WrongPeerId { .. }
1983 | DialError::Aborted
1984 | DialError::Denied { .. }
1985 | DialError::Transport(_)
1986 | DialError::NoAddresses => {
1987 if let DialError::Transport(addresses) = error {
1988 for (addr, _) in addresses {
1989 self.address_failed(peer_id, addr)
1990 }
1991 }
1992
1993 for query in self.queries.iter_mut() {
1994 query.on_failure(&peer_id);
1995 }
1996 }
1997 DialError::DialPeerConditionFalse(
1998 dial_opts::PeerCondition::Disconnected | dial_opts::PeerCondition::NotDialing,
1999 ) => {
2000 // We might (still) be connected, or about to be connected, thus do not report the
2001 // failure to the queries.
2002 }
2003 DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2004 unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2005 }
2006 }
2007 }
2008
2009 fn on_connection_closed(
2010 &mut self,
2011 ConnectionClosed {
2012 peer_id,
2013 remaining_established,
2014 connection_id,
2015 ..
2016 }: ConnectionClosed<<Self as NetworkBehaviour>::ConnectionHandler>,
2017 ) {
2018 self.connections.remove(&connection_id);
2019
2020 if remaining_established == 0 {
2021 for query in self.queries.iter_mut() {
2022 query.on_failure(&peer_id);
2023 }
2024 self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2025 self.connected_peers.remove(&peer_id);
2026 }
2027 }
2028
2029 /// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
2030 fn preload_new_handler(
2031 &mut self,
2032 handler: &mut Handler,
2033 connection_id: ConnectionId,
2034 peer: PeerId,
2035 ) {
2036 self.connections.insert(connection_id, peer);
2037 // Queue events for sending pending RPCs to the connected peer.
2038 // There can be only one pending RPC for a particular peer and query per definition.
2039 for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2040 q.inner
2041 .pending_rpcs
2042 .iter()
2043 .position(|(p, _)| p == &peer)
2044 .map(|p| q.inner.pending_rpcs.remove(p))
2045 }) {
2046 handler.on_behaviour_event(event)
2047 }
2048 }
2049}
2050
/// Exponentially decrease the given duration (base 2).
///
/// Only the whole seconds of `ttl` are considered; they are halved `exp`
/// times. An `exp` of 64 or more (the bit width of the seconds counter)
/// yields a zero duration.
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
    ttl.as_secs()
        .checked_shr(exp)
        .map(Duration::from_secs)
        .unwrap_or_default()
}
2055
2056impl<TStore> NetworkBehaviour for Behaviour<TStore>
2057where
2058 TStore: RecordStore + Send + 'static,
2059{
2060 type ConnectionHandler = Handler;
2061 type ToSwarm = Event;
2062
2063 fn handle_established_inbound_connection(
2064 &mut self,
2065 connection_id: ConnectionId,
2066 peer: PeerId,
2067 local_addr: &Multiaddr,
2068 remote_addr: &Multiaddr,
2069 ) -> Result<THandler<Self>, ConnectionDenied> {
2070 let connected_point = ConnectedPoint::Listener {
2071 local_addr: local_addr.clone(),
2072 send_back_addr: remote_addr.clone(),
2073 };
2074 let mut handler = Handler::new(
2075 self.protocol_config.clone(),
2076 self.connection_idle_timeout,
2077 connected_point,
2078 peer,
2079 self.mode,
2080 connection_id,
2081 );
2082 self.preload_new_handler(&mut handler, connection_id, peer);
2083
2084 Ok(handler)
2085 }
2086
2087 fn handle_established_outbound_connection(
2088 &mut self,
2089 connection_id: ConnectionId,
2090 peer: PeerId,
2091 addr: &Multiaddr,
2092 role_override: Endpoint,
2093 ) -> Result<THandler<Self>, ConnectionDenied> {
2094 let connected_point = ConnectedPoint::Dialer {
2095 address: addr.clone(),
2096 role_override,
2097 };
2098 let mut handler = Handler::new(
2099 self.protocol_config.clone(),
2100 self.connection_idle_timeout,
2101 connected_point,
2102 peer,
2103 self.mode,
2104 connection_id,
2105 );
2106 self.preload_new_handler(&mut handler, connection_id, peer);
2107
2108 Ok(handler)
2109 }
2110
2111 fn handle_pending_outbound_connection(
2112 &mut self,
2113 _connection_id: ConnectionId,
2114 maybe_peer: Option<PeerId>,
2115 _addresses: &[Multiaddr],
2116 _effective_role: Endpoint,
2117 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2118 let peer_id = match maybe_peer {
2119 None => return Ok(vec![]),
2120 Some(peer) => peer,
2121 };
2122
2123 // We should order addresses from decreasing likelyhood of connectivity, so start with
2124 // the addresses of that peer in the k-buckets.
2125 let key = kbucket::Key::from(peer_id);
2126 let mut peer_addrs =
2127 if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
2128 let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2129 debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2130 addrs
2131 } else {
2132 Vec::new()
2133 };
2134
2135 // We add to that a temporary list of addresses from the ongoing queries.
2136 for query in self.queries.iter() {
2137 if let Some(addrs) = query.inner.addresses.get(&peer_id) {
2138 peer_addrs.extend(addrs.iter().cloned())
2139 }
2140 }
2141
2142 Ok(peer_addrs)
2143 }
2144
    /// Handles an event emitted by the connection handler of `connection`
    /// to the peer `source`.
    ///
    /// Inbound requests (`*Req`, `GetRecord`, `PutRecord`, `AddProvider`) are
    /// answered by queueing a `ToSwarm::NotifyHandler` for the same connection
    /// and/or an `Event::InboundRequest` for the user. Responses and errors
    /// (`*Res`, `QueryError`) are fed back into the query identified by the
    /// `query_id` they carry, if that query still exists.
    fn on_connection_handler_event(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        match event {
            HandlerEvent::ProtocolConfirmed { endpoint } => {
                debug_assert!(self.connected_peers.contains(&source));
                // The remote's address can only be put into the routing table,
                // and thus shared with other nodes, if the local node is the dialer,
                // since the remote address on an inbound connection may be specific
                // to that connection (e.g. typically the TCP port numbers).
                let address = match endpoint {
                    ConnectedPoint::Dialer { address, .. } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };

                self.connection_updated(source, address, NodeStatus::Connected);
            }

            HandlerEvent::ProtocolNotSupported { endpoint } => {
                // Same address selection as above, but the peer is reported as
                // disconnected since it does not speak the protocol.
                let address = match endpoint {
                    ConnectedPoint::Dialer { address, .. } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };
                self.connection_updated(source, address, NodeStatus::Disconnected);
            }

            HandlerEvent::FindNodeReq { key, request_id } => {
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

                // Surface the request to the user before answering it.
                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::FindNode {
                            num_closer_peers: closer_peers.len(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::FindNodeRes {
                        closer_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::FindNodeRes {
                closer_peers,
                query_id,
            } => {
                // Hand the peers from the response to the originating query.
                self.discovered(&query_id, &source, closer_peers.iter());
            }

            HandlerEvent::GetProvidersReq { key, request_id } => {
                let provider_peers = self.provider_peers(&key, &source);
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::GetProvider {
                            num_closer_peers: closer_peers.len(),
                            num_provider_peers: provider_peers.len(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::GetProvidersRes {
                        closer_peers,
                        provider_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::GetProvidersRes {
                closer_peers,
                provider_peers,
                query_id,
            } => {
                // Both closer peers and providers count as discovered peers.
                let peers = closer_peers.iter().chain(provider_peers.iter());
                self.discovered(&query_id, &source, peers);
                if let Some(query) = self.queries.get_mut(&query_id) {
                    let stats = query.stats().clone();
                    if let QueryInfo::GetProviders {
                        ref key,
                        ref mut providers_found,
                        ref mut step,
                        ..
                    } = query.inner.info
                    {
                        *providers_found += provider_peers.len();
                        let providers = provider_peers.iter().map(|p| p.node_id).collect();

                        // Report the providers of this single response as a
                        // progress event, then advance the step counter.
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::OutboundQueryProgressed {
                                id: query_id,
                                result: QueryResult::GetProviders(Ok(
                                    GetProvidersOk::FoundProviders {
                                        key: key.clone(),
                                        providers,
                                    },
                                )),
                                step: step.clone(),
                                stats,
                            },
                        ));
                        *step = step.next();
                    }
                }
            }

            HandlerEvent::QueryError { query_id, error } => {
                log::debug!(
                    "Request to {:?} in query {:?} failed with {:?}",
                    source,
                    query_id,
                    error
                );
                // If the query to which the error relates is still active,
                // signal the failure w.r.t. `source`.
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_failure(&source)
                }
            }

            HandlerEvent::AddProvider { key, provider } => {
                // Only accept a provider record from a legitimate peer.
                if provider.node_id != source {
                    return;
                }

                self.provider_received(key, provider);
            }

            HandlerEvent::GetRecord { key, request_id } => {
                // Lookup the record locally.
                let record = match self.store.get(&key) {
                    Some(record) => {
                        // Expired records are purged on access and treated as absent.
                        if record.is_expired(Instant::now()) {
                            self.store.remove(&key);
                            None
                        } else {
                            Some(record.into_owned())
                        }
                    }
                    None => None,
                };

                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::GetRecord {
                            num_closer_peers: closer_peers.len(),
                            present_locally: record.is_some(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::GetRecordRes {
                        record,
                        closer_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::GetRecordRes {
                record,
                closer_peers,
                query_id,
            } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    let stats = query.stats().clone();
                    if let QueryInfo::GetRecord {
                        key,
                        ref mut step,
                        ref mut found_a_record,
                        cache_candidates,
                    } = &mut query.inner.info
                    {
                        if let Some(record) = record {
                            // The peer had the record: report it as progress.
                            *found_a_record = true;
                            let record = PeerRecord {
                                peer: Some(source),
                                record,
                            };

                            self.queued_events.push_back(ToSwarm::GenerateEvent(
                                Event::OutboundQueryProgressed {
                                    id: query_id,
                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
                                        record,
                                    ))),
                                    step: step.clone(),
                                    stats,
                                },
                            ));

                            *step = step.next();
                        } else {
                            log::trace!("Record with key {:?} not found at {}", key, source);
                            // The peer lacks the record: remember it as a cache
                            // candidate, keyed by its distance to the record key.
                            if let Caching::Enabled { max_peers } = self.caching {
                                let source_key = kbucket::Key::from(source);
                                let target_key = kbucket::Key::from(key.clone());
                                let distance = source_key.distance(&target_key);
                                cache_candidates.insert(distance, source);
                                // Over capacity: drop the entry with the last
                                // (i.e. largest) distance key.
                                if cache_candidates.len() > max_peers as usize {
                                    // TODO: `pop_last()` would be nice once stabilised.
                                    // See https://github.com/rust-lang/rust/issues/62924.
                                    let last =
                                        *cache_candidates.keys().next_back().expect("len > 0");
                                    cache_candidates.remove(&last);
                                }
                            }
                        }
                    }
                }

                // Done after the query bookkeeping above, regardless of whether
                // a record was returned.
                self.discovered(&query_id, &source, closer_peers.iter());
            }

            HandlerEvent::PutRecord { record, request_id } => {
                self.record_received(source, connection, request_id, record);
            }

            HandlerEvent::PutRecordRes { query_id, .. } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_success(&source, vec![]);
                    if let QueryInfo::PutRecord {
                        phase: PutRecordPhase::PutRecord { success, .. },
                        quorum,
                        ..
                    } = &mut query.inner.info
                    {
                        success.push(source);

                        // Once enough peers confirmed the write, try to finish
                        // the query early with the peers that succeeded.
                        let quorum = quorum.get();
                        if success.len() >= quorum {
                            let peers = success.clone();
                            let finished = query.try_finish(peers.iter());
                            if !finished {
                                debug!(
                                    "PutRecord query ({:?}) reached quorum ({}/{}) with response \
                                     from peer {} but could not yet finish.",
                                    query_id,
                                    peers.len(),
                                    quorum,
                                    source,
                                );
                            }
                        }
                    }
                }
            }
        };
    }
2409
    /// Drives the behaviour and returns the next action for the swarm.
    ///
    /// First runs the periodic provider and record jobs (bounded by
    /// `JOBS_MAX_QUERIES` / `JOBS_MAX_NEW_QUERIES`), then loops over, in order:
    /// already queued events, applied pending routing table entries, and the
    /// query pool. Returns `Poll::Pending` once none of them produced an event,
    /// after storing the task's waker in `no_events_waker`.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
        _: &mut impl PollParameters,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        let now = Instant::now();

        // Calculate the available capacity for queries triggered by background jobs.
        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());

        // Run the periodic provider announcement job.
        // The job is taken out of `self` while it runs so that `self` can be
        // borrowed mutably inside the loop; it is put back afterwards.
        if let Some(mut job) = self.add_provider_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    self.start_add_provider(r.key, AddProviderContext::Republish)
                } else {
                    break;
                }
            }
            // The whole batch size is deducted, even if the job yielded fewer
            // records than `num`.
            jobs_query_capacity -= num;
            self.add_provider_job = Some(job);
        }

        // Run the periodic record replication / publication job.
        if let Some(mut job) = self.put_record_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    // Records published by the local node are republished,
                    // all others are replicated.
                    let context =
                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                            PutRecordContext::Republish
                        } else {
                            PutRecordContext::Replicate
                        };
                    self.start_put_record(r, Quorum::All, context)
                } else {
                    break;
                }
            }
            self.put_record_job = Some(job);
        }

        loop {
            // Drain queued events first.
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event);
            }

            // Drain applied pending entries from the routing table.
            if let Some(entry) = self.kbuckets.take_applied_pending() {
                let kbucket::Node { key, value } = entry.inserted;
                let event = Event::RoutingUpdated {
                    bucket_range: self
                        .kbuckets
                        .bucket(&key)
                        .map(|b| b.range())
                        .expect("Self to never be applied from pending."),
                    peer: key.into_preimage(),
                    is_new_peer: true,
                    addresses: value,
                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
                };
                return Poll::Ready(ToSwarm::GenerateEvent(event));
            }

            // Look for a finished query.
            loop {
                match self.queries.poll(now) {
                    QueryPoolState::Finished(q) => {
                        if let Some(event) = self.query_finished(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Timeout(q) => {
                        if let Some(event) = self.query_timeout(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Waiting(Some((query, peer_id))) => {
                        let event = query.inner.info.to_request(query.id());
                        // TODO: AddProvider requests yield no response, so the query completes
                        // as soon as all requests have been sent. However, the handler should
                        // better emit an event when the request has been sent (and report
                        // an error if sending fails), instead of immediately reporting
                        // "success" somewhat prematurely here.
                        if let QueryInfo::AddProvider {
                            phase: AddProviderPhase::AddProvider { .. },
                            ..
                        } = &query.inner.info
                        {
                            query.on_success(&peer_id, vec![])
                        }

                        if self.connected_peers.contains(&peer_id) {
                            // Already connected: send the request on any connection.
                            self.queued_events.push_back(ToSwarm::NotifyHandler {
                                peer_id,
                                event,
                                handler: NotifyHandler::Any,
                            });
                        } else if &peer_id != self.kbuckets.local_key().preimage() {
                            // Not connected (and not ourselves): park the request
                            // with the query and ask the swarm to dial the peer.
                            query.inner.pending_rpcs.push((peer_id, event));
                            self.queued_events.push_back(ToSwarm::Dial {
                                opts: DialOpts::peer_id(peer_id)
                                    .condition(dial_opts::PeerCondition::NotDialing)
                                    .build(),
                            });
                        }
                    }
                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
                }
            }

            // No immediate event was produced as a result of a finished query.
            // If no new events have been queued either, signal `NotReady` to
            // be polled again later.
            if self.queued_events.is_empty() {
                // NOTE(review): presumably woken by code outside this function
                // when it queues new events — confirm against the users of
                // `no_events_waker`.
                self.no_events_waker = Some(cx.waker().clone());

                return Poll::Pending;
            }
        }
    }
2533
2534 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
2535 self.listen_addresses.on_swarm_event(&event);
2536 let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2537
2538 if self.auto_mode && external_addresses_changed {
2539 self.determine_mode_from_external_addresses();
2540 }
2541
2542 match event {
2543 FromSwarm::ConnectionEstablished(connection_established) => {
2544 self.on_connection_established(connection_established)
2545 }
2546 FromSwarm::ConnectionClosed(connection_closed) => {
2547 self.on_connection_closed(connection_closed)
2548 }
2549 FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2550 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2551 FromSwarm::ExpiredListenAddr(_)
2552 | FromSwarm::NewExternalAddrCandidate(_)
2553 | FromSwarm::NewListenAddr(_)
2554 | FromSwarm::ListenFailure(_)
2555 | FromSwarm::NewListener(_)
2556 | FromSwarm::ListenerClosed(_)
2557 | FromSwarm::ListenerError(_)
2558 | FromSwarm::ExternalAddrExpired(_)
2559 | FromSwarm::ExternalAddrConfirmed(_) => {}
2560 }
2561 }
2562}
2563
/// A quorum w.r.t. the configured replication factor specifies the minimum
/// number of distinct nodes that must be successfully contacted in order
/// for a query to succeed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    /// A single node suffices.
    One,
    /// More than half of the nodes, i.e. `total / 2 + 1`.
    Majority,
    /// Every node.
    All,
    /// The given number of nodes, capped at the total.
    N(NonZeroUsize),
}

impl Quorum {
    /// Evaluate the quorum w.r.t a given total (number of peers).
    ///
    /// The result is always at least 1 and never exceeds `total`.
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        let required = match *self {
            Quorum::One => 1,
            Quorum::Majority => total.get() / 2 + 1,
            Quorum::All => total.get(),
            Quorum::N(n) => usize::min(total.get(), n.get()),
        };
        NonZeroUsize::new(required).expect("every quorum is at least 1")
    }
}
2586
/// A record either received by the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    /// The peer from whom the record was received. `None` if the record was
    /// retrieved from local storage.
    pub peer: Option<PeerId>,
    /// The record itself.
    pub record: Record,
}
2596
2597//////////////////////////////////////////////////////////////////////////////
2598// Events
2599
/// The events produced by the `Kademlia` behaviour.
///
/// See [`NetworkBehaviour::poll`].
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An inbound request has been received and handled.
    //
    // Note on the difference between 'request' and 'query': A request is a
    // single request-response style exchange with a single remote peer. A query
    // is made of multiple requests across multiple remote peers.
    InboundRequest { request: InboundRequest },

    /// An outbound query has made progress.
    OutboundQueryProgressed {
        /// The ID of the query that made progress.
        id: QueryId,
        /// The intermediate result of the query.
        result: QueryResult,
        /// Execution statistics from the query.
        stats: QueryStats,
        /// Indicates which event this is, if there are multiple responses for a single query.
        step: ProgressStep,
    },

    /// The routing table has been updated with a new peer and / or
    /// address, thereby possibly evicting another peer.
    RoutingUpdated {
        /// The ID of the peer that was added or updated.
        peer: PeerId,
        /// Whether this is a new peer and was thus just added to the routing
        /// table, or whether it is an existing peer whose addresses changed.
        is_new_peer: bool,
        /// The full list of known addresses of `peer`.
        addresses: Addresses,
        /// The minimum inclusive and maximum inclusive distance for
        /// the bucket of the peer.
        bucket_range: (Distance, Distance),
        /// The ID of the peer that was evicted from the routing table to make
        /// room for the new peer, if any.
        old_peer: Option<PeerId>,
    },

    /// A peer has connected for whom no listen address is known.
    ///
    /// If the peer is to be added to the routing table, a known
    /// listen address for the peer must be provided via [`Behaviour::add_address`].
    UnroutablePeer { peer: PeerId },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer has not been added to the routing table either
    /// because [`BucketInserts::Manual`] is configured or because
    /// the corresponding bucket is full.
    ///
    /// If the peer is to be included in the routing table, it
    /// must be explicitly added via [`Behaviour::add_address`], possibly after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    RoutablePeer { peer: PeerId, address: Multiaddr },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer is only pending insertion into the routing table
    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
    /// may not make it into the routing table.
    ///
    /// If the peer is to be unconditionally included in the routing table,
    /// it should be explicitly added via [`Behaviour::add_address`] after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    PendingRoutablePeer { peer: PeerId, address: Multiaddr },
}
2675
/// Information about progress events.
#[derive(Debug, Clone)]
pub struct ProgressStep {
    /// The 1-based position of this event among the progress events of a query.
    pub count: NonZeroUsize,
    /// Is this the final event?
    pub last: bool,
}

impl ProgressStep {
    /// The first step of a sequence that is expected to continue.
    fn first() -> Self {
        let count = NonZeroUsize::new(1).expect("1 to be greater than 0.");
        Self { count, last: false }
    }

    /// The first step of a sequence that consists of this one step only.
    fn first_and_last() -> Self {
        Self {
            last: true,
            ..ProgressStep::first()
        }
    }

    /// The step following `self`, not marked as last.
    ///
    /// # Panics
    ///
    /// Panics if `self` is already marked as the last step.
    fn next(&self) -> Self {
        assert!(!self.last);
        let successor = self.count.get() + 1;

        Self {
            count: NonZeroUsize::new(successor).expect("Adding 1 not to result in 0."),
            last: false,
        }
    }
}
2706
/// Information about a received and handled inbound request.
#[derive(Debug, Clone)]
pub enum InboundRequest {
    /// Request for the list of nodes whose IDs are the closest to `key`.
    FindNode {
        /// Number of closer peers sent back in the response.
        num_closer_peers: usize,
    },
    /// Same as `FindNode`, but should also return the entries of the local
    /// providers list for this key.
    GetProvider {
        /// Number of closer peers sent back in the response.
        num_closer_peers: usize,
        /// Number of provider peers sent back in the response.
        num_provider_peers: usize,
    },
    /// A peer sent an add provider request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
    /// included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
    AddProvider {
        /// The received provider record; `None` when the record was written
        /// to the store directly ([`StoreInserts::Unfiltered`]).
        record: Option<ProviderRecord>,
    },
    /// Request to retrieve a record.
    GetRecord {
        /// Number of closer peers sent back in the response.
        num_closer_peers: usize,
        /// Whether a non-expired record for the key was found in the local store.
        present_locally: bool,
    },
    /// A peer sent a put record request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
    PutRecord {
        source: PeerId,
        connection: ConnectionId,
        record: Option<Record>,
    },
}
2739
/// The results of Kademlia queries.
///
/// Each variant wraps the `Result` alias of the operation it belongs to.
#[derive(Debug, Clone)]
pub enum QueryResult {
    /// The result of [`Behaviour::bootstrap`].
    Bootstrap(BootstrapResult),

    /// The result of [`Behaviour::get_closest_peers`].
    GetClosestPeers(GetClosestPeersResult),

    /// The result of [`Behaviour::get_providers`].
    GetProviders(GetProvidersResult),

    /// The result of [`Behaviour::start_providing`].
    StartProviding(AddProviderResult),

    /// The result of an (automatic) republishing of a provider record.
    RepublishProvider(AddProviderResult),

    /// The result of [`Behaviour::get_record`].
    GetRecord(GetRecordResult),

    /// The result of [`Behaviour::put_record`].
    PutRecord(PutRecordResult),

    /// The result of an (automatic) republishing of a (value-)record.
    RepublishRecord(PutRecordResult),
}
2767
/// The result of [`Behaviour::get_record`].
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;

/// The successful result of [`Behaviour::get_record`].
#[derive(Debug, Clone)]
pub enum GetRecordOk {
    /// A record was returned, together with the peer it came from (if any).
    FoundRecord(PeerRecord),
    /// NOTE(review): presumably emitted once when the query ends and no
    /// further records will follow — confirm against the code that finishes
    /// `GetRecord` queries.
    FinishedWithNoAdditionalRecord {
        /// If caching is enabled, these are the peers closest
        /// _to the record key_ (not the local node) that were queried but
        /// did not return the record, sorted by distance to the record key
        /// from closest to farthest. How many of these are tracked is configured
        /// by [`Config::set_caching`].
        ///
        /// Writing back the cache at these peers is a manual operation,
        /// i.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
        /// after selecting one of the returned records.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
2788
/// The error result of [`Behaviour::get_record`].
///
/// Every variant carries the `key` of the record the operation was about;
/// see [`GetRecordError::key`] and [`GetRecordError::into_key`].
#[derive(Debug, Clone, Error)]
pub enum GetRecordError {
    /// The record was not found.
    #[error("the record was not found")]
    NotFound {
        key: record_priv::Key,
        // NOTE(review): presumably the closest peers contacted during the
        // lookup — confirm where this variant is constructed.
        closest_peers: Vec<PeerId>,
    },
    /// The required quorum was not reached.
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        key: record_priv::Key,
        // NOTE(review): presumably the records collected before the query
        // failed — confirm where this variant is constructed.
        records: Vec<PeerRecord>,
        /// The number of peers that was needed.
        quorum: NonZeroUsize,
    },
    /// The request timed out.
    #[error("the request timed out")]
    Timeout { key: record_priv::Key },
}
2806
2807impl GetRecordError {
2808 /// Gets the key of the record for which the operation failed.
2809 pub fn key(&self) -> &record_priv::Key {
2810 match self {
2811 GetRecordError::QuorumFailed { key, .. } => key,
2812 GetRecordError::Timeout { key, .. } => key,
2813 GetRecordError::NotFound { key, .. } => key,
2814 }
2815 }
2816
2817 /// Extracts the key of the record for which the operation failed,
2818 /// consuming the error.
2819 pub fn into_key(self) -> record_priv::Key {
2820 match self {
2821 GetRecordError::QuorumFailed { key, .. } => key,
2822 GetRecordError::Timeout { key, .. } => key,
2823 GetRecordError::NotFound { key, .. } => key,
2824 }
2825 }
2826}
2827
/// The result of [`Behaviour::put_record`].
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;

/// The successful result of [`Behaviour::put_record`].
#[derive(Debug, Clone)]
pub struct PutRecordOk {
    /// The key of the record the operation was about.
    pub key: record_priv::Key,
}
2836
/// The error result of [`Behaviour::put_record`].
///
/// Both variants carry the `key` of the record the operation was about;
/// see [`PutRecordError::key`] and [`PutRecordError::into_key`].
#[derive(Debug, Clone, Error)]
pub enum PutRecordError {
    /// The required quorum was not reached.
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        key: record_priv::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        /// The number of peers that was needed.
        quorum: NonZeroUsize,
    },
    /// The request timed out.
    #[error("the request timed out")]
    Timeout {
        key: record_priv::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        // NOTE(review): presumably the quorum that was required, as in
        // `QuorumFailed` — confirm where this variant is constructed.
        quorum: NonZeroUsize,
    },
}
2855
2856impl PutRecordError {
2857 /// Gets the key of the record for which the operation failed.
2858 pub fn key(&self) -> &record_priv::Key {
2859 match self {
2860 PutRecordError::QuorumFailed { key, .. } => key,
2861 PutRecordError::Timeout { key, .. } => key,
2862 }
2863 }
2864
2865 /// Extracts the key of the record for which the operation failed,
2866 /// consuming the error.
2867 pub fn into_key(self) -> record_priv::Key {
2868 match self {
2869 PutRecordError::QuorumFailed { key, .. } => key,
2870 PutRecordError::Timeout { key, .. } => key,
2871 }
2872 }
2873}
2874
2875/// The result of [`Behaviour::bootstrap`].
2876pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
2877
/// The successful result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    // NOTE(review): presumably the peer ID targeted by the finished bootstrap
    // query (cf. `QueryInfo::Bootstrap::peer`) — confirm against the emitting code.
    pub peer: PeerId,
    // NOTE(review): presumably the number of bootstrap lookups still to be
    // performed after this one — confirm against the emitting code.
    pub num_remaining: u32,
}
2884
/// The error result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone, Error)]
pub enum BootstrapError {
    /// The request timed out.
    #[error("the request timed out")]
    Timeout {
        // NOTE(review): presumably the peer ID targeted by the timed-out query
        // (cf. `QueryInfo::Bootstrap::peer`) — confirm against the emitting code.
        peer: PeerId,
        // NOTE(review): presumably the number of bootstrap lookups still to be
        // performed, with `None` meaning the count is not known — confirm.
        num_remaining: Option<u32>,
    },
}
2894
/// The result of [`Behaviour::get_closest_peers`]: a [`GetClosestPeersOk`]
/// on success or a [`GetClosestPeersError`] on failure.
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
2897
/// The successful result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    /// The key the lookup was performed for, as raw bytes.
    pub key: Vec<u8>,
    // NOTE(review): presumably the closest peers to `key` found by the
    // query — confirm ordering and contents against the emitting code.
    pub peers: Vec<PeerId>,
}
2904
/// The error result of [`Behaviour::get_closest_peers`].
///
/// The key of the failed lookup is accessible via
/// [`GetClosestPeersError::key`] and [`GetClosestPeersError::into_key`].
#[derive(Debug, Clone, Error)]
pub enum GetClosestPeersError {
    /// The request timed out.
    // NOTE(review): `peers` presumably holds the peers found before the
    // timeout hit — confirm against the emitting code.
    #[error("the request timed out")]
    Timeout { key: Vec<u8>, peers: Vec<PeerId> },
}
2911
2912impl GetClosestPeersError {
2913 /// Gets the key for which the operation failed.
2914 pub fn key(&self) -> &Vec<u8> {
2915 match self {
2916 GetClosestPeersError::Timeout { key, .. } => key,
2917 }
2918 }
2919
2920 /// Extracts the key for which the operation failed,
2921 /// consuming the error.
2922 pub fn into_key(self) -> Vec<u8> {
2923 match self {
2924 GetClosestPeersError::Timeout { key, .. } => key,
2925 }
2926 }
2927}
2928
/// The result of [`Behaviour::get_providers`]: a [`GetProvidersOk`] on
/// success or a [`GetProvidersError`] on failure.
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
2931
/// The successful result of [`Behaviour::get_providers`].
#[derive(Debug, Clone)]
pub enum GetProvidersOk {
    /// A new set of providers was discovered for `key`.
    FoundProviders {
        /// The key the providers were searched for.
        key: record_priv::Key,
        /// The new set of providers discovered.
        providers: HashSet<PeerId>,
    },
    /// The query finished without any additional record.
    FinishedWithNoAdditionalRecord {
        // NOTE(review): presumably the closest peers to the key that the
        // query ended up with — confirm against the emitting code.
        closest_peers: Vec<PeerId>,
    },
}
2944
/// The error result of [`Behaviour::get_providers`].
///
/// The key of the failed operation is accessible via
/// [`GetProvidersError::key`] and [`GetProvidersError::into_key`].
#[derive(Debug, Clone, Error)]
pub enum GetProvidersError {
    /// The request timed out.
    #[error("the request timed out")]
    Timeout {
        /// The key for which the operation failed.
        key: record_priv::Key,
        // NOTE(review): presumably the closest peers to the key found before
        // the timeout hit — confirm against the emitting code.
        closest_peers: Vec<PeerId>,
    },
}
2954
2955impl GetProvidersError {
2956 /// Gets the key for which the operation failed.
2957 pub fn key(&self) -> &record_priv::Key {
2958 match self {
2959 GetProvidersError::Timeout { key, .. } => key,
2960 }
2961 }
2962
2963 /// Extracts the key for which the operation failed,
2964 /// consuming the error.
2965 pub fn into_key(self) -> record_priv::Key {
2966 match self {
2967 GetProvidersError::Timeout { key, .. } => key,
2968 }
2969 }
2970}
2971
/// The result of publishing a provider record: an [`AddProviderOk`] on
/// success or an [`AddProviderError`] on failure.
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
2974
/// The successful result of publishing a provider record.
#[derive(Debug, Clone)]
pub struct AddProviderOk {
    /// The key the provider record was published for.
    pub key: record_priv::Key,
}
2980
/// The possible errors when publishing a provider record.
///
/// The key of the failed operation is accessible via
/// [`AddProviderError::key`] and [`AddProviderError::into_key`].
#[derive(Debug, Clone, Error)]
pub enum AddProviderError {
    /// The request timed out; `key` is the key for which the operation failed.
    #[error("the request timed out")]
    Timeout { key: record_priv::Key },
}
2987
2988impl AddProviderError {
2989 /// Gets the key for which the operation failed.
2990 pub fn key(&self) -> &record_priv::Key {
2991 match self {
2992 AddProviderError::Timeout { key, .. } => key,
2993 }
2994 }
2995
2996 /// Extracts the key for which the operation failed,
2997 pub fn into_key(self) -> record_priv::Key {
2998 match self {
2999 AddProviderError::Timeout { key, .. } => key,
3000 }
3001 }
3002}
3003
3004impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3005 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3006 KadPeer {
3007 node_id: e.node.key.into_preimage(),
3008 multiaddrs: e.node.value.into_vec(),
3009 connection_ty: match e.status {
3010 NodeStatus::Connected => ConnectionType::Connected,
3011 NodeStatus::Disconnected => ConnectionType::NotConnected,
3012 },
3013 }
3014 }
3015}
3016
3017//////////////////////////////////////////////////////////////////////////////
3018// Internal query state
3019
/// The behaviour-specific state carried by a query, i.e. the `inner`
/// payload of a `Query<QueryInner>`.
struct QueryInner {
    /// The query-specific state.
    info: QueryInfo,
    /// Addresses of peers discovered during a query, keyed by peer ID.
    addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
    /// A map of pending requests to peers.
    ///
    /// A request is pending if the targeted peer is not currently connected
    /// and these requests are sent as soon as a connection to the peer is established.
    ///
    /// Stored as `(peer, request)` pairs with inline room for `K_VALUE` entries.
    pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
}
3031
3032impl QueryInner {
3033 fn new(info: QueryInfo) -> Self {
3034 QueryInner {
3035 info,
3036 addresses: Default::default(),
3037 pending_rpcs: SmallVec::default(),
3038 }
3039 }
3040}
3041
/// The context of a [`QueryInfo::AddProvider`] query, i.e. the reason
/// the provider announcement is being made.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    /// The context is a [`Behaviour::start_providing`] operation.
    Publish,
    /// The context is periodic republishing of provider announcements
    /// initiated earlier via [`Behaviour::start_providing`].
    Republish,
}
3051
/// The context of a [`QueryInfo::PutRecord`] query, i.e. the reason
/// the record is being stored.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    /// The context is a [`Behaviour::put_record`] operation.
    Publish,
    /// The context is periodic republishing of records stored
    /// earlier via [`Behaviour::put_record`].
    Republish,
    /// The context is periodic replication (i.e. without extending
    /// the record TTL) of stored records received earlier from another peer.
    Replicate,
    /// The context is a custom store operation targeting specific
    /// peers initiated by [`Behaviour::put_record_to`].
    Custom,
}
3067
/// Information about a running query.
///
/// Each variant corresponds to one kind of query and carries the state
/// specific to that kind.
#[derive(Debug, Clone)]
pub enum QueryInfo {
    /// A query initiated by [`Behaviour::bootstrap`].
    Bootstrap {
        /// The targeted peer ID.
        peer: PeerId,
        /// The remaining random peer IDs to query, one per
        /// bucket that still needs refreshing.
        ///
        /// This is `None` if the initial self-lookup has not
        /// yet completed and `Some` with an exhausted iterator
        /// if bootstrapping is complete.
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
    GetClosestPeers {
        /// The key being queried (the preimage).
        key: Vec<u8>,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::get_providers`].
    GetProviders {
        /// The key for which to search for providers.
        key: record_priv::Key,
        /// The number of providers found so far.
        providers_found: usize,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::start_providing`].
    AddProvider {
        /// The record key.
        key: record_priv::Key,
        /// The current phase of the query.
        phase: AddProviderPhase,
        /// The execution context of the query.
        context: AddProviderContext,
    },

    /// A (repeated) query initiated by [`Behaviour::put_record`].
    PutRecord {
        /// The record to store; it is sent to peers in the
        /// [`PutRecordPhase::PutRecord`] phase.
        record: Record,
        /// The expected quorum of responses w.r.t. the replication factor.
        quorum: NonZeroUsize,
        /// The current phase of the query.
        phase: PutRecordPhase,
        /// The execution context of the query.
        context: PutRecordContext,
    },

    /// A (repeated) query initiated by [`Behaviour::get_record`].
    GetRecord {
        /// The key to look for.
        key: record_priv::Key,
        /// Current index of events.
        step: ProgressStep,
        /// Did we find at least one record?
        found_a_record: bool,
        /// The peers closest to the `key` that were queried but did not return a record,
        /// i.e. the peers that are candidates for caching the record.
        ///
        /// Keyed by distance, so iteration goes from the closest peer outwards.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
3137
3138impl QueryInfo {
3139 /// Creates an event for a handler to issue an outgoing request in the
3140 /// context of a query.
3141 fn to_request(&self, query_id: QueryId) -> HandlerIn {
3142 match &self {
3143 QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3144 key: peer.to_bytes(),
3145 query_id,
3146 },
3147 QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3148 key: key.clone(),
3149 query_id,
3150 },
3151 QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3152 key: key.clone(),
3153 query_id,
3154 },
3155 QueryInfo::AddProvider { key, phase, .. } => match phase {
3156 AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3157 key: key.to_vec(),
3158 query_id,
3159 },
3160 AddProviderPhase::AddProvider {
3161 provider_id,
3162 external_addresses,
3163 ..
3164 } => HandlerIn::AddProvider {
3165 key: key.clone(),
3166 provider: crate::protocol::KadPeer {
3167 node_id: *provider_id,
3168 multiaddrs: external_addresses.clone(),
3169 connection_ty: crate::protocol::ConnectionType::Connected,
3170 },
3171 },
3172 },
3173 QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3174 key: key.clone(),
3175 query_id,
3176 },
3177 QueryInfo::PutRecord { record, phase, .. } => match phase {
3178 PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3179 key: record.key.to_vec(),
3180 query_id,
3181 },
3182 PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3183 record: record.clone(),
3184 query_id,
3185 },
3186 },
3187 }
3188 }
3189}
3190
/// The phases of a [`QueryInfo::AddProvider`] query.
///
/// The phase determines which request the query sends to a peer.
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query advertises the local node as a provider for the key to
    /// the closest nodes to the key.
    AddProvider {
        /// The local peer ID that is advertised as a provider.
        provider_id: PeerId,
        /// The external addresses of the provider being advertised.
        external_addresses: Vec<Multiaddr>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3208
/// The phases of a [`QueryInfo::PutRecord`] query.
///
/// The phase determines which request the query sends to a peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query is replicating the record to the closest nodes to the key.
    PutRecord {
        /// A list of peers the given record has been successfully replicated to.
        success: Vec<PeerId>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3223
/// A mutable reference to a running query.
///
/// Besides inspecting the query it allows finishing it early,
/// see [`QueryMut::finish`].
pub struct QueryMut<'a> {
    /// The borrowed query together with its behaviour-specific inner state.
    query: &'a mut Query<QueryInner>,
}
3228
3229impl<'a> QueryMut<'a> {
3230 pub fn id(&self) -> QueryId {
3231 self.query.id()
3232 }
3233
3234 /// Gets information about the type and state of the query.
3235 pub fn info(&self) -> &QueryInfo {
3236 &self.query.inner.info
3237 }
3238
3239 /// Gets execution statistics about the query.
3240 ///
3241 /// For a multi-phase query such as `put_record`, these are the
3242 /// statistics of the current phase.
3243 pub fn stats(&self) -> &QueryStats {
3244 self.query.stats()
3245 }
3246
3247 /// Finishes the query asap, without waiting for the
3248 /// regular termination conditions.
3249 pub fn finish(&mut self) {
3250 self.query.finish()
3251 }
3252}
3253
/// An immutable reference to a running query.
///
/// Offers the same read-only accessors as [`QueryMut`].
pub struct QueryRef<'a> {
    /// The borrowed query together with its behaviour-specific inner state.
    query: &'a Query<QueryInner>,
}
3258
3259impl<'a> QueryRef<'a> {
3260 pub fn id(&self) -> QueryId {
3261 self.query.id()
3262 }
3263
3264 /// Gets information about the type and state of the query.
3265 pub fn info(&self) -> &QueryInfo {
3266 &self.query.inner.info
3267 }
3268
3269 /// Gets execution statistics about the query.
3270 ///
3271 /// For a multi-phase query such as `put_record`, these are the
3272 /// statistics of the current phase.
3273 pub fn stats(&self) -> &QueryStats {
3274 self.query.stats()
3275 }
3276}
3277
/// An operation failed due to no known peers in the routing table.
///
/// Displayed as `No known peers.`.
#[derive(Debug, Clone)]
pub struct NoKnownPeers();
3281
3282impl fmt::Display for NoKnownPeers {
3283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3284 write!(f, "No known peers.")
3285 }
3286}
3287
// `NoKnownPeers` is a plain error value: the default methods of
// `std::error::Error` are used unchanged.
impl std::error::Error for NoKnownPeers {}
3289
/// The possible outcomes of [`Behaviour::add_address`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingUpdate {
    /// The given peer and address have been added to the routing
    /// table.
    Success,
    /// The peer and address are pending insertion into
    /// the routing table, if a disconnected peer fails
    /// to respond. If the given peer and address end up
    /// in the routing table, [`Event::RoutingUpdated`]
    /// is eventually emitted.
    Pending,
    /// The routing table update failed, either because the
    /// corresponding bucket for the peer is full and the
    /// pending slot(s) are occupied, or because the given
    /// peer ID is deemed invalid (e.g. refers to the local
    /// peer ID).
    Failed,
}
3309
/// An operating mode, rendered as `client` or `server` by its
/// [`fmt::Display`] implementation.
// NOTE(review): presumably `Client` means the node only issues requests while
// `Server` means it also answers inbound ones — confirm against the
// behaviour's mode handling before promoting this to the public docs.
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum Mode {
    /// Displayed as `client`.
    Client,
    /// Displayed as `server`.
    Server,
}
3315
3316impl fmt::Display for Mode {
3317 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3318 match self {
3319 Mode::Client => write!(f, "client"),
3320 Mode::Server => write!(f, "server"),
3321 }
3322 }
3323}
3324
/// Renders every element of `confirmed_external_addresses` via
/// [`ToString`] and joins the results with `", "`.
///
/// An empty slice yields an empty string; a single element is rendered
/// without any separator.
fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
where
    T: ToString,
{
    let mut joined = String::new();
    for (position, item) in confirmed_external_addresses.iter().enumerate() {
        // The separator goes between elements only, never in front of the first.
        if position > 0 {
            joined.push_str(", ");
        }
        joined.push_str(&item.to_string());
    }
    joined
}