1use crate::validator_side_experimental::{
18 common::Score,
19 peer_manager::{backend::Backend, ReputationUpdate, ReputationUpdateKind},
20};
21use async_trait::async_trait;
22use polkadot_node_network_protocol::PeerId;
23use polkadot_primitives::{BlockNumber, Hash, Id as ParaId};
24use std::{
25 collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap},
26 time::{SystemTime, UNIX_EPOCH},
27};
28
/// In-memory reputation store holding, for every para, the score entry of each known peer.
pub struct Db {
    /// Reputation entries grouped by para and keyed by peer within each para.
    db: BTreeMap<ParaId, HashMap<PeerId, ScoreEntry>>,
    /// Block number passed to the most recent `process_bumps` call that was not ignored;
    /// `None` until the first such call.
    last_finalized: Option<BlockNumber>,
    /// Maximum number of peer entries kept for a single para after a bump round.
    stored_limit_per_para: u8,
}
36
37impl Db {
38 pub async fn new(stored_limit_per_para: u8) -> Self {
42 Self { db: BTreeMap::new(), last_finalized: None, stored_limit_per_para }
43 }
44}
45
/// Wall-clock instant expressed as milliseconds since the Unix epoch.
type Timestamp = u128;
47
/// Reputation record stored for one peer on one para.
#[derive(Clone, Debug)]
struct ScoreEntry {
    /// Current reputation score of the peer.
    score: Score,
    /// Time of the most recent bump; decay does not refresh it. Used to pick eviction victims
    /// when a para holds more entries than the configured limit.
    last_bumped: Timestamp,
}
53
54#[async_trait]
55impl Backend for Db {
56 async fn processed_finalized_block_number(&self) -> Option<BlockNumber> {
57 self.last_finalized
58 }
59
60 async fn query(&self, peer_id: &PeerId, para_id: &ParaId) -> Option<Score> {
61 self.db.get(para_id).and_then(|per_para| per_para.get(peer_id).map(|e| e.score))
62 }
63
64 async fn slash(&mut self, peer_id: &PeerId, para_id: &ParaId, value: Score) {
65 if let btree_map::Entry::Occupied(mut per_para_entry) = self.db.entry(*para_id) {
66 if let hash_map::Entry::Occupied(mut e) = per_para_entry.get_mut().entry(*peer_id) {
67 let score = e.get_mut().score;
68 if score <= value {
70 e.remove();
71 } else {
72 e.get_mut().score.saturating_sub(value.into());
73 }
74 }
75
76 if per_para_entry.get().is_empty() {
78 per_para_entry.remove();
79 }
80 }
81 }
82
83 async fn prune_paras(&mut self, registered_paras: BTreeSet<ParaId>) {
84 self.db.retain(|para, _| registered_paras.contains(¶));
85 }
86
87 async fn process_bumps(
88 &mut self,
89 leaf_number: BlockNumber,
90 bumps: BTreeMap<ParaId, HashMap<PeerId, Score>>,
91 decay_value: Option<Score>,
92 ) -> Vec<ReputationUpdate> {
93 if self.last_finalized.unwrap_or(0) >= leaf_number {
94 return vec![]
95 }
96
97 self.last_finalized = Some(leaf_number);
98 self.bump_reputations(bumps, decay_value)
99 }
100}
101
102impl Db {
103 fn bump_reputations(
104 &mut self,
105 bumps: BTreeMap<ParaId, HashMap<PeerId, Score>>,
106 maybe_decay_value: Option<Score>,
107 ) -> Vec<ReputationUpdate> {
108 let mut reported_updates = vec![];
109 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
110
111 for (para, bumps_per_para) in bumps {
112 reported_updates.reserve(bumps_per_para.len());
113
114 for (peer_id, bump) in bumps_per_para.iter() {
115 if u16::from(*bump) == 0 {
116 continue
117 }
118
119 self.db
120 .entry(para)
121 .or_default()
122 .entry(*peer_id)
123 .and_modify(|e| {
124 e.score.saturating_add(u16::from(*bump));
125 e.last_bumped = now;
126 })
127 .or_insert(ScoreEntry { score: *bump, last_bumped: now });
128
129 reported_updates.push(ReputationUpdate {
130 peer_id: *peer_id,
131 para_id: para,
132 value: *bump,
133 kind: ReputationUpdateKind::Bump,
134 });
135 }
136
137 if let btree_map::Entry::Occupied(mut per_para_entry) = self.db.entry(para) {
138 if let Some(decay_value) = maybe_decay_value {
139 let peers_to_slash = per_para_entry
140 .get()
141 .keys()
142 .filter(|peer_id| !bumps_per_para.contains_key(peer_id))
143 .copied()
144 .collect::<Vec<PeerId>>();
145
146 for peer_id in peers_to_slash {
147 if let hash_map::Entry::Occupied(mut e) =
148 per_para_entry.get_mut().entry(peer_id)
149 {
150 if e.get_mut().score <= decay_value {
152 let score = e.remove().score;
153 reported_updates.push(ReputationUpdate {
154 peer_id,
155 para_id: para,
156 value: score,
157 kind: ReputationUpdateKind::Slash,
158 });
159 } else {
160 e.get_mut().score.saturating_sub(decay_value.into());
161 reported_updates.push(ReputationUpdate {
162 peer_id,
163 para_id: para,
164 value: decay_value,
165 kind: ReputationUpdateKind::Slash,
166 });
167 }
168 }
169 }
170 }
171
172 let per_para_limit = self.stored_limit_per_para as usize;
173 if per_para_entry.get().is_empty() {
174 per_para_entry.remove();
176 } else if per_para_entry.get().len() > per_para_limit {
177 let diff = per_para_entry.get().len() - per_para_limit;
180 Self::prune_for_para(¶, &mut per_para_entry, diff, &mut reported_updates);
181 }
182 }
183 }
184
185 reported_updates
186 }
187
188 fn prune_for_para(
189 para_id: &ParaId,
190 per_para: &mut btree_map::OccupiedEntry<ParaId, HashMap<PeerId, ScoreEntry>>,
191 diff: usize,
192 reported_updates: &mut Vec<ReputationUpdate>,
193 ) {
194 for _ in 0..diff {
195 let (peer_id_to_remove, score) = per_para
196 .get()
197 .iter()
198 .min_by_key(|(_peer, entry)| entry.last_bumped)
199 .map(|(peer, entry)| (*peer, entry.score))
200 .expect("We know there are enough reps over the limit");
201
202 per_para.get_mut().remove(&peer_id_to_remove);
203
204 reported_updates.push(ReputationUpdate {
205 peer_id: peer_id_to_remove,
206 para_id: *para_id,
207 value: score,
208 kind: ReputationUpdateKind::Slash,
209 });
210 }
211 }
212
213 #[cfg(test)]
214 fn len(&self) -> usize {
215 self.db.len()
216 }
217}
218
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    // Walks through bumps, decay, stale block numbers and entry removal via `process_bumps`.
    #[tokio::test]
    async fn test_reputation_updates() {
        // A fresh database has processed no block and stores nothing.
        let mut db = Db::new(10).await;
        assert_eq!(db.processed_finalized_block_number().await, None);
        assert_eq!(db.len(), 0);

        // An empty update without decay only advances the processed block number.
        assert!(db.process_bumps(10, Default::default(), None).await.is_empty());
        assert_eq!(db.processed_finalized_block_number().await, Some(10));
        assert_eq!(db.len(), 0);

        // Querying an entry that does not exist yields `None`.
        assert_eq!(db.query(&PeerId::random(), &ParaId::from(1000)).await, None);

        // An empty update with decay: nothing is stored, so nothing is reported.
        assert!(db
            .process_bumps(11, Default::default(), Some(Score::new(1).unwrap()))
            .await
            .is_empty());
        assert_eq!(db.processed_finalized_block_number().await, Some(11));
        assert_eq!(db.len(), 0);

        // A block number lower than the last processed one is ignored.
        assert!(db
            .process_bumps(5, Default::default(), Some(Score::new(1).unwrap()))
            .await
            .is_empty());
        assert_eq!(db.processed_finalized_block_number().await, Some(11));
        assert_eq!(db.len(), 0);

        // A zero-valued bump is skipped, so no entry is created.
        assert!(db
            .process_bumps(
                12,
                [(
                    ParaId::from(100),
                    [(PeerId::random(), Score::new(0).unwrap())].into_iter().collect()
                )]
                .into_iter()
                .collect(),
                Some(Score::new(1).unwrap())
            )
            .await
            .is_empty());
        assert_eq!(db.processed_finalized_block_number().await, Some(12));
        assert_eq!(db.len(), 0);

        // Block 12 was already processed above, so this non-zero bump is ignored too.
        let first_peer_id = PeerId::random();
        let first_para_id = ParaId::from(100);
        assert!(db
            .process_bumps(
                12,
                [(first_para_id, [(first_peer_id, Score::new(10).unwrap())].into_iter().collect())]
                    .into_iter()
                    .collect(),
                Some(Score::new(1).unwrap())
            )
            .await
            .is_empty());
        assert_eq!(db.processed_finalized_block_number().await, Some(12));
        assert_eq!(db.len(), 0);
        assert_eq!(db.query(&first_peer_id, &first_para_id).await, None);

        // A non-zero bump at a new block height creates the entry and is reported.
        assert_eq!(
            db.process_bumps(
                13,
                [(first_para_id, [(first_peer_id, Score::new(10).unwrap())].into_iter().collect())]
                    .into_iter()
                    .collect(),
                Some(Score::new(1).unwrap())
            )
            .await,
            vec![ReputationUpdate {
                peer_id: first_peer_id,
                para_id: first_para_id,
                kind: ReputationUpdateKind::Bump,
                value: Score::new(10).unwrap()
            }]
        );
        assert_eq!(db.processed_finalized_block_number().await, Some(13));
        assert_eq!(db.len(), 1);
        assert_eq!(
            db.query(&first_peer_id, &first_para_id).await.unwrap(),
            Score::new(10).unwrap()
        );
        // An unknown peer on the known para has no score.
        assert_eq!(db.query(&PeerId::random(), &first_para_id).await, None);
        // The known peer has no score on a different para.
        assert_eq!(db.query(&first_peer_id, &ParaId::from(200)).await, None);

        // An update for an older block is ignored and leaves the stored score untouched.
        assert!(db
            .process_bumps(
                10,
                [(first_para_id, [(first_peer_id, Score::new(10).unwrap())].into_iter().collect())]
                    .into_iter()
                    .collect(),
                Some(Score::new(1).unwrap())
            )
            .await
            .is_empty());
        assert_eq!(db.processed_finalized_block_number().await, Some(13));
        assert_eq!(db.len(), 1);
        assert_eq!(
            db.query(&first_peer_id, &first_para_id).await.unwrap(),
            Score::new(10).unwrap()
        );

        // Bumps for two paras without decay: existing entries that are not bumped keep their
        // score.
        let second_para_id = ParaId::from(200);
        let second_peer_id = PeerId::random();
        assert_eq!(
            db.process_bumps(
                14,
                [
                    (
                        first_para_id,
                        [(second_peer_id, Score::new(10).unwrap())].into_iter().collect()
                    ),
                    (
                        second_para_id,
                        [(first_peer_id, Score::new(5).unwrap())].into_iter().collect()
                    )
                ]
                .into_iter()
                .collect(),
                None
            )
            .await,
            vec![
                ReputationUpdate {
                    peer_id: second_peer_id,
                    para_id: first_para_id,
                    kind: ReputationUpdateKind::Bump,
                    value: Score::new(10).unwrap()
                },
                ReputationUpdate {
                    peer_id: first_peer_id,
                    para_id: second_para_id,
                    kind: ReputationUpdateKind::Bump,
                    value: Score::new(5).unwrap()
                }
            ]
        );
        assert_eq!(db.len(), 2);
        assert_eq!(db.processed_finalized_block_number().await, Some(14));
        assert_eq!(
            db.query(&first_peer_id, &first_para_id).await.unwrap(),
            Score::new(10).unwrap()
        );
        assert_eq!(
            db.query(&second_peer_id, &first_para_id).await.unwrap(),
            Score::new(10).unwrap()
        );
        assert_eq!(
            db.query(&first_peer_id, &second_para_id).await.unwrap(),
            Score::new(5).unwrap()
        );

        // Decay is only applied to paras present in the bumps, so an empty update with decay
        // changes no score.
        assert!(db
            .process_bumps(15, Default::default(), Some(Score::new(1).unwrap()))
            .await
            .is_empty());
        assert_eq!(db.processed_finalized_block_number().await, Some(15));
        assert_eq!(db.len(), 2);
        assert_eq!(
            db.query(&first_peer_id, &first_para_id).await.unwrap(),
            Score::new(10).unwrap()
        );
        assert_eq!(
            db.query(&second_peer_id, &first_para_id).await.unwrap(),
            Score::new(10).unwrap()
        );
        assert_eq!(
            db.query(&first_peer_id, &second_para_id).await.unwrap(),
            Score::new(5).unwrap()
        );

        // Bumps with decay: on each para the bumped peer gains 10 while the other peer of that
        // para loses 1.
        assert_eq!(
            db.process_bumps(
                16,
                [
                    (
                        first_para_id,
                        [(first_peer_id, Score::new(10).unwrap())].into_iter().collect()
                    ),
                    (
                        second_para_id,
                        [(second_peer_id, Score::new(10).unwrap())].into_iter().collect()
                    ),
                ]
                .into_iter()
                .collect(),
                Some(Score::new(1).unwrap())
            )
            .await,
            vec![
                ReputationUpdate {
                    peer_id: first_peer_id,
                    para_id: first_para_id,
                    kind: ReputationUpdateKind::Bump,
                    value: Score::new(10).unwrap()
                },
                ReputationUpdate {
                    peer_id: second_peer_id,
                    para_id: first_para_id,
                    kind: ReputationUpdateKind::Slash,
                    value: Score::new(1).unwrap()
                },
                ReputationUpdate {
                    peer_id: second_peer_id,
                    para_id: second_para_id,
                    kind: ReputationUpdateKind::Bump,
                    value: Score::new(10).unwrap()
                },
                ReputationUpdate {
                    peer_id: first_peer_id,
                    para_id: second_para_id,
                    kind: ReputationUpdateKind::Slash,
                    value: Score::new(1).unwrap()
                },
            ]
        );
        assert_eq!(db.processed_finalized_block_number().await, Some(16));
        assert_eq!(db.len(), 2);
        assert_eq!(
            db.query(&first_peer_id, &first_para_id).await.unwrap(),
            Score::new(20).unwrap()
        );
        assert_eq!(
            db.query(&second_peer_id, &first_para_id).await.unwrap(),
            Score::new(9).unwrap()
        );
        assert_eq!(
            db.query(&first_peer_id, &second_para_id).await.unwrap(),
            Score::new(4).unwrap()
        );
        assert_eq!(
            db.query(&second_peer_id, &second_para_id).await.unwrap(),
            Score::new(10).unwrap()
        );

        // A decay of 5 exceeds the first peer's score of 4 on the second para: the entry is
        // removed and the slash is reported with the remaining score (4). The first para is
        // not part of the bumps and therefore untouched.
        assert_eq!(
            db.process_bumps(
                17,
                [(
                    second_para_id,
                    [(second_peer_id, Score::new(10).unwrap())].into_iter().collect()
                ),]
                .into_iter()
                .collect(),
                Some(Score::new(5).unwrap())
            )
            .await,
            vec![
                ReputationUpdate {
                    peer_id: second_peer_id,
                    para_id: second_para_id,
                    kind: ReputationUpdateKind::Bump,
                    value: Score::new(10).unwrap()
                },
                ReputationUpdate {
                    peer_id: first_peer_id,
                    para_id: second_para_id,
                    kind: ReputationUpdateKind::Slash,
                    value: Score::new(4).unwrap()
                }
            ]
        );
        assert_eq!(db.processed_finalized_block_number().await, Some(17));
        assert_eq!(db.len(), 2);
        assert_eq!(
            db.query(&first_peer_id, &first_para_id).await.unwrap(),
            Score::new(20).unwrap()
        );
        assert_eq!(
            db.query(&second_peer_id, &first_para_id).await.unwrap(),
            Score::new(9).unwrap()
        );
        assert_eq!(db.query(&first_peer_id, &second_para_id).await, None);
        assert_eq!(
            db.query(&second_peer_id, &second_para_id).await.unwrap(),
            Score::new(20).unwrap()
        );

        // New scenario: fill one para exactly up to the per-para limit of 10.
        let mut db = Db::new(10).await;
        let peer_ids = (0..10).map(|_| PeerId::random()).collect::<Vec<_>>();

        // Give every peer the same initial score.
        assert_eq!(
            db.process_bumps(
                1,
                [(
                    first_para_id,
                    peer_ids.iter().map(|peer_id| (*peer_id, Score::new(10).unwrap())).collect()
                )]
                .into_iter()
                .collect(),
                None,
            )
            .await
            .len(),
            10
        );
        assert_eq!(db.len(), 1);

        for peer_id in peer_ids.iter() {
            assert_eq!(db.query(peer_id, &first_para_id).await.unwrap(), Score::new(10).unwrap());
        }

        // Let wall-clock time advance so this round's bumps get a later timestamp, then bump
        // everyone except the peer at index 4, which is decayed instead (9 bumps + 1 slash).
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(
            db.process_bumps(
                2,
                [(
                    first_para_id,
                    peer_ids
                        .iter()
                        .enumerate()
                        .filter_map(
                            |(i, peer_id)| (i != 4).then_some((*peer_id, Score::new(10).unwrap()))
                        )
                        .collect()
                )]
                .into_iter()
                .collect(),
                Some(Score::new(5).unwrap()),
            )
            .await
            .len(),
            10
        );

        for (i, peer_id) in peer_ids.iter().enumerate() {
            if i == 4 {
                assert_eq!(
                    db.query(peer_id, &first_para_id).await.unwrap(),
                    Score::new(5).unwrap()
                );
            } else {
                assert_eq!(
                    db.query(peer_id, &first_para_id).await.unwrap(),
                    Score::new(20).unwrap()
                );
            }
        }

        // Add an 11th peer with a decay of 5. The peer at index 4 (score 5) is removed by the
        // decay and the other nine drop to 15: 1 bump + 10 slashes. The para is back at 10
        // entries afterwards.
        let new_peer = PeerId::random();
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(
            db.process_bumps(
                3,
                [(first_para_id, [(new_peer, Score::new(10).unwrap())].into_iter().collect())]
                    .into_iter()
                    .collect(),
                Some(Score::new(5).unwrap()),
            )
            .await
            .len(),
            11
        );
        for (i, peer_id) in peer_ids.iter().enumerate() {
            if i == 4 {
                assert_eq!(db.query(peer_id, &first_para_id).await, None);
            } else {
                assert_eq!(
                    db.query(peer_id, &first_para_id).await.unwrap(),
                    Score::new(15).unwrap()
                );
            }
        }
        assert_eq!(db.query(&new_peer, &first_para_id).await.unwrap(), Score::new(10).unwrap());

        // Add yet another peer with a decay of 10. `new_peer` (score 10) is removed by the
        // decay and the nine original peers drop to 5, so the para again holds 10 entries.
        let yet_another_peer = PeerId::random();
        assert_eq!(
            db.process_bumps(
                4,
                [(
                    first_para_id,
                    [(yet_another_peer, Score::new(10).unwrap())].into_iter().collect()
                )]
                .into_iter()
                .collect(),
                Some(Score::new(10).unwrap()),
            )
            .await
            .len(),
            11
        );
        for (i, peer_id) in peer_ids.iter().enumerate() {
            if i == 4 {
                assert_eq!(db.query(peer_id, &first_para_id).await, None);
            } else {
                assert_eq!(
                    db.query(peer_id, &first_para_id).await.unwrap(),
                    Score::new(5).unwrap()
                );
            }
        }
        assert_eq!(db.query(&new_peer, &first_para_id).await, None);
        assert_eq!(
            db.query(&yet_another_peer, &first_para_id).await,
            Some(Score::new(10).unwrap())
        );
    }

    // Exercises `slash` on missing entries, partial slashes and slashes that remove the entry.
    #[tokio::test]
    async fn test_slash() {
        let mut db = Db::new(10).await;

        // Slashing on an empty database is a no-op.
        let peer_id = PeerId::random();
        db.slash(&peer_id, &ParaId::from(100), Score::new(50).unwrap()).await;
        assert_eq!(db.query(&peer_id, &ParaId::from(100)).await, None);

        // Populate three paras with one entry each.
        let another_peer_id = PeerId::random();
        assert_eq!(
            db.process_bumps(
                1,
                [
                    (ParaId::from(100), [(peer_id, Score::new(10).unwrap())].into_iter().collect()),
                    (
                        ParaId::from(200),
                        [(another_peer_id, Score::new(12).unwrap())].into_iter().collect()
                    ),
                    (ParaId::from(300), [(peer_id, Score::new(15).unwrap())].into_iter().collect())
                ]
                .into_iter()
                .collect(),
                Some(Score::new(10).unwrap()),
            )
            .await
            .len(),
            3
        );
        assert_eq!(db.query(&peer_id, &ParaId::from(100)).await.unwrap(), Score::new(10).unwrap());
        assert_eq!(
            db.query(&another_peer_id, &ParaId::from(200)).await.unwrap(),
            Score::new(12).unwrap()
        );
        assert_eq!(db.query(&peer_id, &ParaId::from(300)).await.unwrap(), Score::new(15).unwrap());

        // `peer_id` has no entry on para 200, so slashing it there changes nothing.
        db.slash(&peer_id, &ParaId::from(200), Score::new(4).unwrap()).await;
        assert_eq!(db.query(&peer_id, &ParaId::from(100)).await.unwrap(), Score::new(10).unwrap());
        assert_eq!(
            db.query(&another_peer_id, &ParaId::from(200)).await.unwrap(),
            Score::new(12).unwrap()
        );
        assert_eq!(db.query(&peer_id, &ParaId::from(300)).await.unwrap(), Score::new(15).unwrap());

        // A slash smaller than the score reduces it: 10 - 4 = 6.
        db.slash(&peer_id, &ParaId::from(100), Score::new(4).unwrap()).await;
        assert_eq!(db.query(&peer_id, &ParaId::from(100)).await.unwrap(), Score::new(6).unwrap());

        // A slash of at least the remaining score removes the entry; para 100 becomes empty
        // and is dropped, leaving two paras.
        db.slash(&peer_id, &ParaId::from(100), Score::new(8).unwrap()).await;
        assert_eq!(db.query(&peer_id, &ParaId::from(100)).await, None);
        assert_eq!(db.len(), 2);
    }

    // Checks that `prune_paras` keeps exactly the paras contained in the registered set.
    #[tokio::test]
    async fn test_prune_paras() {
        let mut db = Db::new(10).await;

        // Pruning an empty database is a no-op, whatever the registered set is.
        db.prune_paras(BTreeSet::new()).await;
        assert_eq!(db.len(), 0);

        db.prune_paras([ParaId::from(100), ParaId::from(200)].into_iter().collect())
            .await;
        assert_eq!(db.len(), 0);

        let peer_id = PeerId::random();
        let another_peer_id = PeerId::random();

        // Populate three paras with one entry each.
        assert_eq!(
            db.process_bumps(
                1,
                [
                    (ParaId::from(100), [(peer_id, Score::new(10).unwrap())].into_iter().collect()),
                    (
                        ParaId::from(200),
                        [(another_peer_id, Score::new(12).unwrap())].into_iter().collect()
                    ),
                    (ParaId::from(300), [(peer_id, Score::new(15).unwrap())].into_iter().collect())
                ]
                .into_iter()
                .collect(),
                Some(Score::new(10).unwrap()),
            )
            .await
            .len(),
            3
        );
        assert_eq!(db.len(), 3);

        // A registered set that is a superset of the stored paras removes nothing.
        db.prune_paras(
            [ParaId::from(100), ParaId::from(200), ParaId::from(300), ParaId::from(400)]
                .into_iter()
                .collect(),
        )
        .await;
        assert_eq!(db.len(), 3);

        assert_eq!(db.query(&peer_id, &ParaId::from(100)).await.unwrap(), Score::new(10).unwrap());
        assert_eq!(
            db.query(&another_peer_id, &ParaId::from(200)).await.unwrap(),
            Score::new(12).unwrap()
        );
        assert_eq!(db.query(&peer_id, &ParaId::from(300)).await.unwrap(), Score::new(15).unwrap());

        // Only para 300 stays registered; the other two are dropped with their entries.
        db.prune_paras([ParaId::from(300)].into_iter().collect()).await;
        assert_eq!(db.len(), 1);
        assert_eq!(db.query(&peer_id, &ParaId::from(100)).await, None);
        assert_eq!(db.query(&another_peer_id, &ParaId::from(200)).await, None);
        assert_eq!(db.query(&peer_id, &ParaId::from(300)).await.unwrap(), Score::new(15).unwrap());

        // An empty registered set clears the database.
        db.prune_paras(BTreeSet::new()).await;
        assert_eq!(db.len(), 0);
        assert_eq!(db.query(&peer_id, &ParaId::from(300)).await, None);
    }
}