litep2p/protocol/libp2p/kademlia/handle.rs
1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22 protocol::libp2p::kademlia::{ContentProvider, PeerRecord, QueryId, Record, RecordKey},
23 PeerId,
24};
25
26use futures::Stream;
27use multiaddr::Multiaddr;
28use tokio::sync::mpsc::{Receiver, Sender};
29
30use std::{
31 num::NonZeroUsize,
32 pin::Pin,
33 sync::{
34 atomic::{AtomicUsize, Ordering},
35 Arc,
36 },
37 task::{Context, Poll},
38};
39
/// Quorum.
///
/// Quorum defines how many peers must be successfully contacted
/// in order for the query to be considered successful.
///
/// A quorum is supplied by the caller when issuing a query through
/// [`KademliaHandle`], e.g. [`KademliaHandle::put_record()`],
/// [`KademliaHandle::get_record()`] or [`KademliaHandle::start_providing()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
pub enum Quorum {
    /// All peers must be successfully contacted.
    All,

    /// One peer must be successfully contacted.
    One,

    /// `N` peers must be successfully contacted.
    ///
    /// The count is a [`NonZeroUsize`], so a quorum of zero peers cannot be expressed.
    N(NonZeroUsize),
}
56
/// Routing table update mode.
///
/// Selects whether peers discovered by Kademlia are inserted into the routing table
/// automatically or only on explicit request by the user.
///
/// The mode is a plain, field-less value: it is `Copy` and can be compared with `==`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RoutingTableUpdateMode {
    /// Don't insert discovered peers automatically to the routing tables but
    /// allow user to do that by calling [`KademliaHandle::add_known_peer()`].
    Manual,

    /// Automatically add all discovered peers to routing tables.
    Automatic,
}
67
/// Incoming record validation mode.
///
/// Selects whether records received from remote peers are stored in the local DHT store
/// automatically or only after the user has explicitly stored them.
///
/// The mode is a plain, field-less value: it is `Copy` and can be compared with `==`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum IncomingRecordValidationMode {
    /// Don't insert incoming records automatically to the local DHT store
    /// and let the user do that by calling [`KademliaHandle::store_record()`].
    Manual,

    /// Automatically accept all incoming records.
    Automatic,
}
78
/// Kademlia commands.
///
/// Commands are created by [`KademliaHandle`] and sent to `Kademlia` over the
/// handle's command channel.
#[derive(Debug)]
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
pub enum KademliaCommand {
    /// Add known peer.
    AddKnownPeer {
        /// Peer ID.
        peer: PeerId,

        /// Addresses of peer.
        addresses: Vec<Multiaddr>,
    },

    /// Send `FIND_NODE` message.
    FindNode {
        /// Peer ID.
        peer: PeerId,

        /// Query ID for the query.
        query_id: QueryId,
    },

    /// Store record to DHT.
    PutRecord {
        /// Record.
        record: Record,

        /// [`Quorum`] for the query.
        quorum: Quorum,

        /// Query ID for the query.
        query_id: QueryId,
    },

    /// Store record to DHT to the given peers.
    ///
    /// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers.
    PutRecordToPeers {
        /// Record.
        record: Record,

        /// [`Quorum`] for the query.
        quorum: Quorum,

        /// Query ID for the query.
        query_id: QueryId,

        /// Use the following peers for the put request.
        peers: Vec<PeerId>,

        /// Update local store.
        update_local_store: bool,
    },

    /// Get record from DHT.
    GetRecord {
        /// Record key.
        key: RecordKey,

        /// [`Quorum`] for the query.
        quorum: Quorum,

        /// Query ID for the query.
        query_id: QueryId,
    },

    /// Get providers from DHT.
    GetProviders {
        /// Provided key.
        key: RecordKey,

        /// Query ID for the query.
        query_id: QueryId,
    },

    /// Register as a content provider for `key`.
    StartProviding {
        /// Provided key.
        key: RecordKey,

        /// [`Quorum`] for the query.
        quorum: Quorum,

        /// Query ID for the query.
        query_id: QueryId,
    },

    /// Stop providing the key locally and refreshing the provider.
    StopProviding {
        /// Provided key.
        key: RecordKey,
    },

    /// Store record locally.
    StoreRecord {
        /// Record.
        record: Record,
    },
}
178
/// Kademlia events.
///
/// Events are delivered to the user through the [`Stream`] implementation of
/// [`KademliaHandle`].
#[derive(Debug, Clone)]
pub enum KademliaEvent {
    /// Result for the issued `FIND_NODE` query.
    FindNodeSuccess {
        /// Query ID.
        query_id: QueryId,

        /// Target of the query.
        target: PeerId,

        /// Found nodes and their addresses.
        peers: Vec<(PeerId, Vec<Multiaddr>)>,
    },

    /// Routing table update.
    ///
    /// Kademlia has discovered one or more peers that should be added to the routing table.
    /// If [`RoutingTableUpdateMode`] is `Automatic`, user can ignore this event unless some
    /// upper-level protocols have use for this information.
    ///
    /// If the mode was set to `Manual`, user should call [`KademliaHandle::add_known_peer()`]
    /// in order to add the peers to routing table.
    RoutingTableUpdate {
        /// Discovered peers.
        peers: Vec<PeerId>,
    },

    /// `GET_VALUE` query succeeded.
    GetRecordSuccess {
        /// Query ID.
        query_id: QueryId,
    },

    /// `GET_VALUE` inflight query produced a result.
    ///
    /// This event is emitted when a peer responds to the query with a record.
    GetRecordPartialResult {
        /// Query ID.
        query_id: QueryId,

        /// Found record.
        record: PeerRecord,
    },

    /// `GET_PROVIDERS` query succeeded.
    GetProvidersSuccess {
        /// Query ID.
        query_id: QueryId,

        /// Provided key.
        provided_key: RecordKey,

        /// Found providers with cached addresses. Returned providers are sorted by distance to
        /// the provided key.
        providers: Vec<ContentProvider>,
    },

    /// `PUT_VALUE` query succeeded.
    PutRecordSuccess {
        /// Query ID.
        query_id: QueryId,

        /// Record key.
        key: RecordKey,
    },

    /// `ADD_PROVIDER` query succeeded.
    AddProviderSuccess {
        /// Query ID.
        query_id: QueryId,

        /// Provided key.
        provided_key: RecordKey,
    },

    /// Query failed.
    QueryFailed {
        /// Query ID.
        query_id: QueryId,
    },

    /// Incoming `PUT_VALUE` request received.
    ///
    /// In case of using [`IncomingRecordValidationMode::Manual`] and successful validation
    /// the record must be manually inserted into the local DHT store with
    /// [`KademliaHandle::store_record()`].
    IncomingRecord {
        /// Record.
        record: Record,
    },

    /// Incoming `ADD_PROVIDER` request received.
    IncomingProvider {
        /// Provided key.
        provided_key: RecordKey,

        /// Provider.
        provider: ContentProvider,
    },
}
280
/// Handle for communicating with the Kademlia protocol.
///
/// Commands are issued through the handle's methods, which forward a
/// [`KademliaCommand`] over `cmd_tx`. Events are received by polling the handle
/// as a [`Stream`] of [`KademliaEvent`]s.
pub struct KademliaHandle {
    /// TX channel for sending commands to `Kademlia`.
    cmd_tx: Sender<KademliaCommand>,

    /// RX channel for receiving events from `Kademlia`.
    event_rx: Receiver<KademliaEvent>,

    /// Next query ID.
    ///
    /// Shared atomic counter; it is incremented by one every time the handle
    /// allocates a [`QueryId`] for a new query.
    next_query_id: Arc<AtomicUsize>,
}
292
293impl KademliaHandle {
294 /// Create new [`KademliaHandle`].
295 pub(super) fn new(
296 cmd_tx: Sender<KademliaCommand>,
297 event_rx: Receiver<KademliaEvent>,
298 next_query_id: Arc<AtomicUsize>,
299 ) -> Self {
300 Self {
301 cmd_tx,
302 event_rx,
303 next_query_id,
304 }
305 }
306
307 /// Allocate next query ID.
308 fn next_query_id(&mut self) -> QueryId {
309 let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed);
310
311 QueryId(query_id)
312 }
313
314 /// Add known peer.
315 pub async fn add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) {
316 let _ = self.cmd_tx.send(KademliaCommand::AddKnownPeer { peer, addresses }).await;
317 }
318
319 /// Send `FIND_NODE` query to known peers.
320 pub async fn find_node(&mut self, peer: PeerId) -> QueryId {
321 let query_id = self.next_query_id();
322 let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await;
323
324 query_id
325 }
326
327 /// Store record to DHT.
328 pub async fn put_record(&mut self, record: Record, quorum: Quorum) -> QueryId {
329 let query_id = self.next_query_id();
330 let _ = self
331 .cmd_tx
332 .send(KademliaCommand::PutRecord {
333 record,
334 quorum,
335 query_id,
336 })
337 .await;
338
339 query_id
340 }
341
342 /// Store record to DHT to the given peers.
343 ///
344 /// Returns [`Err`] only if `Kademlia` is terminating.
345 pub async fn put_record_to_peers(
346 &mut self,
347 record: Record,
348 peers: Vec<PeerId>,
349 update_local_store: bool,
350 quorum: Quorum,
351 ) -> QueryId {
352 let query_id = self.next_query_id();
353 let _ = self
354 .cmd_tx
355 .send(KademliaCommand::PutRecordToPeers {
356 record,
357 query_id,
358 peers,
359 update_local_store,
360 quorum,
361 })
362 .await;
363
364 query_id
365 }
366
367 /// Get record from DHT.
368 ///
369 /// Returns [`Err`] only if `Kademlia` is terminating.
370 pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
371 let query_id = self.next_query_id();
372 let _ = self
373 .cmd_tx
374 .send(KademliaCommand::GetRecord {
375 key,
376 quorum,
377 query_id,
378 })
379 .await;
380
381 query_id
382 }
383
384 /// Register as a content provider on the DHT.
385 ///
386 /// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
387 /// Returns [`Err`] only if `Kademlia` is terminating.
388 pub async fn start_providing(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
389 let query_id = self.next_query_id();
390 let _ = self
391 .cmd_tx
392 .send(KademliaCommand::StartProviding {
393 key,
394 quorum,
395 query_id,
396 })
397 .await;
398
399 query_id
400 }
401
402 /// Stop providing the key on the DHT.
403 ///
404 /// This will stop republishing the provider, but won't
405 /// remove it instantly from the nodes. It will be removed from them after the provider TTL
406 /// expires, set by default to 48 hours.
407 pub async fn stop_providing(&mut self, key: RecordKey) {
408 let _ = self.cmd_tx.send(KademliaCommand::StopProviding { key }).await;
409 }
410
411 /// Get providers from DHT.
412 ///
413 /// Returns [`Err`] only if `Kademlia` is terminating.
414 pub async fn get_providers(&mut self, key: RecordKey) -> QueryId {
415 let query_id = self.next_query_id();
416 let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await;
417
418 query_id
419 }
420
421 /// Store the record in the local store. Used in combination with
422 /// [`IncomingRecordValidationMode::Manual`].
423 pub async fn store_record(&mut self, record: Record) {
424 let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await;
425 }
426
427 /// Try to add known peer and if the channel is clogged, return an error.
428 pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) -> Result<(), ()> {
429 self.cmd_tx
430 .try_send(KademliaCommand::AddKnownPeer { peer, addresses })
431 .map_err(|_| ())
432 }
433
434 /// Try to initiate `FIND_NODE` query and if the channel is clogged, return an error.
435 pub fn try_find_node(&mut self, peer: PeerId) -> Result<QueryId, ()> {
436 let query_id = self.next_query_id();
437 self.cmd_tx
438 .try_send(KademliaCommand::FindNode { peer, query_id })
439 .map(|_| query_id)
440 .map_err(|_| ())
441 }
442
443 /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error.
444 pub fn try_put_record(&mut self, record: Record, quorum: Quorum) -> Result<QueryId, ()> {
445 let query_id = self.next_query_id();
446 self.cmd_tx
447 .try_send(KademliaCommand::PutRecord {
448 record,
449 query_id,
450 quorum,
451 })
452 .map(|_| query_id)
453 .map_err(|_| ())
454 }
455
456 /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged,
457 /// return an error.
458 pub fn try_put_record_to_peers(
459 &mut self,
460 record: Record,
461 peers: Vec<PeerId>,
462 update_local_store: bool,
463 quorum: Quorum,
464 ) -> Result<QueryId, ()> {
465 let query_id = self.next_query_id();
466 self.cmd_tx
467 .try_send(KademliaCommand::PutRecordToPeers {
468 record,
469 query_id,
470 peers,
471 update_local_store,
472 quorum,
473 })
474 .map(|_| query_id)
475 .map_err(|_| ())
476 }
477
478 /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error.
479 pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result<QueryId, ()> {
480 let query_id = self.next_query_id();
481 self.cmd_tx
482 .try_send(KademliaCommand::GetRecord {
483 key,
484 quorum,
485 query_id,
486 })
487 .map(|_| query_id)
488 .map_err(|_| ())
489 }
490
491 /// Try to store the record in the local store, and if the channel is clogged, return an error.
492 /// Used in combination with [`IncomingRecordValidationMode::Manual`].
493 pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> {
494 self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ())
495 }
496
497 #[cfg(feature = "fuzz")]
498 /// Expose functionality for fuzzing
499 pub async fn fuzz_send_message(&mut self, command: KademliaCommand) -> crate::Result<()> {
500 let _ = self.cmd_tx.send(command).await;
501 Ok(())
502 }
503}
504
505impl Stream for KademliaHandle {
506 type Item = KademliaEvent;
507
508 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
509 self.event_rx.poll_recv(cx)
510 }
511}