libp2p_kad/record/store/
memory.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::*;
22
23use crate::kbucket;
24use smallvec::SmallVec;
25use std::collections::{hash_map, hash_set, HashMap, HashSet};
26use std::iter;
27
/// In-memory implementation of a `RecordStore`.
///
/// Regular records and provider records are kept in separate hash maps.
/// The limits in `config` are checked when records are inserted and when
/// provider records are added.
pub struct MemoryStore {
    /// The identity of the peer owning the store.
    ///
    /// Its preimage is compared against the `provider` of a provider record
    /// to decide whether that record must also be tracked in `provided`.
    local_key: kbucket::Key<PeerId>,
    /// The configuration of the store.
    config: MemoryStoreConfig,
    /// The stored (regular) records, indexed by record key.
    records: HashMap<Key, Record>,
    /// The stored provider records, grouped by the key being provided.
    ///
    /// Each list has an inline capacity of `K_VALUE` entries.
    providers: HashMap<Key, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
    /// The set of all provider records for the node identified by `local_key`.
    ///
    /// Must be kept in sync with `providers`.
    provided: HashSet<ProviderRecord>,
}
43
/// Configuration for a `MemoryStore`.
///
/// The limits are enforced by `MemoryStore` when records are inserted
/// (`put`) and when provider records are added (`add_provider`).
#[derive(Debug, Clone)]
pub struct MemoryStoreConfig {
    /// The maximum number of records.
    ///
    /// Replacing the record stored under an existing key is not subject
    /// to this limit; only inserting under a new key is.
    pub max_records: usize,
    /// The maximum size of record values, in bytes.
    ///
    /// NOTE(review): `MemoryStore::put` rejects values whose length is
    /// greater than *or equal to* this number, so the largest accepted
    /// value is one byte smaller than the configured limit.
    pub max_value_bytes: usize,
    /// The maximum number of providers stored for a key.
    ///
    /// This should match up with the chosen replication factor.
    ///
    /// Once a key has this many providers, additional new providers for
    /// that key are ignored without an error.
    pub max_providers_per_key: usize,
    /// The maximum number of provider records for which the
    /// local node is the provider.
    ///
    /// NOTE(review): `MemoryStore::add_provider` compares this limit with
    /// the number of distinct keys in its provider map, regardless of which
    /// peer is the provider — confirm that this is the intended semantics.
    pub max_provided_keys: usize,
}
59
60impl Default for MemoryStoreConfig {
61    fn default() -> Self {
62        Self {
63            max_records: 1024,
64            max_value_bytes: 65 * 1024,
65            max_provided_keys: 1024,
66            max_providers_per_key: K_VALUE.get(),
67        }
68    }
69}
70
71impl MemoryStore {
72    /// Creates a new `MemoryRecordStore` with a default configuration.
73    pub fn new(local_id: PeerId) -> Self {
74        Self::with_config(local_id, Default::default())
75    }
76
77    /// Creates a new `MemoryRecordStore` with the given configuration.
78    pub fn with_config(local_id: PeerId, config: MemoryStoreConfig) -> Self {
79        MemoryStore {
80            local_key: kbucket::Key::from(local_id),
81            config,
82            records: HashMap::default(),
83            provided: HashSet::default(),
84            providers: HashMap::default(),
85        }
86    }
87
88    /// Retains the records satisfying a predicate.
89    pub fn retain<F>(&mut self, f: F)
90    where
91        F: FnMut(&Key, &mut Record) -> bool,
92    {
93        self.records.retain(f);
94    }
95}
96
97impl RecordStore for MemoryStore {
98    type RecordsIter<'a> =
99        iter::Map<hash_map::Values<'a, Key, Record>, fn(&'a Record) -> Cow<'a, Record>>;
100
101    type ProvidedIter<'a> = iter::Map<
102        hash_set::Iter<'a, ProviderRecord>,
103        fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
104    >;
105
106    fn get(&self, k: &Key) -> Option<Cow<'_, Record>> {
107        self.records.get(k).map(Cow::Borrowed)
108    }
109
110    fn put(&mut self, r: Record) -> Result<()> {
111        if r.value.len() >= self.config.max_value_bytes {
112            return Err(Error::ValueTooLarge);
113        }
114
115        let num_records = self.records.len();
116
117        match self.records.entry(r.key.clone()) {
118            hash_map::Entry::Occupied(mut e) => {
119                e.insert(r);
120            }
121            hash_map::Entry::Vacant(e) => {
122                if num_records >= self.config.max_records {
123                    return Err(Error::MaxRecords);
124                }
125                e.insert(r);
126            }
127        }
128
129        Ok(())
130    }
131
132    fn remove(&mut self, k: &Key) {
133        self.records.remove(k);
134    }
135
136    fn records(&self) -> Self::RecordsIter<'_> {
137        self.records.values().map(Cow::Borrowed)
138    }
139
140    fn add_provider(&mut self, record: ProviderRecord) -> Result<()> {
141        let num_keys = self.providers.len();
142
143        // Obtain the entry
144        let providers = match self.providers.entry(record.key.clone()) {
145            e @ hash_map::Entry::Occupied(_) => e,
146            e @ hash_map::Entry::Vacant(_) => {
147                if self.config.max_provided_keys == num_keys {
148                    return Err(Error::MaxProvidedKeys);
149                }
150                e
151            }
152        }
153        .or_insert_with(Default::default);
154
155        for p in providers.iter_mut() {
156            if p.provider == record.provider {
157                // In-place update of an existing provider record.
158                if self.local_key.preimage() == &record.provider {
159                    self.provided.remove(p);
160                    self.provided.insert(record.clone());
161                }
162                *p = record;
163                return Ok(());
164            }
165        }
166
167        // If the providers list is full, we ignore the new provider.
168        // This strategy can mitigate Sybil attacks, in which an attacker
169        // floods the network with fake provider records.
170        if providers.len() == self.config.max_providers_per_key {
171            return Ok(());
172        }
173
174        // Otherwise, insert the new provider record.
175        if self.local_key.preimage() == &record.provider {
176            self.provided.insert(record.clone());
177        }
178        providers.push(record);
179
180        Ok(())
181    }
182
183    fn providers(&self, key: &Key) -> Vec<ProviderRecord> {
184        self.providers
185            .get(key)
186            .map_or_else(Vec::new, |ps| ps.clone().into_vec())
187    }
188
189    fn provided(&self) -> Self::ProvidedIter<'_> {
190        self.provided.iter().map(Cow::Borrowed)
191    }
192
193    fn remove_provider(&mut self, key: &Key, provider: &PeerId) {
194        if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
195            let providers = e.get_mut();
196            if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
197                let p = providers.remove(i);
198                if &p.provider == self.local_key.preimage() {
199                    self.provided.remove(&p);
200                }
201            }
202            if providers.is_empty() {
203                e.remove();
204            }
205        }
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SHA_256_MH;
    use quickcheck::*;
    use rand::Rng;

    /// Wraps 32 random bytes in a multihash tagged with `SHA_256_MH`.
    fn random_multihash() -> Multihash<64> {
        Multihash::wrap(SHA_256_MH, &rand::thread_rng().gen::<[u8; 32]>()).unwrap()
    }

    /// A stored record can be read back and is gone after removal.
    #[test]
    fn put_get_remove_record() {
        fn prop(r: Record) {
            let mut store = MemoryStore::new(PeerId::random());
            assert!(store.put(r.clone()).is_ok());
            assert_eq!(Some(Cow::Borrowed(&r)), store.get(&r.key));
            store.remove(&r.key);
            assert!(store.get(&r.key).is_none());
        }
        quickcheck(prop as fn(_))
    }

    /// An added provider record is listed for its key and is gone after
    /// removal.
    #[test]
    fn add_get_remove_provider() {
        fn prop(r: ProviderRecord) {
            let mut store = MemoryStore::new(PeerId::random());
            assert!(store.add_provider(r.clone()).is_ok());
            assert!(store.providers(&r.key).contains(&r));
            store.remove_provider(&r.key, &r.provider);
            assert!(!store.providers(&r.key).contains(&r));
        }
        quickcheck(prop as fn(_))
    }

    /// Provider records of the local peer show up in `provided()` and
    /// disappear from it when removed.
    #[test]
    fn provided() {
        let id = PeerId::random();
        let mut store = MemoryStore::new(id);
        let key = random_multihash();
        let rec = ProviderRecord::new(key, id, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(
            vec![Cow::Borrowed(&rec)],
            store.provided().collect::<Vec<_>>()
        );
        store.remove_provider(&rec.key, &id);
        assert_eq!(store.provided().count(), 0);
    }

    /// Re-adding a record of the same provider replaces the stored record
    /// instead of adding a second one.
    #[test]
    fn update_provider() {
        let mut store = MemoryStore::new(PeerId::random());
        let key = random_multihash();
        let prv = PeerId::random();
        let mut rec = ProviderRecord::new(key, prv, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(vec![rec.clone()], store.providers(&rec.key));
        rec.expires = Some(Instant::now());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(vec![rec.clone()], store.providers(&rec.key));
    }

    /// Re-adding a record of the local peer also replaces the copy that is
    /// tracked in `provided()`.
    #[test]
    fn update_provided() {
        let prv = PeerId::random();
        let mut store = MemoryStore::new(prv);
        let key = random_multihash();
        let mut rec = ProviderRecord::new(key, prv, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(
            vec![Cow::Borrowed(&rec)],
            store.provided().collect::<Vec<_>>()
        );
        rec.expires = Some(Instant::now());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(
            vec![Cow::Borrowed(&rec)],
            store.provided().collect::<Vec<_>>()
        );
    }

    /// Once a key is saturated with providers, further providers are
    /// ignored without an error.
    #[test]
    fn max_providers_per_key() {
        let config = MemoryStoreConfig::default();
        let key = kbucket::Key::new(Key::from(random_multihash()));

        let mut store = MemoryStore::with_config(PeerId::random(), config.clone());
        for _ in 0..config.max_providers_per_key {
            let rec = ProviderRecord::new(key.preimage().clone(), PeerId::random(), Vec::new());
            assert!(store.add_provider(rec).is_ok());
        }
        assert_eq!(
            config.max_providers_per_key,
            store.providers(key.preimage()).len()
        );

        // The new provider cannot be added because the key is already saturated.
        let peer = PeerId::random();
        let rec = ProviderRecord::new(key.preimage().clone(), peer, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert!(!store.providers(&rec.key).contains(&rec));
    }

    /// Adding a provider for a new key fails once the configured number of
    /// keys has been reached.
    #[test]
    fn max_provided_keys() {
        let mut store = MemoryStore::new(PeerId::random());
        for _ in 0..store.config.max_provided_keys {
            let key = random_multihash();
            let prv = PeerId::random();
            let rec = ProviderRecord::new(key, prv, Vec::new());
            // Every record up to the limit must be accepted; otherwise the
            // final assertion would not be probing the boundary.
            assert!(store.add_provider(rec).is_ok());
        }
        let key = random_multihash();
        let prv = PeerId::random();
        let rec = ProviderRecord::new(key, prv, Vec::new());
        assert!(
            matches!(store.add_provider(rec), Err(Error::MaxProvidedKeys)),
            "Unexpected result"
        );
    }
}