libp2p_kad/record_priv/store/
memory.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::*;
22
23use crate::kbucket;
24use libp2p_identity::PeerId;
25use smallvec::SmallVec;
26use std::borrow::Cow;
27use std::collections::{hash_map, hash_set, HashMap, HashSet};
28use std::iter;
29
30/// In-memory implementation of a `RecordStore`.
31pub struct MemoryStore {
32    /// The identity of the peer owning the store.
33    local_key: kbucket::Key<PeerId>,
34    /// The configuration of the store.
35    config: MemoryStoreConfig,
36    /// The stored (regular) records.
37    records: HashMap<Key, Record>,
38    /// The stored provider records.
39    providers: HashMap<Key, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
40    /// The set of all provider records for the node identified by `local_key`.
41    ///
42    /// Must be kept in sync with `providers`.
43    provided: HashSet<ProviderRecord>,
44}
45
/// Limits that a `MemoryStore` enforces on its contents.
#[derive(Debug, Clone)]
pub struct MemoryStoreConfig {
    /// Upper bound on the number of regular records held by the store.
    ///
    /// Once this many records are stored, `put` only accepts records whose
    /// key is already present.
    pub max_records: usize,
    /// Size limit, in bytes, for record values.
    ///
    /// Note that `put` rejects a value whose length is equal to this limit
    /// as well as any longer value.
    pub max_value_bytes: usize,
    /// Upper bound on the number of provider records kept for a single key.
    ///
    /// This should match up with the chosen replication factor.
    pub max_providers_per_key: usize,
    /// The maximum number of provider records for which the local node is
    /// the provider.
    ///
    /// NOTE(review): `add_provider` compares this limit against the number
    /// of distinct keys that have provider records in the store, whoever the
    /// provider is.
    pub max_provided_keys: usize,
}
61
62impl Default for MemoryStoreConfig {
63    fn default() -> Self {
64        Self {
65            max_records: 1024,
66            max_value_bytes: 65 * 1024,
67            max_provided_keys: 1024,
68            max_providers_per_key: K_VALUE.get(),
69        }
70    }
71}
72
73impl MemoryStore {
74    /// Creates a new `MemoryRecordStore` with a default configuration.
75    pub fn new(local_id: PeerId) -> Self {
76        Self::with_config(local_id, Default::default())
77    }
78
79    /// Creates a new `MemoryRecordStore` with the given configuration.
80    pub fn with_config(local_id: PeerId, config: MemoryStoreConfig) -> Self {
81        MemoryStore {
82            local_key: kbucket::Key::from(local_id),
83            config,
84            records: HashMap::default(),
85            provided: HashSet::default(),
86            providers: HashMap::default(),
87        }
88    }
89
90    /// Retains the records satisfying a predicate.
91    pub fn retain<F>(&mut self, f: F)
92    where
93        F: FnMut(&Key, &mut Record) -> bool,
94    {
95        self.records.retain(f);
96    }
97}
98
99impl RecordStore for MemoryStore {
100    type RecordsIter<'a> =
101        iter::Map<hash_map::Values<'a, Key, Record>, fn(&'a Record) -> Cow<'a, Record>>;
102
103    type ProvidedIter<'a> = iter::Map<
104        hash_set::Iter<'a, ProviderRecord>,
105        fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
106    >;
107
108    fn get(&self, k: &Key) -> Option<Cow<'_, Record>> {
109        self.records.get(k).map(Cow::Borrowed)
110    }
111
112    fn put(&mut self, r: Record) -> Result<()> {
113        if r.value.len() >= self.config.max_value_bytes {
114            return Err(Error::ValueTooLarge);
115        }
116
117        let num_records = self.records.len();
118
119        match self.records.entry(r.key.clone()) {
120            hash_map::Entry::Occupied(mut e) => {
121                e.insert(r);
122            }
123            hash_map::Entry::Vacant(e) => {
124                if num_records >= self.config.max_records {
125                    return Err(Error::MaxRecords);
126                }
127                e.insert(r);
128            }
129        }
130
131        Ok(())
132    }
133
134    fn remove(&mut self, k: &Key) {
135        self.records.remove(k);
136    }
137
138    fn records(&self) -> Self::RecordsIter<'_> {
139        self.records.values().map(Cow::Borrowed)
140    }
141
142    fn add_provider(&mut self, record: ProviderRecord) -> Result<()> {
143        let num_keys = self.providers.len();
144
145        // Obtain the entry
146        let providers = match self.providers.entry(record.key.clone()) {
147            e @ hash_map::Entry::Occupied(_) => e,
148            e @ hash_map::Entry::Vacant(_) => {
149                if self.config.max_provided_keys == num_keys {
150                    return Err(Error::MaxProvidedKeys);
151                }
152                e
153            }
154        }
155        .or_insert_with(Default::default);
156
157        if let Some(i) = providers.iter().position(|p| p.provider == record.provider) {
158            // In-place update of an existing provider record.
159            providers.as_mut()[i] = record;
160        } else {
161            // It is a new provider record for that key.
162            let local_key = self.local_key.clone();
163            let key = kbucket::Key::new(record.key.clone());
164            let provider = kbucket::Key::from(record.provider);
165            if let Some(i) = providers.iter().position(|p| {
166                let pk = kbucket::Key::from(p.provider);
167                provider.distance(&key) < pk.distance(&key)
168            }) {
169                // Insert the new provider.
170                if local_key.preimage() == &record.provider {
171                    self.provided.insert(record.clone());
172                }
173                providers.insert(i, record);
174                // Remove the excess provider, if any.
175                if providers.len() > self.config.max_providers_per_key {
176                    if let Some(p) = providers.pop() {
177                        self.provided.remove(&p);
178                    }
179                }
180            } else if providers.len() < self.config.max_providers_per_key {
181                // The distance of the new provider to the key is larger than
182                // the distance of any existing provider, but there is still room.
183                if local_key.preimage() == &record.provider {
184                    self.provided.insert(record.clone());
185                }
186                providers.push(record);
187            }
188        }
189        Ok(())
190    }
191
192    fn providers(&self, key: &Key) -> Vec<ProviderRecord> {
193        self.providers
194            .get(key)
195            .map_or_else(Vec::new, |ps| ps.clone().into_vec())
196    }
197
198    fn provided(&self) -> Self::ProvidedIter<'_> {
199        self.provided.iter().map(Cow::Borrowed)
200    }
201
202    fn remove_provider(&mut self, key: &Key, provider: &PeerId) {
203        if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
204            let providers = e.get_mut();
205            if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
206                let p = providers.remove(i);
207                self.provided.remove(&p);
208            }
209            if providers.is_empty() {
210                e.remove();
211            }
212        }
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SHA_256_MH;
    use libp2p_core::multihash::Multihash;
    use quickcheck::*;
    use rand::Rng;

    /// Wraps 32 random bytes in a multihash tagged with `SHA_256_MH`.
    fn random_multihash() -> Multihash<64> {
        let digest: [u8; 32] = rand::thread_rng().gen();
        Multihash::wrap(SHA_256_MH, &digest).unwrap()
    }

    /// Builds a provider record with a random key, a random provider and no
    /// addresses.
    fn random_provider_record() -> ProviderRecord {
        ProviderRecord::new(random_multihash(), PeerId::random(), Vec::new())
    }

    /// Distance between the key of a provider record and its provider.
    fn distance(r: &ProviderRecord) -> kbucket::Distance {
        let key = kbucket::Key::new(r.key.clone());
        let provider = kbucket::Key::from(r.provider);
        key.distance(&provider)
    }

    /// A record can be stored, read back and removed again.
    #[test]
    fn put_get_remove_record() {
        fn prop(record: Record) {
            let mut store = MemoryStore::new(PeerId::random());

            assert!(store.put(record.clone()).is_ok());
            assert_eq!(Some(Cow::Borrowed(&record)), store.get(&record.key));

            store.remove(&record.key);
            assert!(store.get(&record.key).is_none());
        }
        quickcheck(prop as fn(_))
    }

    /// A provider record can be added, listed and removed again.
    #[test]
    fn add_get_remove_provider() {
        fn prop(record: ProviderRecord) {
            let mut store = MemoryStore::new(PeerId::random());

            assert!(store.add_provider(record.clone()).is_ok());
            assert!(store.providers(&record.key).contains(&record));

            store.remove_provider(&record.key, &record.provider);
            assert!(!store.providers(&record.key).contains(&record));
        }
        quickcheck(prop as fn(_))
    }

    /// The providers of a key are returned closest-first and capped at
    /// `max_providers_per_key`.
    #[test]
    fn providers_ordered_by_distance_to_key() {
        fn prop(providers: Vec<kbucket::Key<PeerId>>) -> bool {
            let mut store = MemoryStore::new(PeerId::random());
            let key = Key::from(random_multihash());

            let mut expected: Vec<ProviderRecord> = providers
                .into_iter()
                .map(|p| ProviderRecord::new(key.clone(), p.into_preimage(), Vec::new()))
                .collect();

            for record in &expected {
                assert!(store.add_provider(record.clone()).is_ok());
            }

            expected.sort_by_key(distance);
            expected.truncate(store.config.max_providers_per_key);

            expected == store.providers(&key)
        }

        quickcheck(prop as fn(_) -> _)
    }

    /// Records provided by the local node show up in `provided` and
    /// disappear from it when the provider is removed.
    #[test]
    fn provided() {
        let local_id = PeerId::random();
        let mut store = MemoryStore::new(local_id);
        let record = ProviderRecord::new(random_multihash(), local_id, Vec::new());

        assert!(store.add_provider(record.clone()).is_ok());
        let listed: Vec<_> = store.provided().collect();
        assert_eq!(vec![Cow::Borrowed(&record)], listed);

        store.remove_provider(&record.key, &local_id);
        assert_eq!(store.provided().count(), 0);
    }

    /// Adding a record for an already known provider replaces the old one.
    #[test]
    fn update_provider() {
        let mut store = MemoryStore::new(PeerId::random());
        let mut record = random_provider_record();

        assert!(store.add_provider(record.clone()).is_ok());
        assert_eq!(vec![record.clone()], store.providers(&record.key));

        record.expires = Some(Instant::now());
        assert!(store.add_provider(record.clone()).is_ok());
        assert_eq!(vec![record.clone()], store.providers(&record.key));
    }

    /// Once `max_provided_keys` keys are tracked, a further key is rejected.
    #[test]
    fn max_provided_keys() {
        let mut store = MemoryStore::new(PeerId::random());
        for _ in 0..store.config.max_provided_keys {
            let _ = store.add_provider(random_provider_record());
        }

        match store.add_provider(random_provider_record()) {
            Err(Error::MaxProvidedKeys) => {}
            _ => panic!("Unexpected result"),
        }
    }
}