litep2p/protocol/libp2p/kademlia/
handle.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    protocol::libp2p::kademlia::{ContentProvider, PeerRecord, QueryId, Record, RecordKey},
23    PeerId,
24};
25
26use futures::Stream;
27use multiaddr::Multiaddr;
28use tokio::sync::mpsc::{Receiver, Sender};
29
30use std::{
31    num::NonZeroUsize,
32    pin::Pin,
33    sync::{
34        atomic::{AtomicUsize, Ordering},
35        Arc,
36    },
37    task::{Context, Poll},
38};
39
40/// Quorum.
41///
42/// Quorum defines how many peers must be successfully contacted
43/// in order for the query to be considered successful.
44#[derive(Debug, Copy, Clone, PartialEq, Eq)]
45#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
46pub enum Quorum {
47    /// All peers must be successfully contacted.
48    All,
49
50    /// One peer must be successfully contacted.
51    One,
52
53    /// `N` peers must be successfully contacted.
54    N(NonZeroUsize),
55}
56
57/// Routing table update mode.
58#[derive(Debug, Copy, Clone)]
59pub enum RoutingTableUpdateMode {
60    /// Don't insert discovered peers automatically to the routing tables but
61    /// allow user to do that by calling [`KademliaHandle::add_known_peer()`].
62    Manual,
63
64    /// Automatically add all discovered peers to routing tables.
65    Automatic,
66}
67
68/// Incoming record validation mode.
69#[derive(Debug, Copy, Clone)]
70pub enum IncomingRecordValidationMode {
71    /// Don't insert incoming records automatically to the local DHT store
72    /// and let the user do that by calling [`KademliaHandle::store_record()`].
73    Manual,
74
75    /// Automatically accept all incoming records.
76    Automatic,
77}
78
79/// Kademlia commands.
80#[derive(Debug)]
81#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
82pub enum KademliaCommand {
83    /// Add known peer.
84    AddKnownPeer {
85        /// Peer ID.
86        peer: PeerId,
87
88        /// Addresses of peer.
89        addresses: Vec<Multiaddr>,
90    },
91
92    /// Send `FIND_NODE` message.
93    FindNode {
94        /// Peer ID.
95        peer: PeerId,
96
97        /// Query ID for the query.
98        query_id: QueryId,
99    },
100
101    /// Store record to DHT.
102    PutRecord {
103        /// Record.
104        record: Record,
105
106        /// [`Quorum`] for the query.
107        quorum: Quorum,
108
109        /// Query ID for the query.
110        query_id: QueryId,
111    },
112
113    /// Store record to DHT to the given peers.
114    ///
115    /// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers.
116    PutRecordToPeers {
117        /// Record.
118        record: Record,
119
120        /// [`Quorum`] for the query.
121        quorum: Quorum,
122
123        /// Query ID for the query.
124        query_id: QueryId,
125
126        /// Use the following peers for the put request.
127        peers: Vec<PeerId>,
128
129        /// Update local store.
130        update_local_store: bool,
131    },
132
133    /// Get record from DHT.
134    GetRecord {
135        /// Record key.
136        key: RecordKey,
137
138        /// [`Quorum`] for the query.
139        quorum: Quorum,
140
141        /// Query ID for the query.
142        query_id: QueryId,
143    },
144
145    /// Get providers from DHT.
146    GetProviders {
147        /// Provided key.
148        key: RecordKey,
149
150        /// Query ID for the query.
151        query_id: QueryId,
152    },
153
154    /// Register as a content provider for `key`.
155    StartProviding {
156        /// Provided key.
157        key: RecordKey,
158
159        /// [`Quorum`] for the query.
160        quorum: Quorum,
161
162        /// Query ID for the query.
163        query_id: QueryId,
164    },
165
166    /// Stop providing the key locally and refreshing the provider.
167    StopProviding {
168        /// Provided key.
169        key: RecordKey,
170    },
171
172    /// Store record locally.
173    StoreRecord {
174        // Record.
175        record: Record,
176    },
177}
178
179/// Kademlia events.
180#[derive(Debug, Clone)]
181pub enum KademliaEvent {
182    /// Result for the issued `FIND_NODE` query.
183    FindNodeSuccess {
184        /// Query ID.
185        query_id: QueryId,
186
187        /// Target of the query
188        target: PeerId,
189
190        /// Found nodes and their addresses.
191        peers: Vec<(PeerId, Vec<Multiaddr>)>,
192    },
193
194    /// Routing table update.
195    ///
196    /// Kademlia has discovered one or more peers that should be added to the routing table.
197    /// If [`RoutingTableUpdateMode`] is `Automatic`, user can ignore this event unless some
198    /// upper-level protocols has user for this information.
199    ///
200    /// If the mode was set to `Manual`, user should call [`KademliaHandle::add_known_peer()`]
201    /// in order to add the peers to routing table.
202    RoutingTableUpdate {
203        /// Discovered peers.
204        peers: Vec<PeerId>,
205    },
206
207    /// `GET_VALUE` query succeeded.
208    GetRecordSuccess {
209        /// Query ID.
210        query_id: QueryId,
211    },
212
213    /// `GET_VALUE` inflight query produced a result.
214    ///
215    /// This event is emitted when a peer responds to the query with a record.
216    GetRecordPartialResult {
217        /// Query ID.
218        query_id: QueryId,
219
220        /// Found record.
221        record: PeerRecord,
222    },
223
224    /// `GET_PROVIDERS` query succeeded.
225    GetProvidersSuccess {
226        /// Query ID.
227        query_id: QueryId,
228
229        /// Provided key.
230        provided_key: RecordKey,
231
232        /// Found providers with cached addresses. Returned providers are sorted by distane to the
233        /// provided key.
234        providers: Vec<ContentProvider>,
235    },
236
237    /// `PUT_VALUE` query succeeded.
238    PutRecordSuccess {
239        /// Query ID.
240        query_id: QueryId,
241
242        /// Record key.
243        key: RecordKey,
244    },
245
246    /// `ADD_PROVIDER` query succeeded.
247    AddProviderSuccess {
248        /// Query ID.
249        query_id: QueryId,
250
251        /// Provided key.
252        provided_key: RecordKey,
253    },
254
255    /// Query failed.
256    QueryFailed {
257        /// Query ID.
258        query_id: QueryId,
259    },
260
261    /// Incoming `PUT_VALUE` request received.
262    ///
263    /// In case of using [`IncomingRecordValidationMode::Manual`] and successful validation
264    /// the record must be manually inserted into the local DHT store with
265    /// [`KademliaHandle::store_record()`].
266    IncomingRecord {
267        /// Record.
268        record: Record,
269    },
270
271    /// Incoming `ADD_PROVIDER` request received.
272    IncomingProvider {
273        /// Provided key.
274        provided_key: RecordKey,
275
276        /// Provider.
277        provider: ContentProvider,
278    },
279}
280
281/// Handle for communicating with the Kademlia protocol.
282pub struct KademliaHandle {
283    /// TX channel for sending commands to `Kademlia`.
284    cmd_tx: Sender<KademliaCommand>,
285
286    /// RX channel for receiving events from `Kademlia`.
287    event_rx: Receiver<KademliaEvent>,
288
289    /// Next query ID.
290    next_query_id: Arc<AtomicUsize>,
291}
292
293impl KademliaHandle {
294    /// Create new [`KademliaHandle`].
295    pub(super) fn new(
296        cmd_tx: Sender<KademliaCommand>,
297        event_rx: Receiver<KademliaEvent>,
298        next_query_id: Arc<AtomicUsize>,
299    ) -> Self {
300        Self {
301            cmd_tx,
302            event_rx,
303            next_query_id,
304        }
305    }
306
307    /// Allocate next query ID.
308    fn next_query_id(&mut self) -> QueryId {
309        let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed);
310
311        QueryId(query_id)
312    }
313
314    /// Add known peer.
315    pub async fn add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) {
316        let _ = self.cmd_tx.send(KademliaCommand::AddKnownPeer { peer, addresses }).await;
317    }
318
319    /// Send `FIND_NODE` query to known peers.
320    pub async fn find_node(&mut self, peer: PeerId) -> QueryId {
321        let query_id = self.next_query_id();
322        let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await;
323
324        query_id
325    }
326
327    /// Store record to DHT.
328    pub async fn put_record(&mut self, record: Record, quorum: Quorum) -> QueryId {
329        let query_id = self.next_query_id();
330        let _ = self
331            .cmd_tx
332            .send(KademliaCommand::PutRecord {
333                record,
334                quorum,
335                query_id,
336            })
337            .await;
338
339        query_id
340    }
341
342    /// Store record to DHT to the given peers.
343    ///
344    /// Returns [`Err`] only if `Kademlia` is terminating.
345    pub async fn put_record_to_peers(
346        &mut self,
347        record: Record,
348        peers: Vec<PeerId>,
349        update_local_store: bool,
350        quorum: Quorum,
351    ) -> QueryId {
352        let query_id = self.next_query_id();
353        let _ = self
354            .cmd_tx
355            .send(KademliaCommand::PutRecordToPeers {
356                record,
357                query_id,
358                peers,
359                update_local_store,
360                quorum,
361            })
362            .await;
363
364        query_id
365    }
366
367    /// Get record from DHT.
368    ///
369    /// Returns [`Err`] only if `Kademlia` is terminating.
370    pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
371        let query_id = self.next_query_id();
372        let _ = self
373            .cmd_tx
374            .send(KademliaCommand::GetRecord {
375                key,
376                quorum,
377                query_id,
378            })
379            .await;
380
381        query_id
382    }
383
384    /// Register as a content provider on the DHT.
385    ///
386    /// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
387    /// Returns [`Err`] only if `Kademlia` is terminating.
388    pub async fn start_providing(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
389        let query_id = self.next_query_id();
390        let _ = self
391            .cmd_tx
392            .send(KademliaCommand::StartProviding {
393                key,
394                quorum,
395                query_id,
396            })
397            .await;
398
399        query_id
400    }
401
402    /// Stop providing the key on the DHT.
403    ///
404    /// This will stop republishing the provider, but won't
405    /// remove it instantly from the nodes. It will be removed from them after the provider TTL
406    /// expires, set by default to 48 hours.
407    pub async fn stop_providing(&mut self, key: RecordKey) {
408        let _ = self.cmd_tx.send(KademliaCommand::StopProviding { key }).await;
409    }
410
411    /// Get providers from DHT.
412    ///
413    /// Returns [`Err`] only if `Kademlia` is terminating.
414    pub async fn get_providers(&mut self, key: RecordKey) -> QueryId {
415        let query_id = self.next_query_id();
416        let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await;
417
418        query_id
419    }
420
421    /// Store the record in the local store. Used in combination with
422    /// [`IncomingRecordValidationMode::Manual`].
423    pub async fn store_record(&mut self, record: Record) {
424        let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await;
425    }
426
427    /// Try to add known peer and if the channel is clogged, return an error.
428    pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) -> Result<(), ()> {
429        self.cmd_tx
430            .try_send(KademliaCommand::AddKnownPeer { peer, addresses })
431            .map_err(|_| ())
432    }
433
434    /// Try to initiate `FIND_NODE` query and if the channel is clogged, return an error.
435    pub fn try_find_node(&mut self, peer: PeerId) -> Result<QueryId, ()> {
436        let query_id = self.next_query_id();
437        self.cmd_tx
438            .try_send(KademliaCommand::FindNode { peer, query_id })
439            .map(|_| query_id)
440            .map_err(|_| ())
441    }
442
443    /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error.
444    pub fn try_put_record(&mut self, record: Record, quorum: Quorum) -> Result<QueryId, ()> {
445        let query_id = self.next_query_id();
446        self.cmd_tx
447            .try_send(KademliaCommand::PutRecord {
448                record,
449                query_id,
450                quorum,
451            })
452            .map(|_| query_id)
453            .map_err(|_| ())
454    }
455
456    /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged,
457    /// return an error.
458    pub fn try_put_record_to_peers(
459        &mut self,
460        record: Record,
461        peers: Vec<PeerId>,
462        update_local_store: bool,
463        quorum: Quorum,
464    ) -> Result<QueryId, ()> {
465        let query_id = self.next_query_id();
466        self.cmd_tx
467            .try_send(KademliaCommand::PutRecordToPeers {
468                record,
469                query_id,
470                peers,
471                update_local_store,
472                quorum,
473            })
474            .map(|_| query_id)
475            .map_err(|_| ())
476    }
477
478    /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error.
479    pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result<QueryId, ()> {
480        let query_id = self.next_query_id();
481        self.cmd_tx
482            .try_send(KademliaCommand::GetRecord {
483                key,
484                quorum,
485                query_id,
486            })
487            .map(|_| query_id)
488            .map_err(|_| ())
489    }
490
491    /// Try to store the record in the local store, and if the channel is clogged, return an error.
492    /// Used in combination with [`IncomingRecordValidationMode::Manual`].
493    pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> {
494        self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ())
495    }
496
497    #[cfg(feature = "fuzz")]
498    /// Expose functionality for fuzzing
499    pub async fn fuzz_send_message(&mut self, command: KademliaCommand) -> crate::Result<()> {
500        let _ = self.cmd_tx.send(command).await;
501        Ok(())
502    }
503}
504
505impl Stream for KademliaHandle {
506    type Item = KademliaEvent;
507
508    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
509        self.event_rx.poll_recv(cx)
510    }
511}