1use crate::validator_side_experimental::{
18 common::{PeerInfo, PeerState, Score},
19 peer_manager::{DeclarationOutcome, ReputationUpdate, ReputationUpdateKind, TryAcceptOutcome},
20};
21use polkadot_node_network_protocol::PeerId;
22use polkadot_primitives::Id as ParaId;
23use std::{
24 cmp::Ordering,
25 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26 future::Future,
27 num::NonZeroU16,
28};
29
30#[derive(Clone)]
33pub struct ConnectedPeers {
34 per_para: BTreeMap<ParaId, PerPara>,
35 peer_info: HashMap<PeerId, PeerInfo>,
36}
37
38impl ConnectedPeers {
39 pub fn new(
41 scheduled_paras: BTreeSet<ParaId>,
42 overall_limit: NonZeroU16,
43 per_para_limit: NonZeroU16,
44 ) -> Self {
45 debug_assert!(per_para_limit <= overall_limit);
46
47 let limit = std::cmp::min(
48 (u16::from(overall_limit))
49 .checked_div(
50 scheduled_paras
51 .len()
52 .try_into()
53 .expect("Nr of scheduled paras on a core should always fit in a u16"),
54 )
55 .unwrap_or(0),
56 u16::from(per_para_limit),
57 );
58
59 let mut per_para = BTreeMap::new();
60
61 if limit != 0 {
62 for para_id in scheduled_paras {
63 per_para.insert(
64 para_id,
65 PerPara::new(NonZeroU16::new(limit).expect("Just checked that limit is not 0")),
66 );
67 }
68 }
69
70 Self { per_para, peer_info: Default::default() }
71 }
72
73 pub fn update_reputation(&mut self, update: ReputationUpdate) {
77 let Some(per_para) = self.per_para.get_mut(&update.para_id) else { return };
78
79 per_para.update_reputation(update);
80 }
81
82 pub async fn try_accept<
85 RepQueryFn: Fn(PeerId, ParaId) -> QueryFut,
86 QueryFut: Future<Output = Score>,
87 >(
88 &mut self,
89 reputation_query_fn: RepQueryFn,
90 peer_id: PeerId,
91 peer_info: PeerInfo,
92 ) -> TryAcceptOutcome {
93 if self.contains(&peer_id) {
94 return TryAcceptOutcome::Added
95 }
96
97 let mut outcome = TryAcceptOutcome::Rejected;
98
99 match peer_info.state {
100 PeerState::Collating(para_id) => {
101 let past_reputation = reputation_query_fn(peer_id, para_id).await;
102 if let Some(per_para) = self.per_para.get_mut(¶_id) {
103 let res = per_para.try_accept(peer_id, past_reputation);
104 outcome = outcome.combine(res);
105 }
106 },
107 PeerState::Connected =>
108 for (para_id, per_para) in self.per_para.iter_mut() {
109 let past_reputation = reputation_query_fn(peer_id, *para_id).await;
110 let res = per_para.try_accept(peer_id, past_reputation);
111 outcome = outcome.combine(res);
112 },
113 }
114
115 match outcome {
116 TryAcceptOutcome::Replaced(mut replaced) => {
117 self.peer_info.insert(peer_id, peer_info);
118
119 replaced.retain(|replaced_peer| {
122 let disconnect =
123 !self.per_para.values().any(|per_para| per_para.contains(&replaced_peer));
124
125 if disconnect {
126 self.peer_info.remove(replaced_peer);
127 }
128
129 disconnect
130 });
131
132 TryAcceptOutcome::Replaced(replaced)
133 },
134 TryAcceptOutcome::Added => {
135 self.peer_info.insert(peer_id, peer_info);
136 TryAcceptOutcome::Added
137 },
138 TryAcceptOutcome::Rejected => TryAcceptOutcome::Rejected,
139 }
140 }
141
142 pub fn remove(&mut self, peer: &PeerId) {
144 for per_para in self.per_para.values_mut() {
145 per_para.remove(peer);
146 }
147
148 self.peer_info.remove(peer);
149 }
150
151 pub fn declared(&mut self, peer_id: PeerId, para_id: ParaId) -> DeclarationOutcome {
153 let mut outcome = DeclarationOutcome::Rejected;
154
155 let Some(peer_info) = self.peer_info.get_mut(&peer_id) else { return outcome };
156
157 match &peer_info.state {
158 PeerState::Connected => {
159 for (para, per_para) in self.per_para.iter_mut() {
160 if para == ¶_id && per_para.contains(&peer_id) {
161 outcome = DeclarationOutcome::Accepted;
162 } else {
163 per_para.remove(&peer_id);
165 }
166 }
167 },
168 PeerState::Collating(old_para_id) if old_para_id == ¶_id => {
169 outcome = DeclarationOutcome::Accepted;
171 },
172 PeerState::Collating(old_para_id) => {
173 if let Some(old_per_para) = self.per_para.get_mut(&old_para_id) {
174 old_per_para.remove(&peer_id);
175 }
176 if let Some(per_para) = self.per_para.get(¶_id) {
177 outcome = DeclarationOutcome::Switched(*old_para_id);
178 }
179 },
180 }
181
182 if matches!(outcome, DeclarationOutcome::Accepted) {
183 peer_info.state = PeerState::Collating(para_id);
184 } else {
185 self.peer_info.remove(&peer_id);
186 }
187
188 outcome
189 }
190
191 pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
193 self.peer_info.get(&peer_id)
194 }
195
196 pub fn peer_score(&self, peer_id: &PeerId, para_id: &ParaId) -> Option<Score> {
197 self.per_para.get(para_id).and_then(|per_para| per_para.get_score(peer_id))
198 }
199
200 pub fn consume(self) -> (HashMap<PeerId, PeerInfo>, BTreeMap<ParaId, PerPara>) {
202 (self.peer_info, self.per_para)
203 }
204
205 pub fn scheduled_paras<'a>(&'a self) -> impl Iterator<Item = &'a ParaId> + 'a {
207 self.per_para.keys()
208 }
209
210 fn contains(&self, peer_id: &PeerId) -> bool {
211 self.peer_info.contains_key(peer_id)
212 }
213}
214
215#[derive(Clone)]
218pub struct PerPara {
219 limit: NonZeroU16,
221 sorted_scores: BTreeSet<PeerScoreEntry>,
224 per_peer_score: HashMap<PeerId, Score>,
227}
228
229impl PerPara {
230 pub fn get_score(&self, peer_id: &PeerId) -> Option<Score> {
232 self.per_peer_score.get(peer_id).map(|s| *s)
233 }
234
235 fn new(limit: NonZeroU16) -> Self {
236 Self { limit, sorted_scores: BTreeSet::default(), per_peer_score: HashMap::default() }
237 }
238
239 fn try_accept(&mut self, peer_id: PeerId, score: Score) -> TryAcceptOutcome {
240 if self.sorted_scores.len() < (u16::from(self.limit) as usize) {
243 self.sorted_scores.insert(PeerScoreEntry { peer_id, score });
244 self.per_peer_score.insert(peer_id, score);
245 TryAcceptOutcome::Added
246 } else {
247 let Some(min_score) = self.sorted_scores.first() else {
248 return TryAcceptOutcome::Rejected
250 };
251
252 if min_score.score >= score {
253 TryAcceptOutcome::Rejected
254 } else {
255 let Some(replaced) = self.sorted_scores.pop_first() else {
256 return TryAcceptOutcome::Rejected
259 };
260 self.per_peer_score.remove(&replaced.peer_id);
261
262 self.sorted_scores.insert(PeerScoreEntry { peer_id, score });
263 self.per_peer_score.insert(peer_id, score);
264 TryAcceptOutcome::Replaced([replaced.peer_id].into_iter().collect())
265 }
266 }
267 }
268
269 fn update_reputation(&mut self, update: ReputationUpdate) {
270 let Some(score) = self.per_peer_score.get_mut(&update.peer_id) else {
271 return
273 };
274
275 self.sorted_scores
276 .remove(&PeerScoreEntry { peer_id: update.peer_id, score: *score });
277
278 match update.kind {
279 ReputationUpdateKind::Bump => score.saturating_add(update.value.into()),
280 ReputationUpdateKind::Slash => score.saturating_sub(update.value.into()),
281 };
282
283 self.sorted_scores
284 .insert(PeerScoreEntry { peer_id: update.peer_id, score: *score });
285 }
286
287 fn remove(&mut self, peer_id: &PeerId) {
288 let Some(score) = self.per_peer_score.remove(&peer_id) else { return };
289
290 self.sorted_scores.remove(&PeerScoreEntry { peer_id: *peer_id, score });
291 }
292
293 fn contains(&self, peer_id: &PeerId) -> bool {
294 self.per_peer_score.contains_key(peer_id)
295 }
296}
297
298#[derive(PartialEq, Eq, Clone)]
299struct PeerScoreEntry {
300 peer_id: PeerId,
301 score: Score,
302}
303
304impl Ord for PeerScoreEntry {
305 fn cmp(&self, other: &Self) -> Ordering {
306 self.score.cmp(&other.score)
307 }
308}
309
310impl PartialOrd for PeerScoreEntry {
311 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
312 Some(self.cmp(other))
313 }
314}
315
316#[cfg(test)]
317mod tests {
318 use super::*;
319
320 use polkadot_node_network_protocol::peer_set::CollationVersion;
321
322 fn default_connected_state() -> PeerInfo {
323 PeerInfo { version: CollationVersion::V2, state: PeerState::Connected }
324 }
325
326 #[test]
328 fn test_connected_peers_constructor() {
329 let connected = ConnectedPeers::new(
331 BTreeSet::new(),
332 NonZeroU16::new(1000).unwrap(),
333 NonZeroU16::new(50).unwrap(),
334 );
335 assert!(connected.per_para.is_empty());
336 assert!(connected.peer_info.is_empty());
337
338 let connected = ConnectedPeers::new(
341 (0..5).map(ParaId::from).collect(),
342 NonZeroU16::new(50).unwrap(),
343 NonZeroU16::new(3).unwrap(),
344 );
345 assert_eq!(connected.per_para.len(), 5);
346 assert!(connected.peer_info.is_empty());
347 for (para_id, per_para) in connected.per_para {
348 let para_id = u32::from(para_id);
349 assert!(para_id < 5);
350 assert_eq!(per_para.limit.get(), 3);
351 }
352
353 let connected = ConnectedPeers::new(
354 (0..5).map(ParaId::from).collect(),
355 NonZeroU16::new(50).unwrap(),
356 NonZeroU16::new(15).unwrap(),
357 );
358 assert_eq!(connected.per_para.len(), 5);
359 assert!(connected.peer_info.is_empty());
360 for (para_id, per_para) in connected.per_para {
361 let para_id = u32::from(para_id);
362 assert!(para_id < 5);
363 assert_eq!(per_para.limit.get(), 10);
364 }
365 }
366
367 #[tokio::test]
368 async fn test_try_accept_below_limit() {
370 let mut connected = ConnectedPeers::new(
371 (0..5).map(ParaId::from).collect(),
372 NonZeroU16::new(50).unwrap(),
373 NonZeroU16::new(15).unwrap(),
374 );
375 let first_peer = PeerId::random();
376
377 assert_eq!(
379 connected
380 .try_accept(
381 |_, _| async { Score::default() },
382 first_peer,
383 default_connected_state()
384 )
385 .await,
386 TryAcceptOutcome::Added
387 );
388 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
389 for per_para in connected.per_para.values() {
390 assert!(per_para.contains(&first_peer));
391 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
392 }
393
394 assert_eq!(
396 connected
397 .try_accept(
398 |_, _| async { Score::default() },
399 first_peer,
400 default_connected_state()
401 )
402 .await,
403 TryAcceptOutcome::Added
404 );
405 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
406 for per_para in connected.per_para.values() {
407 assert!(per_para.contains(&first_peer));
408 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
409 }
410
411 let second_peer = PeerId::random();
413 assert_eq!(
414 connected
415 .try_accept(
416 |peer_id, para_id| async move {
417 if peer_id == second_peer && para_id == ParaId::from(100) {
418 Score::new(10).unwrap()
419 } else {
420 Score::default()
421 }
422 },
423 second_peer,
424 default_connected_state()
425 )
426 .await,
427 TryAcceptOutcome::Added
428 );
429 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
430 assert_eq!(connected.peer_info(&second_peer).unwrap(), &default_connected_state());
431
432 for per_para in connected.per_para.values() {
433 assert!(per_para.contains(&second_peer));
434 assert_eq!(per_para.get_score(&second_peer).unwrap(), Score::default());
435 }
436
437 let third_peer = PeerId::random();
440 let third_peer_para_id = ParaId::from(3);
441 assert_eq!(
442 connected
443 .try_accept(
444 |peer_id, para_id| async move {
445 if peer_id == third_peer && para_id == third_peer_para_id {
446 Score::new(10).unwrap()
447 } else {
448 Score::default()
449 }
450 },
451 third_peer,
452 default_connected_state()
453 )
454 .await,
455 TryAcceptOutcome::Added
456 );
457 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
458 assert_eq!(connected.peer_info(&second_peer).unwrap(), &default_connected_state());
459 assert_eq!(connected.peer_info(&third_peer).unwrap(), &default_connected_state());
460
461 for (para_id, per_para) in connected.per_para.iter() {
462 assert!(per_para.contains(&third_peer));
463
464 if para_id == &third_peer_para_id {
465 assert_eq!(per_para.get_score(&third_peer).unwrap(), Score::new(10).unwrap());
466 } else {
467 assert_eq!(per_para.get_score(&third_peer).unwrap(), Score::default());
468 }
469 }
470
471 let rejected_peer = PeerId::random();
474 assert_eq!(
475 connected
476 .try_accept(
477 |peer_id, para_id| async move {
478 if peer_id == rejected_peer {
479 Score::new(10).unwrap()
480 } else {
481 Score::default()
482 }
483 },
484 rejected_peer,
485 PeerInfo {
486 version: CollationVersion::V2,
487 state: PeerState::Collating(ParaId::from(100))
488 }
489 )
490 .await,
491 TryAcceptOutcome::Rejected
492 );
493 assert_eq!(connected.peer_info(&rejected_peer), None);
494 for (para_id, per_para) in connected.per_para.iter() {
495 assert!(!per_para.contains(&rejected_peer));
496 assert_eq!(per_para.get_score(&rejected_peer), None);
497 }
498
499 let fourth_peer = PeerId::random();
501 let fourth_peer_para_id = ParaId::from(4);
502
503 assert_eq!(
504 connected
505 .try_accept(
506 |peer_id, para_id| async move {
507 if peer_id == fourth_peer && para_id == fourth_peer_para_id {
508 Score::new(10).unwrap()
509 } else {
510 Score::default()
511 }
512 },
513 fourth_peer,
514 PeerInfo {
515 version: CollationVersion::V2,
516 state: PeerState::Collating(fourth_peer_para_id)
517 }
518 )
519 .await,
520 TryAcceptOutcome::Added
521 );
522 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
523 assert_eq!(connected.peer_info(&second_peer).unwrap(), &default_connected_state());
524 assert_eq!(connected.peer_info(&third_peer).unwrap(), &default_connected_state());
525 assert_eq!(
526 connected.peer_info(&fourth_peer).unwrap(),
527 &PeerInfo {
528 version: CollationVersion::V2,
529 state: PeerState::Collating(fourth_peer_para_id)
530 }
531 );
532
533 for (para_id, per_para) in connected.per_para.iter() {
534 if para_id == &fourth_peer_para_id {
535 assert!(per_para.contains(&fourth_peer));
536 assert_eq!(per_para.get_score(&fourth_peer).unwrap(), Score::new(10).unwrap());
537 } else {
538 assert!(!per_para.contains(&fourth_peer));
539 assert_eq!(per_para.get_score(&fourth_peer), None);
540 }
541 }
542 }
543
544 #[tokio::test]
545 async fn test_try_accept_at_limit() {
547 let mut connected = ConnectedPeers::new(
549 (1..=2).map(ParaId::from).collect(),
550 NonZeroU16::new(50).unwrap(),
551 NonZeroU16::new(2).unwrap(),
552 );
553 let first_peer = PeerId::random();
554 let second_peer = PeerId::random();
555 let third_peer = PeerId::random();
556 let para_1 = ParaId::from(1);
557 let para_2 = ParaId::from(2);
558
559 let new_peer = PeerId::random();
560
561 let rep_query_fn = |peer_id, para_id| async move {
568 match (peer_id, para_id) {
569 (peer_id, para_id) if peer_id == first_peer => Score::new(10).unwrap(),
570 (peer_id, para_id) if peer_id == second_peer && para_id == para_1 =>
571 Score::new(20).unwrap(),
572 (peer_id, para_id) if peer_id == third_peer && para_id == para_2 =>
573 Score::new(20).unwrap(),
574 (peer_id, para_id) if peer_id == new_peer && para_id == para_1 =>
575 Score::new(5).unwrap(),
576
577 (_, _) => Score::default(),
578 }
579 };
580
581 assert_eq!(
582 connected.try_accept(rep_query_fn, first_peer, default_connected_state()).await,
583 TryAcceptOutcome::Added
584 );
585 assert_eq!(
586 connected
587 .try_accept(
588 rep_query_fn,
589 second_peer,
590 PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) }
591 )
592 .await,
593 TryAcceptOutcome::Added
594 );
595 assert_eq!(
596 connected
597 .try_accept(
598 rep_query_fn,
599 third_peer,
600 PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_2) }
601 )
602 .await,
603 TryAcceptOutcome::Added
604 );
605 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
606 assert_eq!(
607 connected.peer_info(&second_peer).unwrap(),
608 &PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) }
609 );
610 assert_eq!(
611 connected.peer_info(&third_peer).unwrap(),
612 &PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_2) }
613 );
614
615 assert_eq!(connected.per_para.len(), 2);
618 let per_para_1 = connected.per_para.get(¶_1).unwrap();
619 assert_eq!(per_para_1.per_peer_score.len(), 2);
620 assert_eq!(per_para_1.sorted_scores.len(), 2);
621
622 assert_eq!(connected.peer_score(&first_peer, ¶_1).unwrap(), Score::new(10).unwrap());
623 assert_eq!(connected.peer_score(&second_peer, ¶_1).unwrap(), Score::new(20).unwrap());
624 assert_eq!(connected.peer_score(&first_peer, ¶_2).unwrap(), Score::new(10).unwrap());
625 assert_eq!(connected.peer_score(&third_peer, ¶_2).unwrap(), Score::new(20).unwrap());
626 assert_eq!(connected.peer_score(&second_peer, ¶_2), None);
627 assert_eq!(connected.peer_score(&new_peer, ¶_1), None);
628 assert_eq!(connected.peer_score(&new_peer, ¶_2), None);
629
630 assert_eq!(
633 connected.try_accept(rep_query_fn, new_peer, default_connected_state()).await,
634 TryAcceptOutcome::Rejected
635 );
636 assert_eq!(
637 connected
638 .try_accept(
639 rep_query_fn,
640 new_peer,
641 PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) }
642 )
643 .await,
644 TryAcceptOutcome::Rejected
645 );
646 assert_eq!(
647 connected
648 .try_accept(
649 rep_query_fn,
650 new_peer,
651 PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_2) }
652 )
653 .await,
654 TryAcceptOutcome::Rejected
655 );
656 assert_eq!(
657 connected
658 .try_accept(
659 rep_query_fn,
660 new_peer,
661 PeerInfo {
662 version: CollationVersion::V2,
663 state: PeerState::Collating(ParaId::from(100))
664 }
665 )
666 .await,
667 TryAcceptOutcome::Rejected
668 );
669
670 {
674 let mut connected = connected.clone();
675 let rep_query_fn = |peer_id, para_id| async move {
676 match (peer_id, para_id) {
677 (peer_id, para_id) if peer_id == new_peer => Score::new(30).unwrap(),
678 (_, _) => Score::default(),
679 }
680 };
681 assert_eq!(
682 connected.try_accept(rep_query_fn, new_peer, default_connected_state()).await,
683 TryAcceptOutcome::Replaced([first_peer].into_iter().collect())
684 );
685 assert_eq!(connected.peer_info(&new_peer).unwrap(), &default_connected_state());
686 assert_eq!(connected.peer_info(&first_peer), None);
687
688 assert_eq!(connected.peer_score(&first_peer, ¶_1), None);
689 assert_eq!(
690 connected.peer_score(&second_peer, ¶_1).unwrap(),
691 Score::new(20).unwrap()
692 );
693 assert_eq!(connected.peer_score(&first_peer, ¶_2), None);
694 assert_eq!(
695 connected.peer_score(&third_peer, ¶_2).unwrap(),
696 Score::new(20).unwrap()
697 );
698 assert_eq!(connected.peer_score(&third_peer, ¶_1), None);
699 assert_eq!(connected.peer_score(&second_peer, ¶_2), None);
700 assert_eq!(connected.peer_score(&new_peer, ¶_1).unwrap(), Score::new(30).unwrap());
701 assert_eq!(connected.peer_score(&new_peer, ¶_2).unwrap(), Score::new(30).unwrap());
702 }
703
704 {
707 let mut connected = ConnectedPeers::new(
708 (1..=2).map(ParaId::from).collect(),
709 NonZeroU16::new(50).unwrap(),
710 NonZeroU16::new(2).unwrap(),
711 );
712 let fourth_peer = PeerId::random();
713
714 let rep_query_fn = |peer_id, para_id| async move {
715 match (peer_id, para_id) {
716 (peer_id, para_id) if peer_id == first_peer => Score::new(10).unwrap(),
717 (peer_id, para_id) if peer_id == second_peer && para_id == para_1 =>
718 Score::new(20).unwrap(),
719 (peer_id, para_id) if peer_id == third_peer && para_id == para_2 =>
720 Score::new(20).unwrap(),
721 (peer_id, para_id) if peer_id == fourth_peer && para_id == para_2 =>
722 Score::new(15).unwrap(),
723 (peer_id, para_id) if peer_id == new_peer => Score::new(30).unwrap(),
724
725 (_, _) => Score::default(),
726 }
727 };
728
729 assert_eq!(
730 connected.try_accept(rep_query_fn, first_peer, default_connected_state()).await,
731 TryAcceptOutcome::Added
732 );
733 assert_eq!(
734 connected
735 .try_accept(
736 rep_query_fn,
737 second_peer,
738 PeerInfo {
739 version: CollationVersion::V2,
740 state: PeerState::Collating(para_1)
741 }
742 )
743 .await,
744 TryAcceptOutcome::Added
745 );
746 assert_eq!(
747 connected
748 .try_accept(
749 rep_query_fn,
750 third_peer,
751 PeerInfo {
752 version: CollationVersion::V2,
753 state: PeerState::Collating(para_2)
754 }
755 )
756 .await,
757 TryAcceptOutcome::Added
758 );
759 assert_eq!(
760 connected
761 .try_accept(
762 rep_query_fn,
763 fourth_peer,
764 PeerInfo {
765 version: CollationVersion::V2,
766 state: PeerState::Collating(para_2)
767 }
768 )
769 .await,
770 TryAcceptOutcome::Replaced(HashSet::new())
771 );
772
773 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
774
775 assert_eq!(
776 connected.try_accept(rep_query_fn, new_peer, default_connected_state()).await,
777 TryAcceptOutcome::Replaced([first_peer, fourth_peer].into_iter().collect())
778 );
779 assert_eq!(connected.peer_info(&first_peer), None);
780 assert_eq!(connected.peer_info(&fourth_peer), None);
781
782 assert_eq!(connected.peer_info(&new_peer).unwrap(), &default_connected_state());
783
784 assert_eq!(connected.peer_score(&first_peer, ¶_1), None);
785 assert_eq!(
786 connected.peer_score(&second_peer, ¶_1).unwrap(),
787 Score::new(20).unwrap()
788 );
789 assert_eq!(connected.peer_score(&third_peer, ¶_1), None);
790 assert_eq!(connected.peer_score(&fourth_peer, ¶_1), None);
791 assert_eq!(connected.peer_score(&new_peer, ¶_1).unwrap(), Score::new(30).unwrap());
792
793 assert_eq!(connected.peer_score(&first_peer, ¶_2), None);
794 assert_eq!(connected.peer_score(&second_peer, ¶_2), None);
795 assert_eq!(
796 connected.peer_score(&third_peer, ¶_2).unwrap(),
797 Score::new(20).unwrap()
798 );
799 assert_eq!(connected.peer_score(&fourth_peer, ¶_2), None);
800 assert_eq!(connected.peer_score(&new_peer, ¶_2).unwrap(), Score::new(30).unwrap());
801 }
802
803 {
808 let mut connected = connected.clone();
809 let rep_query_fn = |peer_id, para_id| async move {
810 match (peer_id, para_id) {
811 (peer_id, para_id) if peer_id == new_peer => Score::new(30).unwrap(),
812 (_, _) => Score::default(),
813 }
814 };
815 assert_eq!(
816 connected
817 .try_accept(
818 rep_query_fn,
819 new_peer,
820 PeerInfo {
821 version: CollationVersion::V2,
822 state: PeerState::Collating(para_1)
823 }
824 )
825 .await,
826 TryAcceptOutcome::Replaced(HashSet::new())
827 );
828 assert_eq!(
829 connected.peer_info(&new_peer).unwrap(),
830 &PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) }
831 );
832 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
833 assert_eq!(connected.peer_score(&first_peer, ¶_1), None);
834 assert_eq!(
835 connected.peer_score(&second_peer, ¶_1).unwrap(),
836 Score::new(20).unwrap()
837 );
838 assert_eq!(
839 connected.peer_score(&first_peer, ¶_2).unwrap(),
840 Score::new(10).unwrap()
841 );
842 assert_eq!(
843 connected.peer_score(&third_peer, ¶_2).unwrap(),
844 Score::new(20).unwrap()
845 );
846 assert_eq!(connected.peer_score(&second_peer, ¶_2), None);
847 assert_eq!(connected.peer_score(&new_peer, ¶_1).unwrap(), Score::new(30).unwrap());
848 assert_eq!(connected.peer_score(&new_peer, ¶_2), None);
849 }
850
851 for peer_info in [
854 default_connected_state(),
855 PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) },
856 ] {
857 let mut connected = ConnectedPeers::new(
858 (1..=2).map(ParaId::from).collect(),
859 NonZeroU16::new(50).unwrap(),
860 NonZeroU16::new(2).unwrap(),
861 );
862
863 let rep_query_fn = |peer_id, para_id| async move {
864 match (peer_id, para_id) {
865 (peer_id, para_id) if peer_id == first_peer => Score::new(10).unwrap(),
866 (peer_id, para_id) if peer_id == second_peer && para_id == para_1 =>
867 Score::new(5).unwrap(),
868 (peer_id, para_id) if peer_id == third_peer && para_id == para_2 =>
869 Score::new(5).unwrap(),
870 (peer_id, para_id) if peer_id == new_peer && para_id == para_1 =>
871 Score::new(8).unwrap(),
872
873 (_, _) => Score::default(),
874 }
875 };
876 assert_eq!(
877 connected.try_accept(rep_query_fn, first_peer, default_connected_state()).await,
878 TryAcceptOutcome::Added
879 );
880 assert_eq!(
881 connected
882 .try_accept(
883 rep_query_fn,
884 second_peer,
885 PeerInfo {
886 version: CollationVersion::V2,
887 state: PeerState::Collating(para_1)
888 }
889 )
890 .await,
891 TryAcceptOutcome::Added
892 );
893 assert_eq!(
894 connected
895 .try_accept(
896 rep_query_fn,
897 third_peer,
898 PeerInfo {
899 version: CollationVersion::V2,
900 state: PeerState::Collating(para_2)
901 }
902 )
903 .await,
904 TryAcceptOutcome::Added
905 );
906
907 assert_eq!(
908 connected.try_accept(rep_query_fn, new_peer, peer_info.clone()).await,
909 TryAcceptOutcome::Replaced([second_peer].into_iter().collect())
910 );
911 assert_eq!(connected.peer_info(&new_peer).unwrap(), &peer_info);
912
913 assert_eq!(
914 connected.peer_score(&first_peer, ¶_1).unwrap(),
915 Score::new(10).unwrap()
916 );
917 assert_eq!(connected.peer_score(&second_peer, ¶_1), None);
918 assert_eq!(
919 connected.peer_score(&first_peer, ¶_2).unwrap(),
920 Score::new(10).unwrap()
921 );
922 assert_eq!(connected.peer_score(&third_peer, ¶_2).unwrap(), Score::new(5).unwrap());
923 assert_eq!(connected.peer_score(&second_peer, ¶_2), None);
924 assert_eq!(connected.peer_score(&new_peer, ¶_1).unwrap(), Score::new(8).unwrap());
925 assert_eq!(connected.peer_score(&new_peer, ¶_2), None);
926 }
927 }
928
929 #[tokio::test]
930 async fn test_declare() {
932 let mut connected = ConnectedPeers::new(
933 (0..5).map(ParaId::from).collect(),
934 NonZeroU16::new(50).unwrap(),
935 NonZeroU16::new(15).unwrap(),
936 );
937 let first_peer = PeerId::random();
938
939 assert_eq!(connected.peer_info(&first_peer), None);
940
941 assert_eq!(connected.declared(first_peer, ParaId::from(1)), DeclarationOutcome::Rejected);
943
944 assert_eq!(connected.peer_info(&first_peer), None);
945
946 assert_eq!(
947 connected
948 .try_accept(
949 |_, _| async { Score::default() },
950 first_peer,
951 default_connected_state()
952 )
953 .await,
954 TryAcceptOutcome::Added
955 );
956 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
957 for per_para in connected.per_para.values() {
958 assert!(per_para.contains(&first_peer));
959 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
960 }
961
962 {
964 let mut connected = connected.clone();
965 assert_eq!(
966 connected.declared(first_peer, ParaId::from(100)),
967 DeclarationOutcome::Rejected
968 );
969
970 assert_eq!(connected.peer_info(&first_peer), None);
971
972 for (para_id, per_para) in connected.per_para.iter() {
973 assert!(!per_para.contains(&first_peer));
974 assert_eq!(per_para.get_score(&first_peer), None);
975 }
976 }
977
978 assert_eq!(connected.declared(first_peer, ParaId::from(1)), DeclarationOutcome::Accepted);
980
981 assert_eq!(
982 connected.peer_info(&first_peer).unwrap(),
983 &PeerInfo {
984 version: CollationVersion::V2,
985 state: PeerState::Collating(ParaId::from(1))
986 }
987 );
988
989 for (para_id, per_para) in connected.per_para.iter() {
990 if para_id == &ParaId::from(1) {
991 assert!(per_para.contains(&first_peer));
992 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
993 } else {
994 assert!(!per_para.contains(&first_peer));
995 assert_eq!(per_para.get_score(&first_peer), None);
996 }
997 }
998
999 assert_eq!(connected.declared(first_peer, ParaId::from(1)), DeclarationOutcome::Accepted);
1001 assert_eq!(
1002 connected.peer_info(&first_peer).unwrap(),
1003 &PeerInfo {
1004 version: CollationVersion::V2,
1005 state: PeerState::Collating(ParaId::from(1))
1006 }
1007 );
1008
1009 {
1011 let mut connected = connected.clone();
1012 assert_eq!(
1013 connected.declared(first_peer, ParaId::from(100)),
1014 DeclarationOutcome::Rejected
1015 );
1016 assert_eq!(connected.peer_info(&first_peer), None);
1017
1018 for (para_id, per_para) in connected.per_para.iter() {
1019 assert!(!per_para.contains(&first_peer));
1020 assert_eq!(per_para.get_score(&first_peer), None);
1021 }
1022 }
1023
1024 assert_eq!(
1027 connected.peer_info(&first_peer).unwrap(),
1028 &PeerInfo {
1029 version: CollationVersion::V2,
1030 state: PeerState::Collating(ParaId::from(1))
1031 }
1032 );
1033 assert_eq!(
1034 connected.declared(first_peer, ParaId::from(2)),
1035 DeclarationOutcome::Switched(ParaId::from(1))
1036 );
1037 assert_eq!(connected.peer_info(&first_peer), None);
1038
1039 for (para_id, per_para) in connected.per_para.iter() {
1040 assert!(!per_para.contains(&first_peer));
1041 assert_eq!(per_para.get_score(&first_peer), None);
1042 }
1043 }
1044
1045 #[tokio::test]
1046 async fn test_remove() {
1048 let mut connected = ConnectedPeers::new(
1049 (0..5).map(ParaId::from).collect(),
1050 NonZeroU16::new(50).unwrap(),
1051 NonZeroU16::new(15).unwrap(),
1052 );
1053 let first_peer = PeerId::random();
1054
1055 assert_eq!(connected.peer_info(&first_peer), None);
1056
1057 connected.remove(&first_peer);
1059
1060 assert_eq!(connected.peer_info(&first_peer), None);
1061
1062 for per_para in connected.per_para.values() {
1063 assert!(!per_para.contains(&first_peer));
1064 assert_eq!(per_para.get_score(&first_peer), None);
1065 }
1066
1067 {
1069 assert_eq!(
1070 connected
1071 .try_accept(
1072 |_, _| async { Score::default() },
1073 first_peer,
1074 default_connected_state()
1075 )
1076 .await,
1077 TryAcceptOutcome::Added
1078 );
1079 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
1080 for per_para in connected.per_para.values() {
1081 assert!(per_para.contains(&first_peer));
1082 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
1083 }
1084
1085 connected.remove(&first_peer);
1086
1087 assert_eq!(connected.peer_info(&first_peer), None);
1088
1089 for per_para in connected.per_para.values() {
1090 assert!(!per_para.contains(&first_peer));
1091 assert_eq!(per_para.get_score(&first_peer), None);
1092 }
1093 }
1094
1095 {
1097 assert_eq!(
1098 connected
1099 .try_accept(
1100 |_, _| async { Score::default() },
1101 first_peer,
1102 PeerInfo {
1103 version: CollationVersion::V2,
1104 state: PeerState::Collating(ParaId::from(1))
1105 }
1106 )
1107 .await,
1108 TryAcceptOutcome::Added
1109 );
1110 assert_eq!(
1111 connected.peer_info(&first_peer).unwrap(),
1112 &PeerInfo {
1113 version: CollationVersion::V2,
1114 state: PeerState::Collating(ParaId::from(1))
1115 }
1116 );
1117 for (para_id, per_para) in connected.per_para.iter() {
1118 if para_id == &ParaId::from(1) {
1119 assert!(per_para.contains(&first_peer));
1120 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
1121 } else {
1122 assert!(!per_para.contains(&first_peer));
1123 assert_eq!(per_para.get_score(&first_peer), None);
1124 }
1125 }
1126
1127 connected.remove(&first_peer);
1128
1129 assert_eq!(connected.peer_info(&first_peer), None);
1130
1131 for per_para in connected.per_para.values() {
1132 assert!(!per_para.contains(&first_peer));
1133 assert_eq!(per_para.get_score(&first_peer), None);
1134 }
1135 }
1136 }
1137
1138 #[tokio::test]
1139 async fn test_update_reputation() {
1141 let mut connected = ConnectedPeers::new(
1142 (0..6).map(ParaId::from).collect(),
1143 NonZeroU16::new(50).unwrap(),
1144 NonZeroU16::new(15).unwrap(),
1145 );
1146 let first_peer = PeerId::random();
1147
1148 assert_eq!(connected.peer_info(&first_peer), None);
1149 for per_para in connected.per_para.values() {
1150 assert!(!per_para.contains(&first_peer));
1151 assert_eq!(per_para.get_score(&first_peer), None);
1152 }
1153
1154 connected.update_reputation(ReputationUpdate {
1156 peer_id: first_peer,
1157 para_id: ParaId::from(1),
1158 value: Score::new(100).unwrap(),
1159 kind: ReputationUpdateKind::Slash,
1160 });
1161
1162 assert_eq!(connected.peer_info(&first_peer), None);
1163 for per_para in connected.per_para.values() {
1164 assert!(!per_para.contains(&first_peer));
1165 assert_eq!(per_para.get_score(&first_peer), None);
1166 }
1167
1168 assert_eq!(
1170 connected
1171 .try_accept(
1172 |peer_id, _| async move {
1173 if peer_id == first_peer {
1174 Score::new(10).unwrap()
1175 } else {
1176 Score::default()
1177 }
1178 },
1179 first_peer,
1180 default_connected_state()
1181 )
1182 .await,
1183 TryAcceptOutcome::Added
1184 );
1185 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
1186 for per_para in connected.per_para.values() {
1187 assert!(per_para.contains(&first_peer));
1188 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(10).unwrap());
1189 }
1190
1191 connected.update_reputation(ReputationUpdate {
1192 peer_id: first_peer,
1193 para_id: ParaId::from(100),
1194 value: Score::new(100).unwrap(),
1195 kind: ReputationUpdateKind::Slash,
1196 });
1197 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
1198 for per_para in connected.per_para.values() {
1199 assert!(per_para.contains(&first_peer));
1200 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(10).unwrap());
1201 }
1202
1203 connected.update_reputation(ReputationUpdate {
1205 peer_id: first_peer,
1206 para_id: ParaId::from(1),
1207 value: Score::new(100).unwrap(),
1208 kind: ReputationUpdateKind::Slash,
1209 });
1210 assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
1211 for (para_id, per_para) in connected.per_para.iter() {
1212 assert!(per_para.contains(&first_peer));
1213
1214 if para_id == &ParaId::from(1) {
1215 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(0).unwrap());
1216 } else {
1217 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(10).unwrap());
1218 }
1219 }
1220
1221 assert_eq!(connected.declared(first_peer, ParaId::from(5)), DeclarationOutcome::Accepted);
1224 assert_eq!(
1225 connected.peer_info(&first_peer).unwrap(),
1226 &PeerInfo {
1227 version: CollationVersion::V2,
1228 state: PeerState::Collating(ParaId::from(5))
1229 }
1230 );
1231
1232 connected.update_reputation(ReputationUpdate {
1233 peer_id: first_peer,
1234 para_id: ParaId::from(1),
1235 value: Score::new(100).unwrap(),
1236 kind: ReputationUpdateKind::Bump,
1237 });
1238 assert_eq!(
1239 connected.peer_info(&first_peer).unwrap(),
1240 &PeerInfo {
1241 version: CollationVersion::V2,
1242 state: PeerState::Collating(ParaId::from(5))
1243 }
1244 );
1245
1246 for (para_id, per_para) in connected.per_para.iter() {
1247 if para_id == &ParaId::from(5) {
1248 assert!(per_para.contains(&first_peer));
1249 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(10).unwrap());
1250 } else {
1251 assert!(!per_para.contains(&first_peer));
1252 assert_eq!(per_para.get_score(&first_peer), None);
1253 }
1254 }
1255
1256 connected.update_reputation(ReputationUpdate {
1257 peer_id: first_peer,
1258 para_id: ParaId::from(5),
1259 value: Score::new(50).unwrap(),
1260 kind: ReputationUpdateKind::Bump,
1261 });
1262 assert_eq!(
1263 connected.peer_info(&first_peer).unwrap(),
1264 &PeerInfo {
1265 version: CollationVersion::V2,
1266 state: PeerState::Collating(ParaId::from(5))
1267 }
1268 );
1269
1270 for (para_id, per_para) in connected.per_para.iter() {
1271 if para_id == &ParaId::from(5) {
1272 assert!(per_para.contains(&first_peer));
1273 assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(60).unwrap());
1274 } else {
1275 assert!(!per_para.contains(&first_peer));
1276 assert_eq!(per_para.get_score(&first_peer), None);
1277 }
1278 }
1279 }
1280}