litep2p/protocol/libp2p/kademlia/handle.rs
1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22 protocol::libp2p::kademlia::{PeerRecord, QueryId, Record, RecordKey},
23 PeerId,
24};
25
26use futures::Stream;
27use multiaddr::Multiaddr;
28use tokio::sync::mpsc::{Receiver, Sender};
29
30use std::{
31 num::NonZeroUsize,
32 pin::Pin,
33 task::{Context, Poll},
34};
35
/// Quorum.
///
/// Quorum defines how many peers must be successfully contacted
/// in order for the query to be considered successful.
///
/// The type is `Copy` and supports equality comparison, so a configured quorum can be
/// checked directly, e.g. `quorum == Quorum::All`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    /// All peers must be successfully contacted.
    All,

    /// One peer must be successfully contacted.
    One,

    /// `N` peers must be successfully contacted.
    ///
    /// The count is a [`NonZeroUsize`], so a quorum of zero peers cannot be expressed.
    N(NonZeroUsize),
}
51
/// Routing table update mode.
///
/// Selects whether discovered peers are inserted into the routing tables automatically
/// or only when the user asks for it. The type is `Copy` and supports equality comparison.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RoutingTableUpdateMode {
    /// Don't insert discovered peers automatically to the routing tables but
    /// allow user to do that by calling [`KademliaHandle::add_known_peer()`].
    Manual,

    /// Automatically add all discovered peers to routing tables.
    Automatic,
}
62
/// Incoming record validation mode.
///
/// Selects whether incoming records are stored in the local DHT store automatically
/// or only after the user has explicitly stored them. The type is `Copy` and supports
/// equality comparison.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum IncomingRecordValidationMode {
    /// Don't insert incoming records automatically to the local DHT store
    /// and let the user do that by calling [`KademliaHandle::store_record()`].
    Manual,

    /// Automatically accept all incoming records.
    Automatic,
}
73
/// Kademlia commands.
///
/// These are the messages [`KademliaHandle`] sends over its command channel; each public
/// method of the handle builds exactly one of these variants.
#[derive(Debug)]
pub(crate) enum KademliaCommand {
    /// Add known peer.
    ///
    /// Built by [`KademliaHandle::add_known_peer()`] and
    /// [`KademliaHandle::try_add_known_peer()`].
    AddKnownPeer {
        /// Peer ID.
        peer: PeerId,

        /// Addresses of peer.
        addresses: Vec<Multiaddr>,
    },

    /// Send `FIND_NODE` message.
    ///
    /// Built by [`KademliaHandle::find_node()`] and [`KademliaHandle::try_find_node()`].
    FindNode {
        /// Peer ID.
        peer: PeerId,

        /// Query ID for the query.
        query_id: QueryId,
    },

    /// Store record to DHT.
    ///
    /// Built by [`KademliaHandle::put_record()`] and [`KademliaHandle::try_put_record()`].
    PutRecord {
        /// Record.
        record: Record,

        /// Query ID for the query.
        query_id: QueryId,
    },

    /// Store record to DHT to the given peers.
    ///
    /// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers.
    PutRecordToPeers {
        /// Record.
        record: Record,

        /// Query ID for the query.
        query_id: QueryId,

        /// Use the following peers for the put request.
        peers: Vec<PeerId>,

        /// Update local store.
        update_local_store: bool,
    },

    /// Get record from DHT.
    ///
    /// Built by [`KademliaHandle::get_record()`] and [`KademliaHandle::try_get_record()`].
    GetRecord {
        /// Record key.
        key: RecordKey,

        /// [`Quorum`] for the query.
        quorum: Quorum,

        /// Query ID for the query.
        query_id: QueryId,
    },

    /// Store record locally.
    ///
    /// Built by [`KademliaHandle::store_record()`] and
    /// [`KademliaHandle::try_store_record()`]. Unlike the query commands, it carries no
    /// query ID.
    StoreRecord {
        /// Record.
        record: Record,
    },
}
139
140/// Kademlia events.
141#[derive(Debug, Clone)]
142pub enum KademliaEvent {
143 /// Result for the issued `FIND_NODE` query.
144 FindNodeSuccess {
145 /// Query ID.
146 query_id: QueryId,
147
148 /// Target of the query
149 target: PeerId,
150
151 /// Found nodes and their addresses.
152 peers: Vec<(PeerId, Vec<Multiaddr>)>,
153 },
154
155 /// Routing table update.
156 ///
157 /// Kademlia has discovered one or more peers that should be added to the routing table.
158 /// If [`RoutingTableUpdateMode`] is `Automatic`, user can ignore this event unless some
159 /// upper-level protocols has user for this information.
160 ///
161 /// If the mode was set to `Manual`, user should call [`KademliaHandle::add_known_peer()`]
162 /// in order to add the peers to routing table.
163 RoutingTableUpdate {
164 /// Discovered peers.
165 peers: Vec<PeerId>,
166 },
167
168 /// `GET_VALUE` query succeeded.
169 GetRecordSuccess {
170 /// Query ID.
171 query_id: QueryId,
172
173 /// Found records.
174 records: RecordsType,
175 },
176
177 /// `PUT_VALUE` query succeeded.
178 PutRecordSucess {
179 /// Query ID.
180 query_id: QueryId,
181
182 /// Record key.
183 key: RecordKey,
184 },
185
186 /// Query failed.
187 QueryFailed {
188 /// Query ID.
189 query_id: QueryId,
190 },
191
192 /// Incoming `PUT_VALUE` request received.
193 ///
194 /// In case of using [`IncomingRecordValidationMode::Manual`] and successful validation
195 /// the record must be manually inserted into the local DHT store with
196 /// [`KademliaHandle::store_record()`].
197 IncomingRecord {
198 /// Record.
199 record: Record,
200 },
201}
202
203/// The type of the DHT records.
204#[derive(Debug, Clone)]
205pub enum RecordsType {
206 /// Record was found in the local store.
207 ///
208 /// This contains only a single result.
209 LocalStore(Record),
210
211 /// Records found in the network.
212 Network(Vec<PeerRecord>),
213}
214
215/// Handle for communicating with the Kademlia protocol.
216pub struct KademliaHandle {
217 /// TX channel for sending commands to `Kademlia`.
218 cmd_tx: Sender<KademliaCommand>,
219
220 /// RX channel for receiving events from `Kademlia`.
221 event_rx: Receiver<KademliaEvent>,
222
223 /// Next query ID.
224 next_query_id: usize,
225}
226
227impl KademliaHandle {
228 /// Create new [`KademliaHandle`].
229 pub(super) fn new(cmd_tx: Sender<KademliaCommand>, event_rx: Receiver<KademliaEvent>) -> Self {
230 Self {
231 cmd_tx,
232 event_rx,
233 next_query_id: 0usize,
234 }
235 }
236
237 /// Allocate next query ID.
238 fn next_query_id(&mut self) -> QueryId {
239 let query_id = self.next_query_id;
240 self.next_query_id += 1;
241
242 QueryId(query_id)
243 }
244
245 /// Add known peer.
246 pub async fn add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) {
247 let _ = self.cmd_tx.send(KademliaCommand::AddKnownPeer { peer, addresses }).await;
248 }
249
250 /// Send `FIND_NODE` query to known peers.
251 pub async fn find_node(&mut self, peer: PeerId) -> QueryId {
252 let query_id = self.next_query_id();
253 let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await;
254
255 query_id
256 }
257
258 /// Store record to DHT.
259 pub async fn put_record(&mut self, record: Record) -> QueryId {
260 let query_id = self.next_query_id();
261 let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await;
262
263 query_id
264 }
265
266 /// Store record to DHT to the given peers.
267 pub async fn put_record_to_peers(
268 &mut self,
269 record: Record,
270 peers: Vec<PeerId>,
271 update_local_store: bool,
272 ) -> QueryId {
273 let query_id = self.next_query_id();
274 let _ = self
275 .cmd_tx
276 .send(KademliaCommand::PutRecordToPeers {
277 record,
278 query_id,
279 peers,
280 update_local_store,
281 })
282 .await;
283
284 query_id
285 }
286
287 /// Get record from DHT.
288 pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
289 let query_id = self.next_query_id();
290 let _ = self
291 .cmd_tx
292 .send(KademliaCommand::GetRecord {
293 key,
294 quorum,
295 query_id,
296 })
297 .await;
298
299 query_id
300 }
301
302 /// Store the record in the local store. Used in combination with
303 /// [`IncomingRecordValidationMode::Manual`].
304 pub async fn store_record(&mut self, record: Record) {
305 let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await;
306 }
307
308 /// Try to add known peer and if the channel is clogged, return an error.
309 pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec<Multiaddr>) -> Result<(), ()> {
310 self.cmd_tx
311 .try_send(KademliaCommand::AddKnownPeer { peer, addresses })
312 .map_err(|_| ())
313 }
314
315 /// Try to initiate `FIND_NODE` query and if the channel is clogged, return an error.
316 pub fn try_find_node(&mut self, peer: PeerId) -> Result<QueryId, ()> {
317 let query_id = self.next_query_id();
318 self.cmd_tx
319 .try_send(KademliaCommand::FindNode { peer, query_id })
320 .map(|_| query_id)
321 .map_err(|_| ())
322 }
323
324 /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error.
325 pub fn try_put_record(&mut self, record: Record) -> Result<QueryId, ()> {
326 let query_id = self.next_query_id();
327 self.cmd_tx
328 .try_send(KademliaCommand::PutRecord { record, query_id })
329 .map(|_| query_id)
330 .map_err(|_| ())
331 }
332
333 /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged,
334 /// return an error.
335 pub fn try_put_record_to_peers(
336 &mut self,
337 record: Record,
338 peers: Vec<PeerId>,
339 update_local_store: bool,
340 ) -> Result<QueryId, ()> {
341 let query_id = self.next_query_id();
342 self.cmd_tx
343 .try_send(KademliaCommand::PutRecordToPeers {
344 record,
345 query_id,
346 peers,
347 update_local_store,
348 })
349 .map(|_| query_id)
350 .map_err(|_| ())
351 }
352
353 /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error.
354 pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result<QueryId, ()> {
355 let query_id = self.next_query_id();
356 self.cmd_tx
357 .try_send(KademliaCommand::GetRecord {
358 key,
359 quorum,
360 query_id,
361 })
362 .map(|_| query_id)
363 .map_err(|_| ())
364 }
365
366 /// Try to store the record in the local store, and if the channel is clogged, return an error.
367 /// Used in combination with [`IncomingRecordValidationMode::Manual`].
368 pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> {
369 self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ())
370 }
371}
372
373impl Stream for KademliaHandle {
374 type Item = KademliaEvent;
375
376 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
377 self.event_rx.poll_recv(cx)
378 }
379}