1use crate::error::Error;
20use log::{info, warn};
21use sc_network::{multiaddr::Protocol, Multiaddr};
22use sc_network_types::PeerId;
23use serde::{Deserialize, Serialize};
24use sp_authority_discovery::AuthorityId;
25use sp_runtime::DeserializeOwned;
26use std::{
27 collections::{hash_map::Entry, HashMap, HashSet},
28 fs::File,
29 io::{self, BufReader, Write},
30 path::Path,
31};
32
/// Cache of the network addresses known for each authority.
///
/// Two indices are kept side by side: a forward map from authority to its
/// addresses, and a reverse map from peer id to the authorities whose cached
/// addresses contain that peer id.
#[derive(Default, Clone, PartialEq, Debug)]
pub(crate) struct AddrCache {
    /// Addresses stored for each authority by the most recent successful `insert`.
    authority_id_to_addresses: HashMap<AuthorityId, HashSet<Multiaddr>>,
    /// Reverse index derived from `authority_id_to_addresses`. It is not part of
    /// the serialized form; it is rebuilt from the forward map when converting
    /// from `SerializeAddrCache`.
    peer_id_to_authority_ids: HashMap<PeerId, HashSet<AuthorityId>>,
}
47
48impl Serialize for AddrCache {
49 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
50 where
51 S: serde::Serializer,
52 {
53 SerializeAddrCache::from(self.clone()).serialize(serializer)
54 }
55}
56
57impl<'de> Deserialize<'de> for AddrCache {
58 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
59 where
60 D: serde::Deserializer<'de>,
61 {
62 SerializeAddrCache::deserialize(deserializer).map(Into::into)
63 }
64}
65
/// Serialized representation of `AddrCache`.
///
/// It carries only the forward map; converting it into an `AddrCache` derives
/// the `peer_id_to_authority_ids` index from the addresses it contains.
#[derive(Serialize, Deserialize)]
struct SerializeAddrCache {
    /// Addresses stored for each authority.
    authority_id_to_addresses: HashMap<AuthorityId, HashSet<Multiaddr>>,
}
83
84impl From<SerializeAddrCache> for AddrCache {
85 fn from(value: SerializeAddrCache) -> Self {
86 let mut peer_id_to_authority_ids: HashMap<PeerId, HashSet<AuthorityId>> = HashMap::new();
87
88 for (authority_id, addresses) in &value.authority_id_to_addresses {
89 for peer_id in addresses_to_peer_ids(addresses) {
90 peer_id_to_authority_ids
91 .entry(peer_id)
92 .or_insert_with(HashSet::new)
93 .insert(authority_id.clone());
94 }
95 }
96
97 AddrCache {
98 authority_id_to_addresses: value.authority_id_to_addresses,
99 peer_id_to_authority_ids,
100 }
101 }
102}
103impl From<AddrCache> for SerializeAddrCache {
104 fn from(value: AddrCache) -> Self {
105 Self { authority_id_to_addresses: value.authority_id_to_addresses }
106 }
107}
108
/// Creates (or truncates) the file at `path` and writes `contents` to it.
///
/// Returns the first I/O error hit while creating, writing or flushing.
fn write_to_file(path: impl AsRef<Path>, contents: &str) -> io::Result<()> {
    let mut handle = File::create(path.as_ref())?;
    handle.write_all(contents.as_bytes()).and_then(|()| handle.flush())
}
116
117impl TryFrom<&Path> for AddrCache {
118 type Error = Error;
119
120 fn try_from(path: &Path) -> Result<Self, Self::Error> {
121 load_from_file::<AddrCache>(&path).map_err(|e| {
123 Error::EncodingDecodingAddrCache(format!(
124 "Failed to load AddrCache from file: {}, error: {:?}",
125 path.display(),
126 e
127 ))
128 })
129 }
130}
131impl AddrCache {
132 pub fn new() -> Self {
133 AddrCache::default()
134 }
135
136 fn serialize(&self) -> Option<String> {
137 serde_json::to_string_pretty(self).inspect_err(|e| {
138 warn!(target: super::LOG_TARGET, "Failed to serialize AddrCache to JSON: {} => skip persisting it.", e);
139 }).ok()
140 }
141
142 fn persist(path: impl AsRef<Path>, serialized_cache: String) {
143 match write_to_file(path.as_ref(), &serialized_cache) {
144 Err(err) => {
145 warn!(target: super::LOG_TARGET, "Failed to persist AddrCache on disk at path: {}, error: {}", path.as_ref().display(), err);
146 },
147 Ok(_) => {
148 info!(target: super::LOG_TARGET, "Successfully persisted AddrCache on disk");
149 },
150 }
151 }
152
153 pub fn serialize_and_persist(&self, path: impl AsRef<Path>) {
154 let Some(serialized) = self.serialize() else { return };
155 Self::persist(path, serialized);
156 }
157
158 pub fn insert(&mut self, authority_id: AuthorityId, addresses: Vec<Multiaddr>) {
161 let addresses = addresses.into_iter().collect::<HashSet<_>>();
162 let peer_ids = addresses_to_peer_ids(&addresses);
163
164 if peer_ids.is_empty() {
165 log::debug!(
166 target: super::LOG_TARGET,
167 "Authority({:?}) provides no addresses or addresses without peer ids. Adresses: {:?}",
168 authority_id,
169 addresses,
170 );
171 return
172 } else if peer_ids.len() > 1 {
173 log::warn!(
174 target: super::LOG_TARGET,
175 "Authority({:?}) can be reached through multiple peer ids: {:?}",
176 authority_id,
177 peer_ids
178 );
179 }
180
181 log::debug!(
182 target: super::LOG_TARGET,
183 "Found addresses for authority {authority_id:?}: {addresses:?}",
184 );
185
186 let old_addresses = self.authority_id_to_addresses.insert(authority_id.clone(), addresses);
187 let old_peer_ids = addresses_to_peer_ids(&old_addresses.unwrap_or_default());
188
189 peer_ids.difference(&old_peer_ids).for_each(|new_peer_id| {
191 self.peer_id_to_authority_ids
192 .entry(*new_peer_id)
193 .or_default()
194 .insert(authority_id.clone());
195 });
196
197 self.remove_authority_id_from_peer_ids(&authority_id, old_peer_ids.difference(&peer_ids));
199 }
200
201 fn remove_authority_id_from_peer_ids<'a>(
205 &mut self,
206 authority_id: &AuthorityId,
207 peer_ids: impl Iterator<Item = &'a PeerId>,
208 ) {
209 peer_ids.for_each(|peer_id| {
210 if let Entry::Occupied(mut e) = self.peer_id_to_authority_ids.entry(*peer_id) {
211 e.get_mut().remove(authority_id);
212
213 if e.get().is_empty() {
215 e.remove();
216 }
217 }
218 })
219 }
220
221 pub fn num_authority_ids(&self) -> usize {
223 self.authority_id_to_addresses.len()
224 }
225
226 pub fn get_addresses_by_authority_id(
228 &self,
229 authority_id: &AuthorityId,
230 ) -> Option<&HashSet<Multiaddr>> {
231 self.authority_id_to_addresses.get(authority_id)
232 }
233
234 pub fn get_authority_ids_by_peer_id(&self, peer_id: &PeerId) -> Option<&HashSet<AuthorityId>> {
239 self.peer_id_to_authority_ids.get(peer_id)
240 }
241
242 pub fn retain_ids(&mut self, authority_ids: &[AuthorityId]) {
245 let authority_ids_to_remove = self
247 .authority_id_to_addresses
248 .iter()
249 .filter(|(id, _addresses)| !authority_ids.contains(id))
250 .map(|entry| entry.0)
251 .cloned()
252 .collect::<Vec<AuthorityId>>();
253
254 for authority_id_to_remove in authority_ids_to_remove {
255 let addresses = if let Some(addresses) =
257 self.authority_id_to_addresses.remove(&authority_id_to_remove)
258 {
259 addresses
260 } else {
261 continue
262 };
263
264 self.remove_authority_id_from_peer_ids(
265 &authority_id_to_remove,
266 addresses_to_peer_ids(&addresses).iter(),
267 );
268 }
269 }
270}
271
272fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option<PeerId> {
273 addr.iter().last().and_then(|protocol| {
274 if let Protocol::P2p(multihash) = protocol {
275 PeerId::from_multihash(multihash).ok()
276 } else {
277 None
278 }
279 })
280}
281
282fn addresses_to_peer_ids(addresses: &HashSet<Multiaddr>) -> HashSet<PeerId> {
283 addresses.iter().filter_map(peer_id_from_multiaddr).collect::<HashSet<_>>()
284}
285
286fn load_from_file<T: DeserializeOwned>(path: impl AsRef<Path>) -> io::Result<T> {
287 let file = File::open(path)?;
288 let reader = BufReader::new(file);
289
290 serde_json::from_reader(reader).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
291}
292
293#[cfg(test)]
294mod tests {
295
296 use std::{
297 thread::sleep,
298 time::{Duration, Instant},
299 };
300
301 use super::*;
302
303 use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult};
304 use sc_network_types::multihash::{Code, Multihash};
305
306 use sp_authority_discovery::{AuthorityId, AuthorityPair};
307 use sp_core::crypto::Pair;
308
309 #[derive(Clone, Debug)]
310 struct TestAuthorityId(AuthorityId);
311
312 impl Arbitrary for TestAuthorityId {
313 fn arbitrary(g: &mut Gen) -> Self {
314 let seed = (0..32).map(|_| u8::arbitrary(g)).collect::<Vec<_>>();
315 TestAuthorityId(AuthorityPair::from_seed_slice(&seed).unwrap().public())
316 }
317 }
318
319 #[derive(Clone, Debug)]
320 struct TestMultiaddr(Multiaddr);
321
322 impl Arbitrary for TestMultiaddr {
323 fn arbitrary(g: &mut Gen) -> Self {
324 let seed = (0..32).map(|_| u8::arbitrary(g)).collect::<Vec<_>>();
325 let peer_id =
326 PeerId::from_multihash(Multihash::wrap(Code::Sha2_256.into(), &seed).unwrap())
327 .unwrap();
328 let multiaddr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333"
329 .parse::<Multiaddr>()
330 .unwrap()
331 .with(Protocol::P2p(peer_id.into()));
332
333 TestMultiaddr(multiaddr)
334 }
335 }
336
337 #[derive(Clone, Debug)]
338 struct TestMultiaddrsSamePeerCombo(Multiaddr, Multiaddr);
339
340 impl Arbitrary for TestMultiaddrsSamePeerCombo {
341 fn arbitrary(g: &mut Gen) -> Self {
342 let seed = (0..32).map(|_| u8::arbitrary(g)).collect::<Vec<_>>();
343 let peer_id =
344 PeerId::from_multihash(Multihash::wrap(Code::Sha2_256.into(), &seed).unwrap())
345 .unwrap();
346 let multiaddr1 = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333"
347 .parse::<Multiaddr>()
348 .unwrap()
349 .with(Protocol::P2p(peer_id.into()));
350 let multiaddr2 = "/ip6/2002:db8:0:0:0:0:0:2/tcp/30133"
351 .parse::<Multiaddr>()
352 .unwrap()
353 .with(Protocol::P2p(peer_id.into()));
354 TestMultiaddrsSamePeerCombo(multiaddr1, multiaddr2)
355 }
356 }
357
358 #[test]
359 fn retains_only_entries_of_provided_authority_ids() {
360 fn property(
361 first: (TestAuthorityId, TestMultiaddr),
362 second: (TestAuthorityId, TestMultiaddr),
363 third: (TestAuthorityId, TestMultiaddr),
364 ) -> TestResult {
365 let first: (AuthorityId, Multiaddr) = ((first.0).0, (first.1).0);
366 let second: (AuthorityId, Multiaddr) = ((second.0).0, (second.1).0);
367 let third: (AuthorityId, Multiaddr) = ((third.0).0, (third.1).0);
368
369 let mut cache = AddrCache::new();
370
371 cache.insert(first.0.clone(), vec![first.1.clone()]);
372 cache.insert(second.0.clone(), vec![second.1.clone()]);
373 cache.insert(third.0.clone(), vec![third.1.clone()]);
374
375 assert_eq!(
376 Some(&HashSet::from([third.1.clone()])),
377 cache.get_addresses_by_authority_id(&third.0),
378 "Expect `get_addresses_by_authority_id` to return addresses of third authority.",
379 );
380 assert_eq!(
381 Some(&HashSet::from([third.0.clone()])),
382 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()),
383 "Expect `get_authority_id_by_peer_id` to return `AuthorityId` of third authority.",
384 );
385
386 cache.retain_ids(&vec![first.0.clone(), second.0]);
387
388 assert_eq!(
389 None,
390 cache.get_addresses_by_authority_id(&third.0),
391 "Expect `get_addresses_by_authority_id` to not return `None` for third authority.",
392 );
393 assert_eq!(
394 None,
395 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()),
396 "Expect `get_authority_id_by_peer_id` to return `None` for third authority.",
397 );
398
399 TestResult::passed()
400 }
401
402 QuickCheck::new()
403 .max_tests(10)
404 .quickcheck(property as fn(_, _, _) -> TestResult)
405 }
406
407 #[test]
408 fn test_from_to_serializable() {
409 let serializable = SerializeAddrCache::from(AddrCache::sample());
410 let roundtripped = AddrCache::from(serializable);
411 assert_eq!(roundtripped, AddrCache::sample())
412 }
413 #[test]
414 fn keeps_consistency_between_authority_id_and_peer_id() {
415 fn property(
416 authority1: TestAuthorityId,
417 authority2: TestAuthorityId,
418 multiaddr1: TestMultiaddr,
419 multiaddr2: TestMultiaddr,
420 multiaddr3: TestMultiaddrsSamePeerCombo,
421 ) -> TestResult {
422 let authority1 = authority1.0;
423 let authority2 = authority2.0;
424 let multiaddr1 = multiaddr1.0;
425 let multiaddr2 = multiaddr2.0;
426 let TestMultiaddrsSamePeerCombo(multiaddr3, multiaddr4) = multiaddr3;
427
428 let mut cache = AddrCache::new();
429
430 cache.insert(authority1.clone(), vec![multiaddr1.clone()]);
431 cache.insert(
432 authority1.clone(),
433 vec![multiaddr2.clone(), multiaddr3.clone(), multiaddr4.clone()],
434 );
435
436 assert_eq!(
437 None,
438 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr1).unwrap())
439 );
440 assert_eq!(
441 Some(&HashSet::from([authority1.clone()])),
442 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
443 );
444 assert_eq!(
445 Some(&HashSet::from([authority1.clone()])),
446 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
447 );
448 assert_eq!(
449 Some(&HashSet::from([authority1.clone()])),
450 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr4).unwrap())
451 );
452
453 cache.insert(authority2.clone(), vec![multiaddr2.clone()]);
454
455 assert_eq!(
456 Some(&HashSet::from([authority2.clone(), authority1.clone()])),
457 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
458 );
459 assert_eq!(
460 Some(&HashSet::from([authority1.clone()])),
461 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
462 );
463 assert_eq!(cache.get_addresses_by_authority_id(&authority1).unwrap().len(), 3);
464
465 cache.insert(authority2.clone(), vec![multiaddr2.clone(), multiaddr3.clone()]);
466
467 assert_eq!(
468 Some(&HashSet::from([authority2.clone(), authority1.clone()])),
469 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
470 );
471 assert_eq!(
472 Some(&HashSet::from([authority2.clone(), authority1.clone()])),
473 cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
474 );
475 assert_eq!(
476 &HashSet::from([multiaddr2.clone(), multiaddr3.clone(), multiaddr4.clone()]),
477 cache.get_addresses_by_authority_id(&authority1).unwrap(),
478 );
479
480 TestResult::passed()
481 }
482
483 QuickCheck::new()
484 .max_tests(10)
485 .quickcheck(property as fn(_, _, _, _, _) -> TestResult)
486 }
487
488 #[test]
492 fn adding_two_authority_ids_for_the_same_peer_id() {
493 let mut addr_cache = AddrCache::new();
494
495 let peer_id = PeerId::random();
496 let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
497
498 let authority_id0 = AuthorityPair::generate().0.public();
499 let authority_id1 = AuthorityPair::generate().0.public();
500
501 addr_cache.insert(authority_id0.clone(), vec![addr.clone()]);
502 addr_cache.insert(authority_id1.clone(), vec![addr.clone()]);
503
504 assert_eq!(2, addr_cache.num_authority_ids());
505 assert_eq!(
506 &HashSet::from([addr.clone()]),
507 addr_cache.get_addresses_by_authority_id(&authority_id0).unwrap()
508 );
509 assert_eq!(
510 &HashSet::from([addr]),
511 addr_cache.get_addresses_by_authority_id(&authority_id1).unwrap()
512 );
513 }
514
515 impl AddrCache {
516 pub fn sample() -> Self {
517 let mut addr_cache = AddrCache::new();
518
519 let peer_id = PeerId::from_multihash(
520 Multihash::wrap(Code::Sha2_256.into(), &[0xab; 32]).unwrap(),
521 )
522 .unwrap();
523 let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
524 let authority_id0 = AuthorityPair::from_seed(&[0xaa; 32]).public();
525 let authority_id1 = AuthorityPair::from_seed(&[0xbb; 32]).public();
526
527 addr_cache.insert(authority_id0.clone(), vec![addr.clone()]);
528 addr_cache.insert(authority_id1.clone(), vec![addr.clone()]);
529 addr_cache
530 }
531 }
532
533 #[test]
534 fn serde_json() {
535 let sample = || AddrCache::sample();
536 let serializable = AddrCache::from(sample());
537 let json = serde_json::to_string(&serializable).expect("Serialization should not fail");
538 let deserialized = serde_json::from_str::<AddrCache>(&json).unwrap();
539 let from_serializable = AddrCache::try_from(deserialized).unwrap();
540 assert_eq!(sample(), from_serializable);
541 }
542
543 #[test]
544 fn deserialize_from_json() {
545 let json = r#"
546 {
547 "authority_id_to_addresses": {
548 "5FjfMGrqw9ck5XZaPVTKm2RE5cbwoVUfXvSGZY7KCUEFtdr7": [
549 "/p2p/QmZtnFaddFtzGNT8BxdHVbQrhSFdq1pWxud5z4fA4kxfDt"
550 ],
551 "5DiQDBQvjFkmUF3C8a7ape5rpRPoajmMj44Q9CTGPfVBaa6U": [
552 "/p2p/QmZtnFaddFtzGNT8BxdHVbQrhSFdq1pWxud5z4fA4kxfDt"
553 ]
554 }
555 }
556 "#;
557 let deserialized = serde_json::from_str::<AddrCache>(json).unwrap();
558 assert_eq!(deserialized, AddrCache::sample())
559 }
560
561 fn serialize_and_write_to_file<T: Serialize>(
562 path: impl AsRef<Path>,
563 contents: &T,
564 ) -> io::Result<()> {
565 let serialized = serde_json::to_string_pretty(contents).unwrap();
566 write_to_file(path, &serialized)
567 }
568
569 #[test]
570 fn test_load_cache_from_disc() {
571 let dir = tempfile::tempdir().unwrap();
572 let path = dir.path().join("cache.json");
573 let sample = AddrCache::sample();
574 assert_eq!(sample.num_authority_ids(), 2);
575 serialize_and_write_to_file(&path, &sample).unwrap();
576 sleep(Duration::from_millis(10)); let cache = AddrCache::try_from(path.as_path()).unwrap();
578 assert_eq!(cache.num_authority_ids(), 2);
579 }
580
581 fn create_cache(authority_id_count: u64, multiaddr_per_authority_count: u64) -> AddrCache {
582 let mut addr_cache = AddrCache::new();
583
584 for i in 0..authority_id_count {
585 let seed = &mut [0xab as u8; 32];
586 let i_bytes = i.to_le_bytes();
587 seed[0..8].copy_from_slice(&i_bytes);
588
589 let authority_id = AuthorityPair::from_seed(seed).public();
590 let multi_addresses = (0..multiaddr_per_authority_count)
591 .map(|j| {
592 let mut digest = [0xab; 32];
593 let j_bytes = j.to_le_bytes();
594 digest[0..8].copy_from_slice(&j_bytes);
595 let peer_id = PeerId::from_multihash(
596 Multihash::wrap(Code::Sha2_256.into(), &digest).unwrap(),
597 )
598 .unwrap();
599 Multiaddr::empty().with(Protocol::P2p(peer_id.into()))
600 })
601 .collect::<Vec<_>>();
602
603 assert_eq!(multi_addresses.len(), multiaddr_per_authority_count as usize);
604 addr_cache.insert(authority_id.clone(), multi_addresses);
605 }
606 assert_eq!(addr_cache.authority_id_to_addresses.len(), authority_id_count as usize);
607
608 addr_cache
609 }
610
    /// Ad-hoc timing comparison between `AddrCache`'s own serde impls and a
    /// "naive" variant that derives serde for both maps.
    ///
    /// Marked `#[ignore]`, so it only runs when ignored tests are requested. It
    /// prints clone/serialize/deserialize durations and finally checks that both
    /// variants round-trip to an equal value.
    #[test]
    #[ignore]
    fn addr_cache_measure_serde_performance() {
        // 1000 authorities with 5 addresses each.
        let addr_cache = create_cache(1000, 5);

        // Same two maps as `AddrCache`, but with derived serde so that both maps
        // are written and read.
        #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)]
        pub(crate) struct NaiveSerdeAddrCache {
            authority_id_to_addresses: HashMap<AuthorityId, HashSet<Multiaddr>>,
            peer_id_to_authority_ids: HashMap<PeerId, HashSet<AuthorityId>>,
        }
        impl From<AddrCache> for NaiveSerdeAddrCache {
            fn from(value: AddrCache) -> Self {
                Self {
                    authority_id_to_addresses: value.authority_id_to_addresses,
                    peer_id_to_authority_ids: value.peer_id_to_authority_ids,
                }
            }
        }

        let naive = NaiveSerdeAddrCache::from(addr_cache.clone());
        let storage_optimized = addr_cache.clone();

        // Time a single `clone` of `data`.
        fn measure_clone<T: Clone>(data: &T) -> Duration {
            let start = Instant::now();
            let _ = data.clone();
            start.elapsed()
        }
        // Time pretty-printing `data` as JSON; also returns the JSON.
        fn measure_serialize<T: Serialize>(data: &T) -> (Duration, String) {
            let start = Instant::now();
            let json = serde_json::to_string_pretty(data).unwrap();
            (start.elapsed(), json)
        }
        // Time parsing `json` into a `T`; also returns the parsed value.
        fn measure_deserialize<T: DeserializeOwned>(json: String) -> (Duration, T) {
            let start = Instant::now();
            let value = serde_json::from_str(&json).unwrap();
            (start.elapsed(), value)
        }

        let serialize_naive = measure_serialize(&naive);
        let serialize_storage_optimized = measure_serialize(&storage_optimized);
        println!("CLONE: Naive took: {} ms", measure_clone(&naive).as_millis());
        println!(
            "CLONE: Storage optimized took: {} ms",
            measure_clone(&storage_optimized).as_millis()
        );
        println!("SERIALIZE: Naive took: {} ms", serialize_naive.0.as_millis());
        println!(
            "SERIALIZE: Storage optimized took: {} ms",
            serialize_storage_optimized.0.as_millis()
        );
        // Each variant is parsed from the JSON it produced itself.
        let deserialize_naive = measure_deserialize::<NaiveSerdeAddrCache>(serialize_naive.1);
        let deserialize_storage_optimized =
            measure_deserialize::<AddrCache>(serialize_storage_optimized.1);
        println!("DESERIALIZE: Naive took: {} ms", deserialize_naive.0.as_millis());
        println!(
            "DESERIALIZE: Storage optimized took: {} ms",
            deserialize_storage_optimized.0.as_millis()
        );
        // Both representations must round-trip without loss.
        assert_eq!(deserialize_naive.1, naive);
        assert_eq!(deserialize_storage_optimized.1, storage_optimized);
    }
675}