litep2p/protocol/libp2p/kademlia/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`/ipfs/kad/1.0.0`](https://github.com/libp2p/specs/blob/master/kad-dht/README.md) implementation.
22
23use crate::{
24    error::{Error, ImmediateDialError, SubstreamError},
25    protocol::{
26        libp2p::kademlia::{
27            bucket::KBucketEntry,
28            executor::{QueryContext, QueryExecutor, QueryResult},
29            handle::KademliaCommand,
30            message::KademliaMessage,
31            query::{QueryAction, QueryEngine},
32            record::ProviderRecord,
33            routing_table::RoutingTable,
34            store::MemoryStore,
35            types::{ConnectionType, KademliaPeer, Key},
36        },
37        Direction, TransportEvent, TransportService,
38    },
39    substream::Substream,
40    types::SubstreamId,
41    PeerId,
42};
43
44use bytes::{Bytes, BytesMut};
45use futures::StreamExt;
46use multiaddr::Multiaddr;
47use tokio::sync::mpsc::{Receiver, Sender};
48
49use std::{
50    collections::{hash_map::Entry, HashMap},
51    time::{Duration, Instant},
52};
53
54pub use self::handle::RecordsType;
55pub use config::{Config, ConfigBuilder};
56pub use handle::{
57    IncomingRecordValidationMode, KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode,
58};
59pub use query::QueryId;
60pub use record::{Key as RecordKey, PeerRecord, Record};
61
/// Logging target for the file.
///
/// Passed as `target:` to every `tracing` macro invocation in this module.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";

/// Parallelism factor, `α`.
///
/// Handed to `QueryEngine::new()` together with the replication factor when
/// [`Kademlia`] is constructed.
const PARALLELISM_FACTOR: usize = 3;
67
68mod bucket;
69mod config;
70mod executor;
71mod handle;
72mod message;
73mod query;
74mod record;
75mod routing_table;
76mod store;
77mod types;
78
/// Kademlia wire schema.
///
/// The contents of `kademlia.rs` are included from the build output directory (`OUT_DIR`).
// NOTE(review): presumably generated from a protobuf definition by the build script — confirm.
mod schema {
    pub(super) mod kademlia {
        include!(concat!(env!("OUT_DIR"), "/kademlia.rs"));
    }
}
84
/// Peer action.
///
/// An action is queued for a peer while a dial or an outbound substream to that peer is
/// still pending and is executed once the substream has been opened.
#[derive(Debug)]
enum PeerAction {
    /// Send `FIND_NODE` message to peer.
    ///
    /// Carries the ID of the query the message belongs to. The actual message is fetched from
    /// the query engine when the substream opens. Note that this variant is queued for every
    /// `QueryAction::SendMessage`, whatever the kind of the query is.
    SendFindNode(QueryId),

    /// Send `PUT_VALUE` message to peer.
    ///
    /// Carries the message bytes that are written to the substream as-is.
    SendPutValue(Bytes),
}
94
/// Peer context.
///
/// State kept for every peer known to [`Kademlia`] as connected.
#[derive(Default)]
struct PeerContext {
    /// Pending actions, keyed by the ID of the outbound substream that was requested for
    /// the action. An entry is removed when the substream opens or fails to open.
    pending_actions: HashMap<SubstreamId, PeerAction>,
}
101
102impl PeerContext {
103    /// Create new [`PeerContext`].
104    pub fn new() -> Self {
105        Self {
106            pending_actions: HashMap::new(),
107        }
108    }
109
110    /// Add pending action for peer.
111    pub fn add_pending_action(&mut self, substream_id: SubstreamId, action: PeerAction) {
112        self.pending_actions.insert(substream_id, action);
113    }
114}
115
/// Main Kademlia object.
///
/// Owns the transport service, the routing table, the record store and the query machinery,
/// and is driven by its `run()` event loop.
pub(crate) struct Kademlia {
    /// Transport service.
    service: TransportService,

    /// Local Kademlia key.
    local_key: Key<PeerId>,

    /// Connected peers and their pending actions.
    peers: HashMap<PeerId, PeerContext>,

    /// TX channel for sending events to `KademliaHandle`.
    event_tx: Sender<KademliaEvent>,

    /// RX channel for receiving commands from `KademliaHandle`.
    cmd_rx: Receiver<KademliaCommand>,

    /// Routing table.
    routing_table: RoutingTable,

    /// Replication factor.
    ///
    /// Also used as the number of closest peers returned in responses to remote requests.
    replication_factor: usize,

    /// Record store.
    store: MemoryStore,

    /// Pending outbound substreams, mapped to the peer they are being opened to.
    pending_substreams: HashMap<SubstreamId, PeerId>,

    /// Pending dials and the actions to execute once the connection is established.
    pending_dials: HashMap<PeerId, Vec<PeerAction>>,

    /// Routing table update mode.
    update_mode: RoutingTableUpdateMode,

    /// Incoming records validation mode.
    validation_mode: IncomingRecordValidationMode,

    /// Default record TTL.
    record_ttl: Duration,

    /// Provider record TTL.
    provider_ttl: Duration,

    /// Query engine.
    engine: QueryEngine,

    /// Query executor.
    executor: QueryExecutor,
}
166
167impl Kademlia {
168    /// Create new [`Kademlia`].
169    pub(crate) fn new(mut service: TransportService, config: Config) -> Self {
170        let local_peer_id = service.local_peer_id();
171        let local_key = Key::from(service.local_peer_id());
172        let mut routing_table = RoutingTable::new(local_key.clone());
173
174        for (peer, addresses) in config.known_peers {
175            tracing::trace!(target: LOG_TARGET, ?peer, ?addresses, "add bootstrap peer");
176
177            routing_table.add_known_peer(peer, addresses.clone(), ConnectionType::NotConnected);
178            service.add_known_address(&peer, addresses.into_iter());
179        }
180
181        Self {
182            service,
183            routing_table,
184            peers: HashMap::new(),
185            cmd_rx: config.cmd_rx,
186            store: MemoryStore::new(),
187            event_tx: config.event_tx,
188            local_key,
189            pending_dials: HashMap::new(),
190            executor: QueryExecutor::new(),
191            pending_substreams: HashMap::new(),
192            update_mode: config.update_mode,
193            validation_mode: config.validation_mode,
194            record_ttl: config.record_ttl,
195            provider_ttl: config.provider_ttl,
196            replication_factor: config.replication_factor,
197            engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
198        }
199    }
200
201    /// Connection established to remote peer.
202    fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
203        tracing::trace!(target: LOG_TARGET, ?peer, "connection established");
204
205        match self.peers.entry(peer) {
206            Entry::Vacant(entry) => {
207                if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
208                    entry.connection = ConnectionType::Connected;
209                }
210
211                let Some(actions) = self.pending_dials.remove(&peer) else {
212                    entry.insert(PeerContext::new());
213                    return Ok(());
214                };
215
216                // go over all pending actions, open substreams and save the state to `PeerContext`
217                // from which it will be later queried when the substream opens
218                let mut context = PeerContext::new();
219
220                for action in actions {
221                    match self.service.open_substream(peer) {
222                        Ok(substream_id) => {
223                            context.add_pending_action(substream_id, action);
224                        }
225                        Err(error) => {
226                            tracing::debug!(
227                                target: LOG_TARGET,
228                                ?peer,
229                                ?action,
230                                ?error,
231                                "connection established to peer but failed to open substream",
232                            );
233
234                            if let PeerAction::SendFindNode(query_id) = action {
235                                self.engine.register_response_failure(query_id, peer);
236                            }
237                        }
238                    }
239                }
240
241                entry.insert(context);
242                Ok(())
243            }
244            Entry::Occupied(_) => Err(Error::PeerAlreadyExists(peer)),
245        }
246    }
247
248    /// Disconnect peer from `Kademlia`.
249    ///
250    /// Peer is disconnected either because the substream was detected closed
251    /// or because the connection was closed.
252    ///
253    /// The peer is kept in the routing table but its connection state is set
254    /// as `NotConnected`, meaning it can be evicted from a k-bucket if another
255    /// peer that shares the bucket connects.
256    async fn disconnect_peer(&mut self, peer: PeerId, query: Option<QueryId>) {
257        tracing::trace!(target: LOG_TARGET, ?peer, ?query, "disconnect peer");
258
259        if let Some(query) = query {
260            self.engine.register_response_failure(query, peer);
261        }
262
263        if let Some(PeerContext { pending_actions }) = self.peers.remove(&peer) {
264            pending_actions.into_iter().for_each(|(_, action)| {
265                if let PeerAction::SendFindNode(query_id) = action {
266                    self.engine.register_response_failure(query_id, peer);
267                }
268            });
269        }
270
271        if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
272            entry.connection = ConnectionType::NotConnected;
273        }
274    }
275
276    /// Local node opened a substream to remote node.
277    async fn on_outbound_substream(
278        &mut self,
279        peer: PeerId,
280        substream_id: SubstreamId,
281        substream: Substream,
282    ) -> crate::Result<()> {
283        tracing::trace!(
284            target: LOG_TARGET,
285            ?peer,
286            ?substream_id,
287            "outbound substream opened",
288        );
289        let _ = self.pending_substreams.remove(&substream_id);
290
291        let pending_action = &mut self
292            .peers
293            .get_mut(&peer)
294            .ok_or(Error::PeerDoesntExist(peer))?
295            .pending_actions
296            .remove(&substream_id);
297
298        match pending_action.take() {
299            None => {
300                tracing::trace!(
301                    target: LOG_TARGET,
302                    ?peer,
303                    ?substream_id,
304                    "pending action doesn't exist for peer, closing substream",
305                );
306
307                let _ = substream.close().await;
308                return Ok(());
309            }
310            Some(PeerAction::SendFindNode(query)) => {
311                match self.engine.next_peer_action(&query, &peer) {
312                    Some(QueryAction::SendMessage {
313                        query,
314                        peer,
315                        message,
316                    }) => {
317                        tracing::trace!(target: LOG_TARGET, ?peer, ?query, "start sending message to peer");
318
319                        self.executor.send_request_read_response(
320                            peer,
321                            Some(query),
322                            message,
323                            substream,
324                        );
325                    }
326                    // query finished while the substream was being opened
327                    None => {
328                        let _ = substream.close().await;
329                    }
330                    action => {
331                        tracing::warn!(target: LOG_TARGET, ?query, ?peer, ?action, "unexpected action for `FIND_NODE`");
332                        let _ = substream.close().await;
333                        debug_assert!(false);
334                    }
335                }
336            }
337            Some(PeerAction::SendPutValue(message)) => {
338                tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` response");
339
340                self.executor.send_message(peer, message, substream);
341            }
342        }
343
344        Ok(())
345    }
346
347    /// Remote opened a substream to local node.
348    async fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
349        tracing::trace!(target: LOG_TARGET, ?peer, "inbound substream opened");
350
351        self.executor.read_message(peer, None, substream);
352    }
353
354    /// Update routing table if the routing table update mode was set to automatic.
355    ///
356    /// Inform user about the potential routing table, allowing them to update it manually if
357    /// the mode was set to manual.
358    async fn update_routing_table(&mut self, peers: &[KademliaPeer]) {
359        let peers: Vec<_> =
360            peers.iter().filter(|peer| peer.peer != self.service.local_peer_id()).collect();
361
362        // inform user about the routing table update, regardless of what the routing table update
363        // mode is
364        let _ = self
365            .event_tx
366            .send(KademliaEvent::RoutingTableUpdate {
367                peers: peers.iter().map(|peer| peer.peer).collect::<Vec<PeerId>>(),
368            })
369            .await;
370
371        for info in peers {
372            self.service.add_known_address(&info.peer, info.addresses.iter().cloned());
373
374            if std::matches!(self.update_mode, RoutingTableUpdateMode::Automatic) {
375                self.routing_table.add_known_peer(
376                    info.peer,
377                    info.addresses.clone(),
378                    self.peers
379                        .get(&info.peer)
380                        .map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected),
381                );
382            }
383        }
384    }
385
    /// Handle received message.
    ///
    /// `query_id` tells the two roles of a message apart: if it is `Some`, the message is
    /// handled as a response to a query started by the local node, otherwise it is handled as
    /// a request sent by the remote peer. Responses update the routing table and are
    /// registered to the query engine. Requests are answered over `substream`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidData`] if `message` can't be decoded as a [`KademliaMessage`].
    async fn on_message_received(
        &mut self,
        peer: PeerId,
        query_id: Option<QueryId>,
        message: BytesMut,
        substream: Substream,
    ) -> crate::Result<()> {
        tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "handle message from peer");

        match KademliaMessage::from_bytes(message).ok_or(Error::InvalidData)? {
            ref message @ KademliaMessage::FindNode {
                ref target,
                ref peers,
            } => {
                match query_id {
                    Some(query_id) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?target,
                            query = ?query_id,
                            "handle `FIND_NODE` response",
                        );

                        // update routing table and inform user about the update
                        self.update_routing_table(peers).await;
                        self.engine.register_response(query_id, peer, message.clone());
                    }
                    None => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?target,
                            "handle `FIND_NODE` request",
                        );

                        // respond with the peers from the local routing table that are closest
                        // to the requested target
                        let message = KademliaMessage::find_node_response(
                            target,
                            self.routing_table
                                .closest(Key::from(target.clone()), self.replication_factor),
                        );
                        self.executor.send_message(peer, message.into(), substream);
                    }
                }
            }
            KademliaMessage::PutValue { record } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    record_key = ?record.key,
                    "handle `PUT_VALUE` message",
                );

                // the record is stored only in the automatic validation mode; the user is
                // notified about the record in every mode
                if let IncomingRecordValidationMode::Automatic = self.validation_mode {
                    self.store.put(record.clone());
                }

                let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
            }
            ref message @ KademliaMessage::GetRecord {
                ref key,
                ref record,
                ref peers,
            } => {
                match (query_id, key) {
                    // response to a local query, the key is not inspected
                    (Some(query_id), _) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            query = ?query_id,
                            ?peers,
                            ?record,
                            "handle `GET_VALUE` response",
                        );

                        // update routing table and inform user about the update
                        self.update_routing_table(peers).await;
                        self.engine.register_response(query_id, peer, message.clone());
                    }
                    (None, Some(key)) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?key,
                            "handle `GET_VALUE` request",
                        );

                        // respond with the record from the local store, if there is one, and
                        // the closest peers to the key
                        let value = self.store.get(key).cloned();
                        let closest_peers = self
                            .routing_table
                            .closest(Key::from(key.to_vec()), self.replication_factor);

                        let message = KademliaMessage::get_value_response(
                            (*key).clone(),
                            closest_peers,
                            value,
                        );
                        self.executor.send_message(peer, message.into(), substream);
                    }
                    // a request without a key can't be answered, no response is sent
                    (None, None) => tracing::debug!(
                        target: LOG_TARGET,
                        ?peer,
                        ?message,
                        "unable to handle `GET_RECORD` request with empty key",
                    ),
                }
            }
            KademliaMessage::AddProvider { key, providers } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    ?key,
                    ?providers,
                    "handle `ADD_PROVIDER` message",
                );

                // the message is accepted only if it contains exactly one provider and the
                // provider is the peer that sent the message
                match (providers.len(), providers.first()) {
                    (1, Some(provider)) =>
                        if provider.peer == peer {
                            self.store.put_provider(ProviderRecord {
                                key,
                                provider: peer,
                                addresses: provider.addresses.clone(),
                                expires: Instant::now() + self.provider_ttl,
                            });
                        } else {
                            tracing::trace!(
                                target: LOG_TARGET,
                                publisher = ?peer,
                                provider = ?provider.peer,
                                "ignoring `ADD_PROVIDER` message with `publisher` != `provider`"
                            )
                        },
                    (n, _) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            publisher = ?peer,
                            ?n,
                            "ignoring `ADD_PROVIDER` message with `n` != 1 providers"
                        )
                    }
                }
            }
            ref message @ KademliaMessage::GetProviders {
                ref key,
                ref peers,
                ref providers,
            } => {
                match (query_id, key) {
                    (Some(query_id), key) => {
                        // Note: key is not required, but can be non-empty. We just ignore it here.
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            query = ?query_id,
                            ?key,
                            ?peers,
                            ?providers,
                            "handle `GET_PROVIDERS` response",
                        );

                        // update routing table and inform user about the update
                        self.update_routing_table(peers).await;

                        self.engine.register_response(query_id, peer, message.clone());
                    }
                    (None, Some(key)) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?key,
                            "handle `GET_PROVIDERS` request",
                        );

                        let providers = self.store.get_providers(key);
                        // TODO: if local peer is among the providers, update its `ProviderRecord`
                        //       to have up-to-date addresses.
                        //       Requires https://github.com/paritytech/litep2p/issues/211.

                        let closer_peers = self
                            .routing_table
                            .closest(Key::from(key.to_vec()), self.replication_factor);

                        let message = KademliaMessage::get_providers_response(
                            key.clone(),
                            providers,
                            &closer_peers,
                        );
                        self.executor.send_message(peer, message.into(), substream);
                    }
                    // a request without a key can't be answered, no response is sent
                    (None, None) => tracing::debug!(
                        target: LOG_TARGET,
                        ?peer,
                        ?message,
                        "unable to handle `GET_PROVIDERS` request with empty key",
                    ),
                }
            }
        }

        Ok(())
    }
589
590    /// Failed to open substream to remote peer.
591    async fn on_substream_open_failure(
592        &mut self,
593        substream_id: SubstreamId,
594        error: SubstreamError,
595    ) {
596        tracing::trace!(
597            target: LOG_TARGET,
598            ?substream_id,
599            ?error,
600            "failed to open substream"
601        );
602
603        let Some(peer) = self.pending_substreams.remove(&substream_id) else {
604            tracing::debug!(
605                target: LOG_TARGET,
606                ?substream_id,
607                "outbound substream failed for non-existent peer"
608            );
609            return;
610        };
611
612        if let Some(context) = self.peers.get_mut(&peer) {
613            let query = match context.pending_actions.remove(&substream_id) {
614                Some(PeerAction::SendFindNode(query)) => Some(query),
615                _ => None,
616            };
617
618            self.disconnect_peer(peer, query).await;
619        }
620    }
621
622    /// Handle dial failure.
623    fn on_dial_failure(&mut self, peer: PeerId, address: Multiaddr) {
624        tracing::trace!(target: LOG_TARGET, ?peer, ?address, "failed to dial peer");
625
626        let Some(actions) = self.pending_dials.remove(&peer) else {
627            return;
628        };
629
630        for action in actions {
631            if let PeerAction::SendFindNode(query_id) = action {
632                tracing::trace!(
633                    target: LOG_TARGET,
634                    ?peer,
635                    query = ?query_id,
636                    ?address,
637                    "report failure for pending query",
638                );
639
640                self.engine.register_response_failure(query_id, peer);
641            }
642        }
643    }
644
645    /// Open a substream with a peer or dial the peer.
646    fn open_substream_or_dial(
647        &mut self,
648        peer: PeerId,
649        action: PeerAction,
650        query: Option<QueryId>,
651    ) -> Result<(), Error> {
652        match self.service.open_substream(peer) {
653            Ok(substream_id) => {
654                self.pending_substreams.insert(substream_id, peer);
655                self.peers.entry(peer).or_default().pending_actions.insert(substream_id, action);
656
657                Ok(())
658            }
659            Err(err) => {
660                tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream. Dialing peer");
661
662                match self.service.dial(&peer) {
663                    Ok(()) => {
664                        self.pending_dials.entry(peer).or_default().push(action);
665                        Ok(())
666                    }
667
668                    // Already connected is a recoverable error.
669                    Err(ImmediateDialError::AlreadyConnected) => {
670                        // Dial returned `Error::AlreadyConnected`, retry opening the substream.
671                        match self.service.open_substream(peer) {
672                            Ok(substream_id) => {
673                                self.pending_substreams.insert(substream_id, peer);
674                                self.peers
675                                    .entry(peer)
676                                    .or_default()
677                                    .pending_actions
678                                    .insert(substream_id, action);
679                                Ok(())
680                            }
681                            Err(err) => {
682                                tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time");
683                                Err(err.into())
684                            }
685                        }
686                    }
687
688                    Err(error) => {
689                        tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "Failed to dial peer");
690                        Err(error.into())
691                    }
692                }
693            }
694        }
695    }
696
697    /// Handle next query action.
698    async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
699        match action {
700            QueryAction::SendMessage { query, peer, .. } => {
701                if self
702                    .open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query))
703                    .is_err()
704                {
705                    // Announce the error to the query engine.
706                    self.engine.register_response_failure(query, peer);
707                }
708                Ok(())
709            }
710            QueryAction::FindNodeQuerySucceeded {
711                target,
712                peers,
713                query,
714            } => {
715                tracing::debug!(
716                    target: LOG_TARGET,
717                    ?query,
718                    peer = ?target,
719                    num_peers = ?peers.len(),
720                    "`FIND_NODE` succeeded",
721                );
722
723                let _ = self
724                    .event_tx
725                    .send(KademliaEvent::FindNodeSuccess {
726                        target,
727                        query_id: query,
728                        peers: peers.into_iter().map(|info| (info.peer, info.addresses)).collect(),
729                    })
730                    .await;
731                Ok(())
732            }
733            QueryAction::PutRecordToFoundNodes { record, peers } => {
734                tracing::trace!(
735                    target: LOG_TARGET,
736                    record_key = ?record.key,
737                    num_peers = ?peers.len(),
738                    "store record to found peers",
739                );
740                let key = record.key.clone();
741                let message = KademliaMessage::put_value(record);
742
743                for peer in peers {
744                    if let Err(error) = self.open_substream_or_dial(
745                        peer.peer,
746                        PeerAction::SendPutValue(message.clone()),
747                        None,
748                    ) {
749                        tracing::debug!(
750                            target: LOG_TARGET,
751                            ?peer,
752                            ?key,
753                            ?error,
754                            "failed to put record to peer",
755                        );
756                    }
757                }
758
759                Ok(())
760            }
761            QueryAction::GetRecordQueryDone { query_id, records } => {
762                let _ = self
763                    .event_tx
764                    .send(KademliaEvent::GetRecordSuccess {
765                        query_id,
766                        records: RecordsType::Network(records),
767                    })
768                    .await;
769                Ok(())
770            }
771            QueryAction::QueryFailed { query } => {
772                tracing::debug!(target: LOG_TARGET, ?query, "query failed");
773
774                let _ = self.event_tx.send(KademliaEvent::QueryFailed { query_id: query }).await;
775                Ok(())
776            }
777            QueryAction::QuerySucceeded { .. } => unreachable!(),
778        }
779    }
780
781    /// [`Kademlia`] event loop.
782    pub async fn run(mut self) -> crate::Result<()> {
783        tracing::debug!(target: LOG_TARGET, "starting kademlia event loop");
784
785        loop {
786            // poll `QueryEngine` for next actions.
787            while let Some(action) = self.engine.next_action() {
788                if let Err((query, peer)) = self.on_query_action(action).await {
789                    self.disconnect_peer(peer, Some(query)).await;
790                }
791            }
792
793            tokio::select! {
794                event = self.service.next() => match event {
795                    Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
796                        if let Err(error) = self.on_connection_established(peer) {
797                            tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection");
798                        }
799                    }
800                    Some(TransportEvent::ConnectionClosed { peer }) => {
801                        self.disconnect_peer(peer, None).await;
802                    }
803                    Some(TransportEvent::SubstreamOpened { peer, direction, substream, .. }) => {
804                        match direction {
805                            Direction::Inbound => self.on_inbound_substream(peer, substream).await,
806                            Direction::Outbound(substream_id) => {
807                                if let Err(error) = self.on_outbound_substream(peer, substream_id, substream).await {
808                                    tracing::debug!(
809                                        target: LOG_TARGET,
810                                        ?peer,
811                                        ?substream_id,
812                                        ?error,
813                                        "failed to handle outbound substream",
814                                    );
815                                }
816                            }
817                        }
818                    },
819                    Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
820                        self.on_substream_open_failure(substream, error).await;
821                    }
822                    Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address),
823                    None => return Err(Error::EssentialTaskClosed),
824                },
825                context = self.executor.next() => {
826                    let QueryContext { peer, query_id, result } = context.unwrap();
827
828                    match result {
829                        QueryResult::SendSuccess { substream } => {
830                            tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer");
831                            let _ = substream.close().await;
832                        }
833                        QueryResult::ReadSuccess { substream, message } => {
834                            tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer");
835
836                            if let Err(error) = self.on_message_received(peer, query_id, message, substream).await {
837                                tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message");
838                            }
839                        }
840                        QueryResult::SubstreamClosed | QueryResult::Timeout => {
841                            tracing::debug!(
842                                target: LOG_TARGET,
843                                ?peer,
844                                query = ?query_id,
845                                ?result,
846                                "failed to read message from substream",
847                            );
848
849                            self.disconnect_peer(peer, query_id).await;
850                        }
851                    }
852                }
853                command = self.cmd_rx.recv() => {
854                    match command {
855                        Some(KademliaCommand::FindNode { peer, query_id }) => {
856                            tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query");
857
858                            self.engine.start_find_node(
859                                query_id,
860                                peer,
861                                self.routing_table.closest(Key::from(peer), self.replication_factor).into()
862                            );
863                        }
864                        Some(KademliaCommand::PutRecord { mut record, query_id }) => {
865                            tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT");
866
867                            // For `PUT_VALUE` requests originating locally we are always the publisher.
868                            record.publisher = Some(self.local_key.clone().into_preimage());
869
870                            // Make sure TTL is set.
871                            record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
872
873                            let key = Key::new(record.key.clone());
874
875                            self.store.put(record.clone());
876
877                            self.engine.start_put_record(
878                                query_id,
879                                record,
880                                self.routing_table.closest(key, self.replication_factor).into(),
881                            );
882                        }
883                        Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => {
884                            tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers");
885
886                            // Make sure TTL is set.
887                            record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
888
889                            if update_local_store {
890                                self.store.put(record.clone());
891                            }
892
893                            // Put the record to the specified peers.
894                            let peers = peers.into_iter().filter_map(|peer| {
895                                if peer == self.service.local_peer_id() {
896                                    return None;
897                                }
898
899                                match self.routing_table.entry(Key::from(peer)) {
900                                    KBucketEntry::Occupied(entry) => Some(entry.clone()),
901                                    KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()),
902                                    _ => None,
903                                }
904                            }).collect();
905
906                            self.engine.start_put_record_to_peers(
907                                query_id,
908                                record,
909                                peers,
910                            );
911                        }
912                        Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
913                            tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");
914
915                            match (self.store.get(&key), quorum) {
916                                (Some(record), Quorum::One) => {
917                                    let _ = self
918                                        .event_tx
919                                        .send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) })
920                                        .await;
921                                }
922                                (record, _) => {
923                                    self.engine.start_get_record(
924                                        query_id,
925                                        key.clone(),
926                                        self.routing_table.closest(Key::new(key.clone()), self.replication_factor).into(),
927                                        quorum,
928                                        if record.is_some() { 1 } else { 0 },
929                                    );
930                                }
931                            }
932
933                        }
934                        Some(KademliaCommand::AddKnownPeer { peer, addresses }) => {
935                            tracing::trace!(
936                                target: LOG_TARGET,
937                                ?peer,
938                                ?addresses,
939                                "add known peer",
940                            );
941
942                            self.routing_table.add_known_peer(
943                                peer,
944                                addresses.clone(),
945                                self.peers
946                                    .get(&peer)
947                                    .map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected),
948                            );
949                            self.service.add_known_address(&peer, addresses.into_iter());
950
951                        }
952                        Some(KademliaCommand::StoreRecord { mut record }) => {
953                            tracing::debug!(
954                                target: LOG_TARGET,
955                                key = ?record.key,
956                                "store record in local store",
957                            );
958
959                            // Make sure TTL is set.
960                            record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
961
962                            self.store.put(record);
963                        }
964                        None => return Err(Error::EssentialTaskClosed),
965                    }
966                },
967            }
968        }
969    }
970}
971
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::{
        codec::ProtocolCodec,
        crypto::ed25519::Keypair,
        transport::{
            manager::{limits::ConnectionLimitsConfig, TransportManager},
            KEEP_ALIVE_TIMEOUT,
        },
        types::protocol::ProtocolName,
        BandwidthSink,
    };
    use tokio::sync::mpsc::channel;

    /// Counterparts of the channels handed to [`Kademlia`] through its [`Config`].
    ///
    /// The tests only hold on to these endpoints and never read them, hence the
    /// `allow(unused)`.
    #[allow(unused)]
    struct Context {
        _cmd_tx: Sender<KademliaCommand>,
        event_rx: Receiver<KademliaEvent>,
    }

    /// Build a [`Kademlia`] instance with no known peers, together with the channel endpoints
    /// facing it and the [`TransportManager`] its transport service was created from.
    fn make_kademlia() -> (Kademlia, Context, TransportManager) {
        let (manager, handle) = TransportManager::new(
            Keypair::generate(),
            HashSet::new(),
            BandwidthSink::new(),
            8usize,
            ConnectionLimitsConfig::default(),
        );

        let local_peer_id = PeerId::random();
        let (service, _tx) = TransportService::new(
            local_peer_id,
            ProtocolName::from("/kad/1"),
            Vec::new(),
            Default::default(),
            handle,
            KEEP_ALIVE_TIMEOUT,
        );

        let (event_tx, event_rx) = channel(64);
        let (_cmd_tx, cmd_rx) = channel(64);

        let kademlia = Kademlia::new(
            service,
            Config {
                protocol_names: vec![ProtocolName::from("/kad/1")],
                known_peers: HashMap::new(),
                codec: ProtocolCodec::UnsignedVarint(None),
                replication_factor: 20usize,
                update_mode: RoutingTableUpdateMode::Automatic,
                validation_mode: IncomingRecordValidationMode::Automatic,
                record_ttl: Duration::from_secs(36 * 60 * 60),
                provider_ttl: Duration::from_secs(48 * 60 * 60),
                event_tx,
                cmd_rx,
            },
        );
        let context = Context { _cmd_tx, event_rx };

        (kademlia, context, manager)
    }

    /// Create a record for `key` holding `value`, attributed to a freshly generated random peer.
    fn peer_record(key: &RecordKey, value: Vec<u8>) -> PeerRecord {
        PeerRecord {
            peer: PeerId::random(),
            record: Record::new(key.clone(), value),
        }
    }

    #[tokio::test]
    async fn check_get_records_update() {
        let (mut kademlia, _context, _manager) = make_kademlia();

        let key = RecordKey::from(vec![1, 2, 3]);
        let records = vec![
            // Two peers report the same value.
            peer_record(&key, vec![0x1]),
            peer_record(&key, vec![0x1]),
            // A single peer reports a different value.
            peer_record(&key, vec![0x2]),
        ];

        let outcome = kademlia
            .on_query_action(QueryAction::GetRecordQueryDone {
                query_id: QueryId(1),
                records,
            })
            .await;
        assert!(outcome.is_ok());

        // Finishing the query must not write anything to the local store.
        assert!(kademlia.store.get(&key).is_none());
    }

    #[tokio::test]
    async fn check_get_records_update_with_expired_records() {
        let (mut kademlia, _context, _manager) = make_kademlia();

        let key = RecordKey::from(vec![1, 2, 3]);
        let expired = std::time::Instant::now() - std::time::Duration::from_secs(10);
        let records = vec![
            // Two peers report the same value, but the first copy has already expired.
            PeerRecord {
                peer: PeerId::random(),
                record: Record {
                    key: key.clone(),
                    value: vec![0x1],
                    publisher: None,
                    expires: Some(expired),
                },
            },
            peer_record(&key, vec![0x1]),
            // Two peers report a different value.
            peer_record(&key, vec![0x2]),
            peer_record(&key, vec![0x2]),
        ];

        let outcome = kademlia
            .on_query_action(QueryAction::GetRecordQueryDone {
                query_id: QueryId(1),
                records,
            })
            .await;
        assert!(outcome.is_ok());

        // Finishing the query must not write anything to the local store.
        assert!(kademlia.store.get(&key).is_none());
    }
}