litep2p/protocol/libp2p/kademlia/
handle.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    protocol::libp2p::kademlia::{PeerRecord, QueryId, Record, RecordKey},
23    PeerId,
24};
25
26use futures::Stream;
27use multiaddr::Multiaddr;
28use tokio::sync::mpsc::{Receiver, Sender};
29
30use std::{
31    num::NonZeroUsize,
32    pin::Pin,
33    task::{Context, Poll},
34};
35
36/// Quorum.
37///
38/// Quorum defines how many peers must be successfully contacted
39/// in order for the query to be considered successful.
40#[derive(Debug, Copy, Clone)]
41pub enum Quorum {
42    /// All peers must be successfully contacted.
43    All,
44
45    /// One peer must be successfully contacted.
46    One,
47
48    /// `N` peer must be successfully contacted.
49    N(NonZeroUsize),
50}
51
52/// Routing table update mode.
53#[derive(Debug, Copy, Clone)]
54pub enum RoutingTableUpdateMode {
55    /// Don't insert discovered peers automatically to the routing tables but
56    /// allow user to do that by calling [`KademliaHandle::add_known_peer()`].
57    Manual,
58
59    /// Automatically add all discovered peers to routing tables.
60    Automatic,
61}
62
63/// Incoming record validation mode.
64#[derive(Debug, Copy, Clone)]
65pub enum IncomingRecordValidationMode {
66    /// Don't insert incoming records automatically to the local DHT store
67    /// and let the user do that by calling [`KademliaHandle::store_record()`].
68    Manual,
69
70    /// Automatically accept all incoming records.
71    Automatic,
72}
73
74/// Kademlia commands.
75#[derive(Debug)]
76pub(crate) enum KademliaCommand {
77    /// Add known peer.
78    AddKnownPeer {
79        /// Peer ID.
80        peer: PeerId,
81
82        /// Addresses of peer.
83        addresses: Vec<Multiaddr>,
84    },
85
86    /// Send `FIND_NODE` message.
87    FindNode {
88        /// Peer ID.
89        peer: PeerId,
90
91        /// Query ID for the query.
92        query_id: QueryId,
93    },
94
95    /// Store record to DHT.
96    PutRecord {
97        /// Record.
98        record: Record,
99
100        /// Query ID for the query.
101        query_id: QueryId,
102    },
103
104    /// Store record to DHT to the given peers.
105    ///
106    /// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers.
107    PutRecordToPeers {
108        /// Record.
109        record: Record,
110
111        /// Query ID for the query.
112        query_id: QueryId,
113
114        /// Use the following peers for the put request.
115        peers: Vec<PeerId>,
116
117        /// Update local store.
118        update_local_store: bool,
119    },
120
121    /// Get record from DHT.
122    GetRecord {
123        /// Record key.
124        key: RecordKey,
125
126        /// [`Quorum`] for the query.
127        quorum: Quorum,
128
129        /// Query ID for the query.
130        query_id: QueryId,
131    },
132
133    /// Store record locally.
134    StoreRecord {
135        // Record.
136        record: Record,
137    },
138}
139
140/// Kademlia events.
141#[derive(Debug, Clone)]
142pub enum KademliaEvent {
143    /// Result for the issued `FIND_NODE` query.
144    FindNodeSuccess {
145        /// Query ID.
146        query_id: QueryId,
147
148        /// Target of the query
149        target: PeerId,
150
151        /// Found nodes and their addresses.
152        peers: Vec<(PeerId, Vec<Multiaddr>)>,
153    },
154
155    /// Routing table update.
156    ///
157    /// Kademlia has discovered one or more peers that should be added to the routing table.
158    /// If [`RoutingTableUpdateMode`] is `Automatic`, user can ignore this event unless some
159    /// upper-level protocols has user for this information.
160    ///
161    /// If the mode was set to `Manual`, user should call [`KademliaHandle::add_known_peer()`]
162    /// in order to add the peers to routing table.
163    RoutingTableUpdate {
164        /// Discovered peers.
165        peers: Vec<PeerId>,
166    },
167
168    /// `GET_VALUE` query succeeded.
169    GetRecordSuccess {
170        /// Query ID.
171        query_id: QueryId,
172
173        /// Found records.
174        records: RecordsType,
175    },
176
177    /// `PUT_VALUE` query succeeded.
178    PutRecordSucess {
179        /// Query ID.
180        query_id: QueryId,
181
182        /// Record key.
183        key: RecordKey,
184    },
185
186    /// Query failed.
187    QueryFailed {
188        /// Query ID.
189        query_id: QueryId,
190    },
191
192    /// Incoming `PUT_VALUE` request received.
193    ///
194    /// In case of using [`IncomingRecordValidationMode::Manual`] and successful validation
195    /// the record must be manually inserted into the local DHT store with
196    /// [`KademliaHandle::store_record()`].
197    IncomingRecord {
198        /// Record.
199        record: Record,
200    },
201}
202
203/// The type of the DHT records.
204#[derive(Debug, Clone)]
205pub enum RecordsType {
206    /// Record was found in the local store.
207    ///
208    /// This contains only a single result.
209    LocalStore(Record),
210
211    /// Records found in the network.
212    Network(Vec<PeerRecord>),
213}
214
215/// Handle for communicating with the Kademlia protocol.
216pub struct KademliaHandle {
217    /// TX channel for sending commands to `Kademlia`.
218    cmd_tx: Sender<KademliaCommand>,
219
220    /// RX channel for receiving events from `Kademlia`.
221    event_rx: Receiver<KademliaEvent>,
222
223    /// Next query ID.
224    next_query_id: usize,
225}
226
227impl KademliaHandle {
228    /// Create new [`KademliaHandle`].
229    pub(super) fn new(cmd_tx: Sender<KademliaCommand>, event_rx: Receiver<KademliaEvent>) -> Self {
230        Self {
231            cmd_tx,
232            event_rx,
233            next_query_id: 0usize,
234        }
235    }
236
237    /// Allocate next query ID.
238    fn next_query_id(&mut self) -> QueryId {
239        let query_id = self.next_query_id;
240        self.next_query_id += 1;
241
242        QueryId(query_id)
243    }
244
245    /// Add known peer.
246    pub async fn add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) {
247        let _ = self.cmd_tx.send(KademliaCommand::AddKnownPeer { peer, addresses }).await;
248    }
249
250    /// Send `FIND_NODE` query to known peers.
251    pub async fn find_node(&mut self, peer: PeerId) -> QueryId {
252        let query_id = self.next_query_id();
253        let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await;
254
255        query_id
256    }
257
258    /// Store record to DHT.
259    pub async fn put_record(&mut self, record: Record) -> QueryId {
260        let query_id = self.next_query_id();
261        let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await;
262
263        query_id
264    }
265
266    /// Store record to DHT to the given peers.
267    pub async fn put_record_to_peers(
268        &mut self,
269        record: Record,
270        peers: Vec<PeerId>,
271        update_local_store: bool,
272    ) -> QueryId {
273        let query_id = self.next_query_id();
274        let _ = self
275            .cmd_tx
276            .send(KademliaCommand::PutRecordToPeers {
277                record,
278                query_id,
279                peers,
280                update_local_store,
281            })
282            .await;
283
284        query_id
285    }
286
287    /// Get record from DHT.
288    pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
289        let query_id = self.next_query_id();
290        let _ = self
291            .cmd_tx
292            .send(KademliaCommand::GetRecord {
293                key,
294                quorum,
295                query_id,
296            })
297            .await;
298
299        query_id
300    }
301
302    /// Store the record in the local store. Used in combination with
303    /// [`IncomingRecordValidationMode::Manual`].
304    pub async fn store_record(&mut self, record: Record) {
305        let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await;
306    }
307
308    /// Try to add known peer and if the channel is clogged, return an error.
309    pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) -> Result<(), ()> {
310        self.cmd_tx
311            .try_send(KademliaCommand::AddKnownPeer { peer, addresses })
312            .map_err(|_| ())
313    }
314
315    /// Try to initiate `FIND_NODE` query and if the channel is clogged, return an error.
316    pub fn try_find_node(&mut self, peer: PeerId) -> Result<QueryId, ()> {
317        let query_id = self.next_query_id();
318        self.cmd_tx
319            .try_send(KademliaCommand::FindNode { peer, query_id })
320            .map(|_| query_id)
321            .map_err(|_| ())
322    }
323
324    /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error.
325    pub fn try_put_record(&mut self, record: Record) -> Result<QueryId, ()> {
326        let query_id = self.next_query_id();
327        self.cmd_tx
328            .try_send(KademliaCommand::PutRecord { record, query_id })
329            .map(|_| query_id)
330            .map_err(|_| ())
331    }
332
333    /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged,
334    /// return an error.
335    pub fn try_put_record_to_peers(
336        &mut self,
337        record: Record,
338        peers: Vec<PeerId>,
339        update_local_store: bool,
340    ) -> Result<QueryId, ()> {
341        let query_id = self.next_query_id();
342        self.cmd_tx
343            .try_send(KademliaCommand::PutRecordToPeers {
344                record,
345                query_id,
346                peers,
347                update_local_store,
348            })
349            .map(|_| query_id)
350            .map_err(|_| ())
351    }
352
353    /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error.
354    pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result<QueryId, ()> {
355        let query_id = self.next_query_id();
356        self.cmd_tx
357            .try_send(KademliaCommand::GetRecord {
358                key,
359                quorum,
360                query_id,
361            })
362            .map(|_| query_id)
363            .map_err(|_| ())
364    }
365
366    /// Try to store the record in the local store, and if the channel is clogged, return an error.
367    /// Used in combination with [`IncomingRecordValidationMode::Manual`].
368    pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> {
369        self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ())
370    }
371}
372
373impl Stream for KademliaHandle {
374    type Item = KademliaEvent;
375
376    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
377        self.event_rx.poll_recv(cx)
378    }
379}