litep2p/protocol/libp2p/kademlia/
message.rs1use crate::{
22 protocol::libp2p::kademlia::{
23 record::{Key as RecordKey, ProviderRecord, Record},
24 schema,
25 types::{ConnectionType, KademliaPeer},
26 },
27 PeerId,
28};
29
30use bytes::{Bytes, BytesMut};
31use prost::Message;
32use std::time::{Duration, Instant};
33
/// Target string passed to the `tracing` macros emitted from this file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::message";
36
37#[derive(Debug, Clone)]
39pub enum KademliaMessage {
40 FindNode {
42 target: Vec<u8>,
44
45 peers: Vec<KademliaPeer>,
47 },
48
49 PutValue {
51 record: Record,
53 },
54
55 GetRecord {
57 key: Option<RecordKey>,
59
60 record: Option<Record>,
62
63 peers: Vec<KademliaPeer>,
65 },
66
67 AddProvider {
69 key: RecordKey,
71
72 providers: Vec<KademliaPeer>,
75 },
76
77 GetProviders {
79 key: Option<RecordKey>,
81
82 peers: Vec<KademliaPeer>,
84
85 providers: Vec<KademliaPeer>,
87 },
88}
89
90impl KademliaMessage {
91 pub fn find_node<T: Into<Vec<u8>>>(key: T) -> Bytes {
93 let message = schema::kademlia::Message {
94 key: key.into(),
95 r#type: schema::kademlia::MessageType::FindNode.into(),
96 cluster_level_raw: 10,
97 ..Default::default()
98 };
99
100 let mut buf = BytesMut::with_capacity(message.encoded_len());
101 message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
102
103 buf.freeze()
104 }
105
106 pub fn put_value(record: Record) -> Bytes {
108 let message = schema::kademlia::Message {
109 key: record.key.clone().into(),
110 r#type: schema::kademlia::MessageType::PutValue.into(),
111 record: Some(record_to_schema(record)),
112 cluster_level_raw: 10,
113 ..Default::default()
114 };
115
116 let mut buf = BytesMut::with_capacity(message.encoded_len());
117 message.encode(&mut buf).expect("BytesMut to provide needed capacity");
118
119 buf.freeze()
120 }
121
122 pub fn get_record(key: RecordKey) -> Bytes {
124 let message = schema::kademlia::Message {
125 key: key.clone().into(),
126 r#type: schema::kademlia::MessageType::GetValue.into(),
127 cluster_level_raw: 10,
128 ..Default::default()
129 };
130
131 let mut buf = BytesMut::with_capacity(message.encoded_len());
132 message.encode(&mut buf).expect("BytesMut to provide needed capacity");
133
134 buf.freeze()
135 }
136
137 pub fn find_node_response<K: AsRef<[u8]>>(key: K, peers: Vec<KademliaPeer>) -> Vec<u8> {
139 let message = schema::kademlia::Message {
140 key: key.as_ref().to_vec(),
141 cluster_level_raw: 10,
142 r#type: schema::kademlia::MessageType::FindNode.into(),
143 closer_peers: peers.iter().map(|peer| peer.into()).collect(),
144 ..Default::default()
145 };
146
147 let mut buf = Vec::with_capacity(message.encoded_len());
148 message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
149
150 buf
151 }
152
153 pub fn get_value_response(
155 key: RecordKey,
156 peers: Vec<KademliaPeer>,
157 record: Option<Record>,
158 ) -> Vec<u8> {
159 let message = schema::kademlia::Message {
160 key: key.to_vec(),
161 cluster_level_raw: 10,
162 r#type: schema::kademlia::MessageType::GetValue.into(),
163 closer_peers: peers.iter().map(|peer| peer.into()).collect(),
164 record: record.map(record_to_schema),
165 ..Default::default()
166 };
167
168 let mut buf = Vec::with_capacity(message.encoded_len());
169 message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
170
171 buf
172 }
173
174 #[allow(unused)]
176 pub fn add_provider(provider: ProviderRecord) -> Vec<u8> {
177 let peer = KademliaPeer::new(
178 provider.provider,
179 provider.addresses,
180 ConnectionType::CanConnect, );
182 let message = schema::kademlia::Message {
183 key: provider.key.clone().to_vec(),
184 cluster_level_raw: 10,
185 r#type: schema::kademlia::MessageType::AddProvider.into(),
186 provider_peers: std::iter::once((&peer).into()).collect(),
187 ..Default::default()
188 };
189
190 let mut buf = Vec::with_capacity(message.encoded_len());
191 message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
192
193 buf
194 }
195
196 #[allow(unused)]
198 pub fn get_providers_request(key: RecordKey) -> Vec<u8> {
199 let message = schema::kademlia::Message {
200 key: key.to_vec(),
201 cluster_level_raw: 10,
202 r#type: schema::kademlia::MessageType::GetProviders.into(),
203 ..Default::default()
204 };
205
206 let mut buf = Vec::with_capacity(message.encoded_len());
207 message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
208
209 buf
210 }
211
212 pub fn get_providers_response(
214 key: RecordKey,
215 providers: Vec<ProviderRecord>,
216 closer_peers: &[KademliaPeer],
217 ) -> Vec<u8> {
218 debug_assert!(providers.iter().all(|p| p.key == key));
219
220 let provider_peers = providers
221 .into_iter()
222 .map(|p| {
223 KademliaPeer::new(
224 p.provider,
225 p.addresses,
226 ConnectionType::CanConnect, )
228 })
229 .map(|p| (&p).into())
230 .collect();
231
232 let message = schema::kademlia::Message {
233 key: key.to_vec(),
234 cluster_level_raw: 10,
235 r#type: schema::kademlia::MessageType::GetProviders.into(),
236 closer_peers: closer_peers.iter().map(Into::into).collect(),
237 provider_peers,
238 ..Default::default()
239 };
240
241 let mut buf = Vec::with_capacity(message.encoded_len());
242 message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
243
244 buf
245 }
246
247 pub fn from_bytes(bytes: BytesMut) -> Option<Self> {
249 match schema::kademlia::Message::decode(bytes) {
250 Ok(message) => match message.r#type {
251 4 => {
253 let peers = message
254 .closer_peers
255 .iter()
256 .filter_map(|peer| KademliaPeer::try_from(peer).ok())
257 .collect();
258
259 Some(Self::FindNode {
260 target: message.key,
261 peers,
262 })
263 }
264 0 => {
266 let record = message.record?;
267
268 Some(Self::PutValue {
269 record: record_from_schema(record)?,
270 })
271 }
272 1 => {
274 let key = match message.key.is_empty() {
275 true => message.record.as_ref().and_then(|record| {
276 (!record.key.is_empty()).then_some(RecordKey::from(record.key.clone()))
277 }),
278 false => Some(RecordKey::from(message.key.clone())),
279 };
280
281 let record = if let Some(record) = message.record {
282 Some(record_from_schema(record)?)
283 } else {
284 None
285 };
286
287 Some(Self::GetRecord {
288 key,
289 record,
290 peers: message
291 .closer_peers
292 .iter()
293 .filter_map(|peer| KademliaPeer::try_from(peer).ok())
294 .collect(),
295 })
296 }
297 2 => {
299 let key = (!message.key.is_empty()).then_some(message.key.into())?;
300 let providers = message
301 .provider_peers
302 .iter()
303 .filter_map(|peer| KademliaPeer::try_from(peer).ok())
304 .collect();
305
306 Some(Self::AddProvider { key, providers })
307 }
308 3 => {
310 let key = (!message.key.is_empty()).then_some(message.key.into());
311 let peers = message
312 .closer_peers
313 .iter()
314 .filter_map(|peer| KademliaPeer::try_from(peer).ok())
315 .collect();
316 let providers = message
317 .provider_peers
318 .iter()
319 .filter_map(|peer| KademliaPeer::try_from(peer).ok())
320 .collect();
321
322 Some(Self::GetProviders {
323 key,
324 peers,
325 providers,
326 })
327 }
328 message_type => {
329 tracing::warn!(target: LOG_TARGET, ?message_type, "unhandled message");
330 None
331 }
332 },
333 Err(error) => {
334 tracing::debug!(target: LOG_TARGET, ?error, "failed to decode message");
335 None
336 }
337 }
338 }
339}
340
341fn record_to_schema(record: Record) -> schema::kademlia::Record {
342 schema::kademlia::Record {
343 key: record.key.into(),
344 value: record.value,
345 time_received: String::new(),
346 publisher: record.publisher.map(|peer_id| peer_id.to_bytes()).unwrap_or_default(),
347 ttl: record
348 .expires
349 .map(|expires| {
350 let now = Instant::now();
351 if expires > now {
352 u32::try_from((expires - now).as_secs()).unwrap_or(u32::MAX)
353 } else {
354 1 }
356 })
357 .unwrap_or(0),
358 }
359}
360
361fn record_from_schema(record: schema::kademlia::Record) -> Option<Record> {
362 Some(Record {
363 key: record.key.into(),
364 value: record.value,
365 publisher: if !record.publisher.is_empty() {
366 Some(PeerId::from_bytes(&record.publisher).ok()?)
367 } else {
368 None
369 },
370 expires: if record.ttl > 0 {
371 Some(Instant::now() + Duration::from_secs(record.ttl as u64))
372 } else {
373 None
374 },
375 })
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// A record with a publisher and an expiry survives a schema round trip, with the
    /// expiry allowed to drift because the TTL is carried as whole seconds.
    #[test]
    fn non_empty_publisher_and_ttl_are_preserved() {
        let expires = Instant::now() + Duration::from_secs(3600);

        let record = Record {
            key: vec![1, 2, 3].into(),
            value: vec![17],
            publisher: Some(PeerId::random()),
            expires: Some(expires),
        };

        let got_record = record_from_schema(record_to_schema(record.clone())).unwrap();

        assert_eq!(got_record.key, record.key);
        assert_eq!(got_record.value, record.value);
        assert_eq!(got_record.publisher, record.publisher);

        let got_expires = got_record.expires.unwrap();

        // The decoded expiry may land on either side of the original: truncation to
        // whole seconds pulls it earlier, time elapsed during the round trip pushes it
        // later. `Instant - Instant` saturates at zero, so the difference has to be
        // taken in the right direction for the bound to mean anything.
        let drift = if got_expires > expires {
            got_expires - expires
        } else {
            expires - got_expires
        };
        assert!(drift < Duration::from_secs(10));
    }

    /// A record without a publisher or an expiry survives a schema round trip unchanged.
    #[test]
    fn empty_publisher_and_ttl_are_preserved() {
        let record = Record {
            key: vec![1, 2, 3].into(),
            value: vec![17],
            publisher: None,
            expires: None,
        };

        let got_record = record_from_schema(record_to_schema(record.clone())).unwrap();

        assert_eq!(got_record, record);
    }
}