litep2p/protocol/libp2p/kademlia/
message.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    protocol::libp2p::kademlia::{
23        record::{Key as RecordKey, ProviderRecord, Record},
24        schema,
25        types::{ConnectionType, KademliaPeer},
26    },
27    PeerId,
28};
29
30use bytes::{Bytes, BytesMut};
31use prost::Message;
32use std::time::{Duration, Instant};
33
34/// Logging target for the file.
35const LOG_TARGET: &str = "litep2p::ipfs::kademlia::message";
36
37/// Kademlia message.
38#[derive(Debug, Clone)]
39pub enum KademliaMessage {
40    /// `FIND_NODE` message.
41    FindNode {
42        /// Query target.
43        target: Vec<u8>,
44
45        /// Found peers.
46        peers: Vec<KademliaPeer>,
47    },
48
49    /// Kademlia `PUT_VALUE` message.
50    PutValue {
51        /// Record.
52        record: Record,
53    },
54
55    /// `GET_VALUE` message.
56    GetRecord {
57        /// Key.
58        key: Option<RecordKey>,
59
60        /// Record.
61        record: Option<Record>,
62
63        /// Peers closer to the key.
64        peers: Vec<KademliaPeer>,
65    },
66
67    /// `ADD_PROVIDER` message.
68    AddProvider {
69        /// Key.
70        key: RecordKey,
71
72        /// Peers, providing the data for `key`. Must contain exactly one peer matching the sender
73        /// of the message.
74        providers: Vec<KademliaPeer>,
75    },
76
77    /// `GET_PROVIDERS` message.
78    GetProviders {
79        /// Key. `None` in response.
80        key: Option<RecordKey>,
81
82        /// Peers closer to the key.
83        peers: Vec<KademliaPeer>,
84
85        /// Peers, providing the data for `key`.
86        providers: Vec<KademliaPeer>,
87    },
88}
89
90impl KademliaMessage {
91    /// Create `FIND_NODE` message for `peer`.
92    pub fn find_node<T: Into<Vec<u8>>>(key: T) -> Bytes {
93        let message = schema::kademlia::Message {
94            key: key.into(),
95            r#type: schema::kademlia::MessageType::FindNode.into(),
96            cluster_level_raw: 10,
97            ..Default::default()
98        };
99
100        let mut buf = BytesMut::with_capacity(message.encoded_len());
101        message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
102
103        buf.freeze()
104    }
105
106    /// Create `PUT_VALUE` message for `record`.
107    pub fn put_value(record: Record) -> Bytes {
108        let message = schema::kademlia::Message {
109            key: record.key.clone().into(),
110            r#type: schema::kademlia::MessageType::PutValue.into(),
111            record: Some(record_to_schema(record)),
112            cluster_level_raw: 10,
113            ..Default::default()
114        };
115
116        let mut buf = BytesMut::with_capacity(message.encoded_len());
117        message.encode(&mut buf).expect("BytesMut to provide needed capacity");
118
119        buf.freeze()
120    }
121
122    /// Create `GET_VALUE` message for `record`.
123    pub fn get_record(key: RecordKey) -> Bytes {
124        let message = schema::kademlia::Message {
125            key: key.clone().into(),
126            r#type: schema::kademlia::MessageType::GetValue.into(),
127            cluster_level_raw: 10,
128            ..Default::default()
129        };
130
131        let mut buf = BytesMut::with_capacity(message.encoded_len());
132        message.encode(&mut buf).expect("BytesMut to provide needed capacity");
133
134        buf.freeze()
135    }
136
137    /// Create `FIND_NODE` response.
138    pub fn find_node_response<K: AsRef<[u8]>>(key: K, peers: Vec<KademliaPeer>) -> Vec<u8> {
139        let message = schema::kademlia::Message {
140            key: key.as_ref().to_vec(),
141            cluster_level_raw: 10,
142            r#type: schema::kademlia::MessageType::FindNode.into(),
143            closer_peers: peers.iter().map(|peer| peer.into()).collect(),
144            ..Default::default()
145        };
146
147        let mut buf = Vec::with_capacity(message.encoded_len());
148        message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
149
150        buf
151    }
152
153    /// Create `PUT_VALUE` response.
154    pub fn get_value_response(
155        key: RecordKey,
156        peers: Vec<KademliaPeer>,
157        record: Option<Record>,
158    ) -> Vec<u8> {
159        let message = schema::kademlia::Message {
160            key: key.to_vec(),
161            cluster_level_raw: 10,
162            r#type: schema::kademlia::MessageType::GetValue.into(),
163            closer_peers: peers.iter().map(|peer| peer.into()).collect(),
164            record: record.map(record_to_schema),
165            ..Default::default()
166        };
167
168        let mut buf = Vec::with_capacity(message.encoded_len());
169        message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
170
171        buf
172    }
173
174    /// Create `ADD_PROVIDER` message with `provider`.
175    #[allow(unused)]
176    pub fn add_provider(provider: ProviderRecord) -> Vec<u8> {
177        let peer = KademliaPeer::new(
178            provider.provider,
179            provider.addresses,
180            ConnectionType::CanConnect, // ignored by message recipient
181        );
182        let message = schema::kademlia::Message {
183            key: provider.key.clone().to_vec(),
184            cluster_level_raw: 10,
185            r#type: schema::kademlia::MessageType::AddProvider.into(),
186            provider_peers: std::iter::once((&peer).into()).collect(),
187            ..Default::default()
188        };
189
190        let mut buf = Vec::with_capacity(message.encoded_len());
191        message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
192
193        buf
194    }
195
196    /// Create `GET_PROVIDERS` request for `key`.
197    #[allow(unused)]
198    pub fn get_providers_request(key: RecordKey) -> Vec<u8> {
199        let message = schema::kademlia::Message {
200            key: key.to_vec(),
201            cluster_level_raw: 10,
202            r#type: schema::kademlia::MessageType::GetProviders.into(),
203            ..Default::default()
204        };
205
206        let mut buf = Vec::with_capacity(message.encoded_len());
207        message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
208
209        buf
210    }
211
212    /// Create `GET_PROVIDERS` response.
213    pub fn get_providers_response(
214        key: RecordKey,
215        providers: Vec<ProviderRecord>,
216        closer_peers: &[KademliaPeer],
217    ) -> Vec<u8> {
218        debug_assert!(providers.iter().all(|p| p.key == key));
219
220        let provider_peers = providers
221            .into_iter()
222            .map(|p| {
223                KademliaPeer::new(
224                    p.provider,
225                    p.addresses,
226                    ConnectionType::CanConnect, // ignored by recipient
227                )
228            })
229            .map(|p| (&p).into())
230            .collect();
231
232        let message = schema::kademlia::Message {
233            key: key.to_vec(),
234            cluster_level_raw: 10,
235            r#type: schema::kademlia::MessageType::GetProviders.into(),
236            closer_peers: closer_peers.iter().map(Into::into).collect(),
237            provider_peers,
238            ..Default::default()
239        };
240
241        let mut buf = Vec::with_capacity(message.encoded_len());
242        message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
243
244        buf
245    }
246
247    /// Get [`KademliaMessage`] from bytes.
248    pub fn from_bytes(bytes: BytesMut) -> Option<Self> {
249        match schema::kademlia::Message::decode(bytes) {
250            Ok(message) => match message.r#type {
251                // FIND_NODE
252                4 => {
253                    let peers = message
254                        .closer_peers
255                        .iter()
256                        .filter_map(|peer| KademliaPeer::try_from(peer).ok())
257                        .collect();
258
259                    Some(Self::FindNode {
260                        target: message.key,
261                        peers,
262                    })
263                }
264                // PUT_VALUE
265                0 => {
266                    let record = message.record?;
267
268                    Some(Self::PutValue {
269                        record: record_from_schema(record)?,
270                    })
271                }
272                // GET_VALUE
273                1 => {
274                    let key = match message.key.is_empty() {
275                        true => message.record.as_ref().and_then(|record| {
276                            (!record.key.is_empty()).then_some(RecordKey::from(record.key.clone()))
277                        }),
278                        false => Some(RecordKey::from(message.key.clone())),
279                    };
280
281                    let record = if let Some(record) = message.record {
282                        Some(record_from_schema(record)?)
283                    } else {
284                        None
285                    };
286
287                    Some(Self::GetRecord {
288                        key,
289                        record,
290                        peers: message
291                            .closer_peers
292                            .iter()
293                            .filter_map(|peer| KademliaPeer::try_from(peer).ok())
294                            .collect(),
295                    })
296                }
297                // ADD_PROVIDER
298                2 => {
299                    let key = (!message.key.is_empty()).then_some(message.key.into())?;
300                    let providers = message
301                        .provider_peers
302                        .iter()
303                        .filter_map(|peer| KademliaPeer::try_from(peer).ok())
304                        .collect();
305
306                    Some(Self::AddProvider { key, providers })
307                }
308                // GET_PROVIDERS
309                3 => {
310                    let key = (!message.key.is_empty()).then_some(message.key.into());
311                    let peers = message
312                        .closer_peers
313                        .iter()
314                        .filter_map(|peer| KademliaPeer::try_from(peer).ok())
315                        .collect();
316                    let providers = message
317                        .provider_peers
318                        .iter()
319                        .filter_map(|peer| KademliaPeer::try_from(peer).ok())
320                        .collect();
321
322                    Some(Self::GetProviders {
323                        key,
324                        peers,
325                        providers,
326                    })
327                }
328                message_type => {
329                    tracing::warn!(target: LOG_TARGET, ?message_type, "unhandled message");
330                    None
331                }
332            },
333            Err(error) => {
334                tracing::debug!(target: LOG_TARGET, ?error, "failed to decode message");
335                None
336            }
337        }
338    }
339}
340
341fn record_to_schema(record: Record) -> schema::kademlia::Record {
342    schema::kademlia::Record {
343        key: record.key.into(),
344        value: record.value,
345        time_received: String::new(),
346        publisher: record.publisher.map(|peer_id| peer_id.to_bytes()).unwrap_or_default(),
347        ttl: record
348            .expires
349            .map(|expires| {
350                let now = Instant::now();
351                if expires > now {
352                    u32::try_from((expires - now).as_secs()).unwrap_or(u32::MAX)
353                } else {
354                    1 // because 0 means "does not expire"
355                }
356            })
357            .unwrap_or(0),
358    }
359}
360
361fn record_from_schema(record: schema::kademlia::Record) -> Option<Record> {
362    Some(Record {
363        key: record.key.into(),
364        value: record.value,
365        publisher: if !record.publisher.is_empty() {
366            Some(PeerId::from_bytes(&record.publisher).ok()?)
367        } else {
368            None
369        },
370        expires: if record.ttl > 0 {
371            Some(Instant::now() + Duration::from_secs(record.ttl as u64))
372        } else {
373            None
374        },
375    })
376}
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381
382    #[test]
383    fn non_empty_publisher_and_ttl_are_preserved() {
384        let expires = Instant::now() + Duration::from_secs(3600);
385
386        let record = Record {
387            key: vec![1, 2, 3].into(),
388            value: vec![17],
389            publisher: Some(PeerId::random()),
390            expires: Some(expires),
391        };
392
393        let got_record = record_from_schema(record_to_schema(record.clone())).unwrap();
394
395        assert_eq!(got_record.key, record.key);
396        assert_eq!(got_record.value, record.value);
397        assert_eq!(got_record.publisher, record.publisher);
398
399        // Check that the expiration time is sane.
400        let got_expires = got_record.expires.unwrap();
401        assert!(got_expires - expires >= Duration::ZERO);
402        assert!(got_expires - expires < Duration::from_secs(10));
403    }
404
405    #[test]
406    fn empty_publisher_and_ttl_are_preserved() {
407        let record = Record {
408            key: vec![1, 2, 3].into(),
409            value: vec![17],
410            publisher: None,
411            expires: None,
412        };
413
414        let got_record = record_from_schema(record_to_schema(record.clone())).unwrap();
415
416        assert_eq!(got_record, record);
417    }
418}