libp2p_kad/record_priv/store/
memory.rs1use super::*;
22
23use crate::kbucket;
24use libp2p_identity::PeerId;
25use smallvec::SmallVec;
26use std::borrow::Cow;
27use std::collections::{hash_map, hash_set, HashMap, HashSet};
28use std::iter;
29
30pub struct MemoryStore {
32 local_key: kbucket::Key<PeerId>,
34 config: MemoryStoreConfig,
36 records: HashMap<Key, Record>,
38 providers: HashMap<Key, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
40 provided: HashSet<ProviderRecord>,
44}
45
46#[derive(Debug, Clone)]
48pub struct MemoryStoreConfig {
49 pub max_records: usize,
51 pub max_value_bytes: usize,
53 pub max_providers_per_key: usize,
57 pub max_provided_keys: usize,
60}
61
62impl Default for MemoryStoreConfig {
63 fn default() -> Self {
64 Self {
65 max_records: 1024,
66 max_value_bytes: 65 * 1024,
67 max_provided_keys: 1024,
68 max_providers_per_key: K_VALUE.get(),
69 }
70 }
71}
72
73impl MemoryStore {
74 pub fn new(local_id: PeerId) -> Self {
76 Self::with_config(local_id, Default::default())
77 }
78
79 pub fn with_config(local_id: PeerId, config: MemoryStoreConfig) -> Self {
81 MemoryStore {
82 local_key: kbucket::Key::from(local_id),
83 config,
84 records: HashMap::default(),
85 provided: HashSet::default(),
86 providers: HashMap::default(),
87 }
88 }
89
90 pub fn retain<F>(&mut self, f: F)
92 where
93 F: FnMut(&Key, &mut Record) -> bool,
94 {
95 self.records.retain(f);
96 }
97}
98
99impl RecordStore for MemoryStore {
100 type RecordsIter<'a> =
101 iter::Map<hash_map::Values<'a, Key, Record>, fn(&'a Record) -> Cow<'a, Record>>;
102
103 type ProvidedIter<'a> = iter::Map<
104 hash_set::Iter<'a, ProviderRecord>,
105 fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
106 >;
107
108 fn get(&self, k: &Key) -> Option<Cow<'_, Record>> {
109 self.records.get(k).map(Cow::Borrowed)
110 }
111
112 fn put(&mut self, r: Record) -> Result<()> {
113 if r.value.len() >= self.config.max_value_bytes {
114 return Err(Error::ValueTooLarge);
115 }
116
117 let num_records = self.records.len();
118
119 match self.records.entry(r.key.clone()) {
120 hash_map::Entry::Occupied(mut e) => {
121 e.insert(r);
122 }
123 hash_map::Entry::Vacant(e) => {
124 if num_records >= self.config.max_records {
125 return Err(Error::MaxRecords);
126 }
127 e.insert(r);
128 }
129 }
130
131 Ok(())
132 }
133
134 fn remove(&mut self, k: &Key) {
135 self.records.remove(k);
136 }
137
138 fn records(&self) -> Self::RecordsIter<'_> {
139 self.records.values().map(Cow::Borrowed)
140 }
141
142 fn add_provider(&mut self, record: ProviderRecord) -> Result<()> {
143 let num_keys = self.providers.len();
144
145 let providers = match self.providers.entry(record.key.clone()) {
147 e @ hash_map::Entry::Occupied(_) => e,
148 e @ hash_map::Entry::Vacant(_) => {
149 if self.config.max_provided_keys == num_keys {
150 return Err(Error::MaxProvidedKeys);
151 }
152 e
153 }
154 }
155 .or_insert_with(Default::default);
156
157 if let Some(i) = providers.iter().position(|p| p.provider == record.provider) {
158 providers.as_mut()[i] = record;
160 } else {
161 let local_key = self.local_key.clone();
163 let key = kbucket::Key::new(record.key.clone());
164 let provider = kbucket::Key::from(record.provider);
165 if let Some(i) = providers.iter().position(|p| {
166 let pk = kbucket::Key::from(p.provider);
167 provider.distance(&key) < pk.distance(&key)
168 }) {
169 if local_key.preimage() == &record.provider {
171 self.provided.insert(record.clone());
172 }
173 providers.insert(i, record);
174 if providers.len() > self.config.max_providers_per_key {
176 if let Some(p) = providers.pop() {
177 self.provided.remove(&p);
178 }
179 }
180 } else if providers.len() < self.config.max_providers_per_key {
181 if local_key.preimage() == &record.provider {
184 self.provided.insert(record.clone());
185 }
186 providers.push(record);
187 }
188 }
189 Ok(())
190 }
191
192 fn providers(&self, key: &Key) -> Vec<ProviderRecord> {
193 self.providers
194 .get(key)
195 .map_or_else(Vec::new, |ps| ps.clone().into_vec())
196 }
197
198 fn provided(&self) -> Self::ProvidedIter<'_> {
199 self.provided.iter().map(Cow::Borrowed)
200 }
201
202 fn remove_provider(&mut self, key: &Key, provider: &PeerId) {
203 if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
204 let providers = e.get_mut();
205 if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
206 let p = providers.remove(i);
207 self.provided.remove(&p);
208 }
209 if providers.is_empty() {
210 e.remove();
211 }
212 }
213 }
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219 use crate::SHA_256_MH;
220 use libp2p_core::multihash::Multihash;
221 use quickcheck::*;
222 use rand::Rng;
223
224 fn random_multihash() -> Multihash<64> {
225 Multihash::wrap(SHA_256_MH, &rand::thread_rng().gen::<[u8; 32]>()).unwrap()
226 }
227
228 fn distance(r: &ProviderRecord) -> kbucket::Distance {
229 kbucket::Key::new(r.key.clone()).distance(&kbucket::Key::from(r.provider))
230 }
231
232 #[test]
233 fn put_get_remove_record() {
234 fn prop(r: Record) {
235 let mut store = MemoryStore::new(PeerId::random());
236 assert!(store.put(r.clone()).is_ok());
237 assert_eq!(Some(Cow::Borrowed(&r)), store.get(&r.key));
238 store.remove(&r.key);
239 assert!(store.get(&r.key).is_none());
240 }
241 quickcheck(prop as fn(_))
242 }
243
244 #[test]
245 fn add_get_remove_provider() {
246 fn prop(r: ProviderRecord) {
247 let mut store = MemoryStore::new(PeerId::random());
248 assert!(store.add_provider(r.clone()).is_ok());
249 assert!(store.providers(&r.key).contains(&r));
250 store.remove_provider(&r.key, &r.provider);
251 assert!(!store.providers(&r.key).contains(&r));
252 }
253 quickcheck(prop as fn(_))
254 }
255
256 #[test]
257 fn providers_ordered_by_distance_to_key() {
258 fn prop(providers: Vec<kbucket::Key<PeerId>>) -> bool {
259 let mut store = MemoryStore::new(PeerId::random());
260 let key = Key::from(random_multihash());
261
262 let mut records = providers
263 .into_iter()
264 .map(|p| ProviderRecord::new(key.clone(), p.into_preimage(), Vec::new()))
265 .collect::<Vec<_>>();
266
267 for r in &records {
268 assert!(store.add_provider(r.clone()).is_ok());
269 }
270
271 records.sort_by_key(distance);
272 records.truncate(store.config.max_providers_per_key);
273
274 records == store.providers(&key).to_vec()
275 }
276
277 quickcheck(prop as fn(_) -> _)
278 }
279
280 #[test]
281 fn provided() {
282 let id = PeerId::random();
283 let mut store = MemoryStore::new(id);
284 let key = random_multihash();
285 let rec = ProviderRecord::new(key, id, Vec::new());
286 assert!(store.add_provider(rec.clone()).is_ok());
287 assert_eq!(
288 vec![Cow::Borrowed(&rec)],
289 store.provided().collect::<Vec<_>>()
290 );
291 store.remove_provider(&rec.key, &id);
292 assert_eq!(store.provided().count(), 0);
293 }
294
295 #[test]
296 fn update_provider() {
297 let mut store = MemoryStore::new(PeerId::random());
298 let key = random_multihash();
299 let prv = PeerId::random();
300 let mut rec = ProviderRecord::new(key, prv, Vec::new());
301 assert!(store.add_provider(rec.clone()).is_ok());
302 assert_eq!(vec![rec.clone()], store.providers(&rec.key).to_vec());
303 rec.expires = Some(Instant::now());
304 assert!(store.add_provider(rec.clone()).is_ok());
305 assert_eq!(vec![rec.clone()], store.providers(&rec.key).to_vec());
306 }
307
308 #[test]
309 fn max_provided_keys() {
310 let mut store = MemoryStore::new(PeerId::random());
311 for _ in 0..store.config.max_provided_keys {
312 let key = random_multihash();
313 let prv = PeerId::random();
314 let rec = ProviderRecord::new(key, prv, Vec::new());
315 let _ = store.add_provider(rec);
316 }
317 let key = random_multihash();
318 let prv = PeerId::random();
319 let rec = ProviderRecord::new(key, prv, Vec::new());
320 match store.add_provider(rec) {
321 Err(Error::MaxProvidedKeys) => {}
322 _ => panic!("Unexpected result"),
323 }
324 }
325}