referrerpolicy=no-referrer-when-downgrade

polkadot_collator_protocol/validator_side_experimental/peer_manager/
connected.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::validator_side_experimental::{
18	common::{PeerInfo, PeerState, Score},
19	peer_manager::{DeclarationOutcome, ReputationUpdate, ReputationUpdateKind, TryAcceptOutcome},
20};
21use polkadot_node_network_protocol::PeerId;
22use polkadot_primitives::Id as ParaId;
23use std::{
24	cmp::Ordering,
25	collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26	future::Future,
27	num::NonZeroU16,
28};
29
30/// Keeps track of connected peers, together with relevant info such as their procotol versions,
31/// declared paraids and reputations.
32#[derive(Clone)]
33pub struct ConnectedPeers {
34	per_para: BTreeMap<ParaId, PerPara>,
35	peer_info: HashMap<PeerId, PeerInfo>,
36}
37
38impl ConnectedPeers {
39	/// Create a new ConnectedPeers object.
40	pub fn new(
41		scheduled_paras: BTreeSet<ParaId>,
42		overall_limit: NonZeroU16,
43		per_para_limit: NonZeroU16,
44	) -> Self {
45		debug_assert!(per_para_limit <= overall_limit);
46
47		let limit = std::cmp::min(
48			(u16::from(overall_limit))
49				.checked_div(
50					scheduled_paras
51						.len()
52						.try_into()
53						.expect("Nr of scheduled paras on a core should always fit in a u16"),
54				)
55				.unwrap_or(0),
56			u16::from(per_para_limit),
57		);
58
59		let mut per_para = BTreeMap::new();
60
61		if limit != 0 {
62			for para_id in scheduled_paras {
63				per_para.insert(
64					para_id,
65					PerPara::new(NonZeroU16::new(limit).expect("Just checked that limit is not 0")),
66				);
67			}
68		}
69
70		Self { per_para, peer_info: Default::default() }
71	}
72
73	/// Update the reputation of a peer for a specific paraid. This needs to also be persisted to
74	/// the reputation database, but changes are duplicated to this in-memory store of connected
75	/// peers.
76	pub fn update_reputation(&mut self, update: ReputationUpdate) {
77		let Some(per_para) = self.per_para.get_mut(&update.para_id) else { return };
78
79		per_para.update_reputation(update);
80	}
81
82	/// Try accepting a new peer. The connection must have already been established, but here we
83	/// decide whether to keep it or not. We don't know for which paraid the peer will collate.
84	pub async fn try_accept<
85		RepQueryFn: Fn(PeerId, ParaId) -> QueryFut,
86		QueryFut: Future<Output = Score>,
87	>(
88		&mut self,
89		reputation_query_fn: RepQueryFn,
90		peer_id: PeerId,
91		peer_info: PeerInfo,
92	) -> TryAcceptOutcome {
93		if self.contains(&peer_id) {
94			return TryAcceptOutcome::Added
95		}
96
97		let mut outcome = TryAcceptOutcome::Rejected;
98
99		match peer_info.state {
100			PeerState::Collating(para_id) => {
101				let past_reputation = reputation_query_fn(peer_id, para_id).await;
102				if let Some(per_para) = self.per_para.get_mut(&para_id) {
103					let res = per_para.try_accept(peer_id, past_reputation);
104					outcome = outcome.combine(res);
105				}
106			},
107			PeerState::Connected =>
108				for (para_id, per_para) in self.per_para.iter_mut() {
109					let past_reputation = reputation_query_fn(peer_id, *para_id).await;
110					let res = per_para.try_accept(peer_id, past_reputation);
111					outcome = outcome.combine(res);
112				},
113		}
114
115		match outcome {
116			TryAcceptOutcome::Replaced(mut replaced) => {
117				self.peer_info.insert(peer_id, peer_info);
118
119				// Even if this peer took the place of some other peers, these replaced peers may
120				// still have connection slots for other paras. Only remove them if they don't.
121				replaced.retain(|replaced_peer| {
122					let disconnect =
123						!self.per_para.values().any(|per_para| per_para.contains(&replaced_peer));
124
125					if disconnect {
126						self.peer_info.remove(replaced_peer);
127					}
128
129					disconnect
130				});
131
132				TryAcceptOutcome::Replaced(replaced)
133			},
134			TryAcceptOutcome::Added => {
135				self.peer_info.insert(peer_id, peer_info);
136				TryAcceptOutcome::Added
137			},
138			TryAcceptOutcome::Rejected => TryAcceptOutcome::Rejected,
139		}
140	}
141
142	/// Remove the peer. Should be called when we see a peer being disconnected.
143	pub fn remove(&mut self, peer: &PeerId) {
144		for per_para in self.per_para.values_mut() {
145			per_para.remove(peer);
146		}
147
148		self.peer_info.remove(peer);
149	}
150
151	/// Process a peer's declaration of intention to collate for this paraid.
152	pub fn declared(&mut self, peer_id: PeerId, para_id: ParaId) -> DeclarationOutcome {
153		let mut outcome = DeclarationOutcome::Rejected;
154
155		let Some(peer_info) = self.peer_info.get_mut(&peer_id) else { return outcome };
156
157		match &peer_info.state {
158			PeerState::Connected => {
159				for (para, per_para) in self.per_para.iter_mut() {
160					if para == &para_id && per_para.contains(&peer_id) {
161						outcome = DeclarationOutcome::Accepted;
162					} else {
163						// We remove the reserved slots from all other paras.
164						per_para.remove(&peer_id);
165					}
166				}
167			},
168			PeerState::Collating(old_para_id) if old_para_id == &para_id => {
169				// Redundant, already collating for this para.
170				outcome = DeclarationOutcome::Accepted;
171			},
172			PeerState::Collating(old_para_id) => {
173				if let Some(old_per_para) = self.per_para.get_mut(&old_para_id) {
174					old_per_para.remove(&peer_id);
175				}
176				if let Some(per_para) = self.per_para.get(&para_id) {
177					outcome = DeclarationOutcome::Switched(*old_para_id);
178				}
179			},
180		}
181
182		if matches!(outcome, DeclarationOutcome::Accepted) {
183			peer_info.state = PeerState::Collating(para_id);
184		} else {
185			self.peer_info.remove(&peer_id);
186		}
187
188		outcome
189	}
190
191	/// Get a reference to the peer's info, if connected.
192	pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
193		self.peer_info.get(&peer_id)
194	}
195
196	pub fn peer_score(&self, peer_id: &PeerId, para_id: &ParaId) -> Option<Score> {
197		self.per_para.get(para_id).and_then(|per_para| per_para.get_score(peer_id))
198	}
199
200	/// Consume self and return the relevant information for building the next instance.
201	pub fn consume(self) -> (HashMap<PeerId, PeerInfo>, BTreeMap<ParaId, PerPara>) {
202		(self.peer_info, self.per_para)
203	}
204
205	/// Return an iterator over the scheduled paraids.
206	pub fn scheduled_paras<'a>(&'a self) -> impl Iterator<Item = &'a ParaId> + 'a {
207		self.per_para.keys()
208	}
209
210	fn contains(&self, peer_id: &PeerId) -> bool {
211		self.peer_info.contains_key(peer_id)
212	}
213}
214
215/// Per-para connected peers store. Acts as a handy in-memory cache of connected peer scores for a
216/// specific paraid.
217#[derive(Clone)]
218pub struct PerPara {
219	// Don't accept more than this number of connected peers for this para.
220	limit: NonZeroU16,
221	// A min-heap would be more efficient for getting the min (constant) but modifying the score
222	// would be linear, so use a BST which achieves logarithmic performance for all ops.
223	sorted_scores: BTreeSet<PeerScoreEntry>,
224	// This is needed so that we can quickly access the ordered entry. Also has the nice benefit of
225	// being an in-memory copy of the reputation DB for the connected peers.
226	per_peer_score: HashMap<PeerId, Score>,
227}
228
229impl PerPara {
230	/// Get the peer's score, if any.
231	pub fn get_score(&self, peer_id: &PeerId) -> Option<Score> {
232		self.per_peer_score.get(peer_id).map(|s| *s)
233	}
234
235	fn new(limit: NonZeroU16) -> Self {
236		Self { limit, sorted_scores: BTreeSet::default(), per_peer_score: HashMap::default() }
237	}
238
239	fn try_accept(&mut self, peer_id: PeerId, score: Score) -> TryAcceptOutcome {
240		// If we've got enough room, add it. Otherwise, see if it has a higher reputation than any
241		// other connected peer.
242		if self.sorted_scores.len() < (u16::from(self.limit) as usize) {
243			self.sorted_scores.insert(PeerScoreEntry { peer_id, score });
244			self.per_peer_score.insert(peer_id, score);
245			TryAcceptOutcome::Added
246		} else {
247			let Some(min_score) = self.sorted_scores.first() else {
248				// The limit must be 0, which is not possible given that limit is a NonZeroU16.
249				return TryAcceptOutcome::Rejected
250			};
251
252			if min_score.score >= score {
253				TryAcceptOutcome::Rejected
254			} else {
255				let Some(replaced) = self.sorted_scores.pop_first() else {
256					// Cannot really happen since we already know there's some entry with a lower
257					// score than ours.
258					return TryAcceptOutcome::Rejected
259				};
260				self.per_peer_score.remove(&replaced.peer_id);
261
262				self.sorted_scores.insert(PeerScoreEntry { peer_id, score });
263				self.per_peer_score.insert(peer_id, score);
264				TryAcceptOutcome::Replaced([replaced.peer_id].into_iter().collect())
265			}
266		}
267	}
268
269	fn update_reputation(&mut self, update: ReputationUpdate) {
270		let Some(score) = self.per_peer_score.get_mut(&update.peer_id) else {
271			// If the peer is not connected we don't care to update anything besides the DB.
272			return
273		};
274
275		self.sorted_scores
276			.remove(&PeerScoreEntry { peer_id: update.peer_id, score: *score });
277
278		match update.kind {
279			ReputationUpdateKind::Bump => score.saturating_add(update.value.into()),
280			ReputationUpdateKind::Slash => score.saturating_sub(update.value.into()),
281		};
282
283		self.sorted_scores
284			.insert(PeerScoreEntry { peer_id: update.peer_id, score: *score });
285	}
286
287	fn remove(&mut self, peer_id: &PeerId) {
288		let Some(score) = self.per_peer_score.remove(&peer_id) else { return };
289
290		self.sorted_scores.remove(&PeerScoreEntry { peer_id: *peer_id, score });
291	}
292
293	fn contains(&self, peer_id: &PeerId) -> bool {
294		self.per_peer_score.contains_key(peer_id)
295	}
296}
297
298#[derive(PartialEq, Eq, Clone)]
299struct PeerScoreEntry {
300	peer_id: PeerId,
301	score: Score,
302}
303
304impl Ord for PeerScoreEntry {
305	fn cmp(&self, other: &Self) -> Ordering {
306		self.score.cmp(&other.score)
307	}
308}
309
310impl PartialOrd for PeerScoreEntry {
311	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
312		Some(self.cmp(other))
313	}
314}
315
316#[cfg(test)]
317mod tests {
318	use super::*;
319
320	use polkadot_node_network_protocol::peer_set::CollationVersion;
321
322	fn default_connected_state() -> PeerInfo {
323		PeerInfo { version: CollationVersion::V2, state: PeerState::Connected }
324	}
325
326	// Test the ConnectedPeers constructor
327	#[test]
328	fn test_connected_peers_constructor() {
329		// Test an empty instance.
330		let connected = ConnectedPeers::new(
331			BTreeSet::new(),
332			NonZeroU16::new(1000).unwrap(),
333			NonZeroU16::new(50).unwrap(),
334		);
335		assert!(connected.per_para.is_empty());
336		assert!(connected.peer_info.is_empty());
337
338		// Test that the constructor sets the per-para limit as the minimum between the
339		// per_para_limit and the overall_limit divided by the number of scheduled paras.
340		let connected = ConnectedPeers::new(
341			(0..5).map(ParaId::from).collect(),
342			NonZeroU16::new(50).unwrap(),
343			NonZeroU16::new(3).unwrap(),
344		);
345		assert_eq!(connected.per_para.len(), 5);
346		assert!(connected.peer_info.is_empty());
347		for (para_id, per_para) in connected.per_para {
348			let para_id = u32::from(para_id);
349			assert!(para_id < 5);
350			assert_eq!(per_para.limit.get(), 3);
351		}
352
353		let connected = ConnectedPeers::new(
354			(0..5).map(ParaId::from).collect(),
355			NonZeroU16::new(50).unwrap(),
356			NonZeroU16::new(15).unwrap(),
357		);
358		assert_eq!(connected.per_para.len(), 5);
359		assert!(connected.peer_info.is_empty());
360		for (para_id, per_para) in connected.per_para {
361			let para_id = u32::from(para_id);
362			assert!(para_id < 5);
363			assert_eq!(per_para.limit.get(), 10);
364		}
365	}
366
367	#[tokio::test]
368	// Test peer connection acceptance criteria while the peer limit is not reached.
369	async fn test_try_accept_below_limit() {
370		let mut connected = ConnectedPeers::new(
371			(0..5).map(ParaId::from).collect(),
372			NonZeroU16::new(50).unwrap(),
373			NonZeroU16::new(15).unwrap(),
374		);
375		let first_peer = PeerId::random();
376
377		// Try accepting a new peer which has no past reputation and has not declared.
378		assert_eq!(
379			connected
380				.try_accept(
381					|_, _| async { Score::default() },
382					first_peer,
383					default_connected_state()
384				)
385				.await,
386			TryAcceptOutcome::Added
387		);
388		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
389		for per_para in connected.per_para.values() {
390			assert!(per_para.contains(&first_peer));
391			assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
392		}
393
394		// Try adding an already accepted peer.
395		assert_eq!(
396			connected
397				.try_accept(
398					|_, _| async { Score::default() },
399					first_peer,
400					default_connected_state()
401				)
402				.await,
403			TryAcceptOutcome::Added
404		);
405		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
406		for per_para in connected.per_para.values() {
407			assert!(per_para.contains(&first_peer));
408			assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
409		}
410
411		// Try accepting a peer which has past reputation for an unscheduled para.
412		let second_peer = PeerId::random();
413		assert_eq!(
414			connected
415				.try_accept(
416					|peer_id, para_id| async move {
417						if peer_id == second_peer && para_id == ParaId::from(100) {
418							Score::new(10).unwrap()
419						} else {
420							Score::default()
421						}
422					},
423					second_peer,
424					default_connected_state()
425				)
426				.await,
427			TryAcceptOutcome::Added
428		);
429		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
430		assert_eq!(connected.peer_info(&second_peer).unwrap(), &default_connected_state());
431
432		for per_para in connected.per_para.values() {
433			assert!(per_para.contains(&second_peer));
434			assert_eq!(per_para.get_score(&second_peer).unwrap(), Score::default());
435		}
436
437		// Try accepting a peer which has past reputation for a scheduled para but is not yet
438		// declared.
439		let third_peer = PeerId::random();
440		let third_peer_para_id = ParaId::from(3);
441		assert_eq!(
442			connected
443				.try_accept(
444					|peer_id, para_id| async move {
445						if peer_id == third_peer && para_id == third_peer_para_id {
446							Score::new(10).unwrap()
447						} else {
448							Score::default()
449						}
450					},
451					third_peer,
452					default_connected_state()
453				)
454				.await,
455			TryAcceptOutcome::Added
456		);
457		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
458		assert_eq!(connected.peer_info(&second_peer).unwrap(), &default_connected_state());
459		assert_eq!(connected.peer_info(&third_peer).unwrap(), &default_connected_state());
460
461		for (para_id, per_para) in connected.per_para.iter() {
462			assert!(per_para.contains(&third_peer));
463
464			if para_id == &third_peer_para_id {
465				assert_eq!(per_para.get_score(&third_peer).unwrap(), Score::new(10).unwrap());
466			} else {
467				assert_eq!(per_para.get_score(&third_peer).unwrap(), Score::default());
468			}
469		}
470
471		// Try accepting a peer which is declared for an unscheduled para. It will be rejected,
472		// regardless of its other reputations.
473		let rejected_peer = PeerId::random();
474		assert_eq!(
475			connected
476				.try_accept(
477					|peer_id, para_id| async move {
478						if peer_id == rejected_peer {
479							Score::new(10).unwrap()
480						} else {
481							Score::default()
482						}
483					},
484					rejected_peer,
485					PeerInfo {
486						version: CollationVersion::V2,
487						state: PeerState::Collating(ParaId::from(100))
488					}
489				)
490				.await,
491			TryAcceptOutcome::Rejected
492		);
493		assert_eq!(connected.peer_info(&rejected_peer), None);
494		for (para_id, per_para) in connected.per_para.iter() {
495			assert!(!per_para.contains(&rejected_peer));
496			assert_eq!(per_para.get_score(&rejected_peer), None);
497		}
498
499		// Try accepting a peer which is declared for a scheduled para.
500		let fourth_peer = PeerId::random();
501		let fourth_peer_para_id = ParaId::from(4);
502
503		assert_eq!(
504			connected
505				.try_accept(
506					|peer_id, para_id| async move {
507						if peer_id == fourth_peer && para_id == fourth_peer_para_id {
508							Score::new(10).unwrap()
509						} else {
510							Score::default()
511						}
512					},
513					fourth_peer,
514					PeerInfo {
515						version: CollationVersion::V2,
516						state: PeerState::Collating(fourth_peer_para_id)
517					}
518				)
519				.await,
520			TryAcceptOutcome::Added
521		);
522		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
523		assert_eq!(connected.peer_info(&second_peer).unwrap(), &default_connected_state());
524		assert_eq!(connected.peer_info(&third_peer).unwrap(), &default_connected_state());
525		assert_eq!(
526			connected.peer_info(&fourth_peer).unwrap(),
527			&PeerInfo {
528				version: CollationVersion::V2,
529				state: PeerState::Collating(fourth_peer_para_id)
530			}
531		);
532
533		for (para_id, per_para) in connected.per_para.iter() {
534			if para_id == &fourth_peer_para_id {
535				assert!(per_para.contains(&fourth_peer));
536				assert_eq!(per_para.get_score(&fourth_peer).unwrap(), Score::new(10).unwrap());
537			} else {
538				assert!(!per_para.contains(&fourth_peer));
539				assert_eq!(per_para.get_score(&fourth_peer), None);
540			}
541		}
542	}
543
544	#[tokio::test]
545	// Test peer connection acceptance criteria while the peer limit is reached.
546	async fn test_try_accept_at_limit() {
547		// We have 2 scheduled paras. The per para limit is 2.
548		let mut connected = ConnectedPeers::new(
549			(1..=2).map(ParaId::from).collect(),
550			NonZeroU16::new(50).unwrap(),
551			NonZeroU16::new(2).unwrap(),
552		);
553		let first_peer = PeerId::random();
554		let second_peer = PeerId::random();
555		let third_peer = PeerId::random();
556		let para_1 = ParaId::from(1);
557		let para_2 = ParaId::from(2);
558
559		let new_peer = PeerId::random();
560
561		// Para 1 has: first_peer (not declared), reputation 10.
562		// Para 1 has: second_peer (declared), reputation 20.
563
564		// Para 2 has: first_peer (not declared), reputation 10.
565		// Para 2 has: third_peer (declared), reputation 20.
566
567		let rep_query_fn = |peer_id, para_id| async move {
568			match (peer_id, para_id) {
569				(peer_id, para_id) if peer_id == first_peer => Score::new(10).unwrap(),
570				(peer_id, para_id) if peer_id == second_peer && para_id == para_1 =>
571					Score::new(20).unwrap(),
572				(peer_id, para_id) if peer_id == third_peer && para_id == para_2 =>
573					Score::new(20).unwrap(),
574				(peer_id, para_id) if peer_id == new_peer && para_id == para_1 =>
575					Score::new(5).unwrap(),
576
577				(_, _) => Score::default(),
578			}
579		};
580
581		assert_eq!(
582			connected.try_accept(rep_query_fn, first_peer, default_connected_state()).await,
583			TryAcceptOutcome::Added
584		);
585		assert_eq!(
586			connected
587				.try_accept(
588					rep_query_fn,
589					second_peer,
590					PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) }
591				)
592				.await,
593			TryAcceptOutcome::Added
594		);
595		assert_eq!(
596			connected
597				.try_accept(
598					rep_query_fn,
599					third_peer,
600					PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_2) }
601				)
602				.await,
603			TryAcceptOutcome::Added
604		);
605		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
606		assert_eq!(
607			connected.peer_info(&second_peer).unwrap(),
608			&PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) }
609		);
610		assert_eq!(
611			connected.peer_info(&third_peer).unwrap(),
612			&PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_2) }
613		);
614
615		// Let's assert the current state of the ConnectedPeers.
616
617		assert_eq!(connected.per_para.len(), 2);
618		let per_para_1 = connected.per_para.get(&para_1).unwrap();
619		assert_eq!(per_para_1.per_peer_score.len(), 2);
620		assert_eq!(per_para_1.sorted_scores.len(), 2);
621
622		assert_eq!(connected.peer_score(&first_peer, &para_1).unwrap(), Score::new(10).unwrap());
623		assert_eq!(connected.peer_score(&second_peer, &para_1).unwrap(), Score::new(20).unwrap());
624		assert_eq!(connected.peer_score(&first_peer, &para_2).unwrap(), Score::new(10).unwrap());
625		assert_eq!(connected.peer_score(&third_peer, &para_2).unwrap(), Score::new(20).unwrap());
626		assert_eq!(connected.peer_score(&second_peer, &para_2), None);
627		assert_eq!(connected.peer_score(&new_peer, &para_1), None);
628		assert_eq!(connected.peer_score(&new_peer, &para_2), None);
629
630		// Trying accepting a peer (declared or not) when all other peers have higher reputations ->
631		// Rejection.
632		assert_eq!(
633			connected.try_accept(rep_query_fn, new_peer, default_connected_state()).await,
634			TryAcceptOutcome::Rejected
635		);
636		assert_eq!(
637			connected
638				.try_accept(
639					rep_query_fn,
640					new_peer,
641					PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) }
642				)
643				.await,
644			TryAcceptOutcome::Rejected
645		);
646		assert_eq!(
647			connected
648				.try_accept(
649					rep_query_fn,
650					new_peer,
651					PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_2) }
652				)
653				.await,
654			TryAcceptOutcome::Rejected
655		);
656		assert_eq!(
657			connected
658				.try_accept(
659					rep_query_fn,
660					new_peer,
661					PeerInfo {
662						version: CollationVersion::V2,
663						state: PeerState::Collating(ParaId::from(100))
664					}
665				)
666				.await,
667			TryAcceptOutcome::Rejected
668		);
669
670		// Trying to accept an undeclared peer when all other peers have lower reputations ->
671		// Replace the ones with the lowest rep. Only kick out one if the ones with the lowest rep
672		// are the same for all paras.
673		{
674			let mut connected = connected.clone();
675			let rep_query_fn = |peer_id, para_id| async move {
676				match (peer_id, para_id) {
677					(peer_id, para_id) if peer_id == new_peer => Score::new(30).unwrap(),
678					(_, _) => Score::default(),
679				}
680			};
681			assert_eq!(
682				connected.try_accept(rep_query_fn, new_peer, default_connected_state()).await,
683				TryAcceptOutcome::Replaced([first_peer].into_iter().collect())
684			);
685			assert_eq!(connected.peer_info(&new_peer).unwrap(), &default_connected_state());
686			assert_eq!(connected.peer_info(&first_peer), None);
687
688			assert_eq!(connected.peer_score(&first_peer, &para_1), None);
689			assert_eq!(
690				connected.peer_score(&second_peer, &para_1).unwrap(),
691				Score::new(20).unwrap()
692			);
693			assert_eq!(connected.peer_score(&first_peer, &para_2), None);
694			assert_eq!(
695				connected.peer_score(&third_peer, &para_2).unwrap(),
696				Score::new(20).unwrap()
697			);
698			assert_eq!(connected.peer_score(&third_peer, &para_1), None);
699			assert_eq!(connected.peer_score(&second_peer, &para_2), None);
700			assert_eq!(connected.peer_score(&new_peer, &para_1).unwrap(), Score::new(30).unwrap());
701			assert_eq!(connected.peer_score(&new_peer, &para_2).unwrap(), Score::new(30).unwrap());
702		}
703
704		// Trying to accept an undeclared peer when all other peers have lower reputations ->
705		// Replace the ones with the lowest rep.
706		{
707			let mut connected = ConnectedPeers::new(
708				(1..=2).map(ParaId::from).collect(),
709				NonZeroU16::new(50).unwrap(),
710				NonZeroU16::new(2).unwrap(),
711			);
712			let fourth_peer = PeerId::random();
713
714			let rep_query_fn = |peer_id, para_id| async move {
715				match (peer_id, para_id) {
716					(peer_id, para_id) if peer_id == first_peer => Score::new(10).unwrap(),
717					(peer_id, para_id) if peer_id == second_peer && para_id == para_1 =>
718						Score::new(20).unwrap(),
719					(peer_id, para_id) if peer_id == third_peer && para_id == para_2 =>
720						Score::new(20).unwrap(),
721					(peer_id, para_id) if peer_id == fourth_peer && para_id == para_2 =>
722						Score::new(15).unwrap(),
723					(peer_id, para_id) if peer_id == new_peer => Score::new(30).unwrap(),
724
725					(_, _) => Score::default(),
726				}
727			};
728
729			assert_eq!(
730				connected.try_accept(rep_query_fn, first_peer, default_connected_state()).await,
731				TryAcceptOutcome::Added
732			);
733			assert_eq!(
734				connected
735					.try_accept(
736						rep_query_fn,
737						second_peer,
738						PeerInfo {
739							version: CollationVersion::V2,
740							state: PeerState::Collating(para_1)
741						}
742					)
743					.await,
744				TryAcceptOutcome::Added
745			);
746			assert_eq!(
747				connected
748					.try_accept(
749						rep_query_fn,
750						third_peer,
751						PeerInfo {
752							version: CollationVersion::V2,
753							state: PeerState::Collating(para_2)
754						}
755					)
756					.await,
757				TryAcceptOutcome::Added
758			);
759			assert_eq!(
760				connected
761					.try_accept(
762						rep_query_fn,
763						fourth_peer,
764						PeerInfo {
765							version: CollationVersion::V2,
766							state: PeerState::Collating(para_2)
767						}
768					)
769					.await,
770				TryAcceptOutcome::Replaced(HashSet::new())
771			);
772
773			assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
774
775			assert_eq!(
776				connected.try_accept(rep_query_fn, new_peer, default_connected_state()).await,
777				TryAcceptOutcome::Replaced([first_peer, fourth_peer].into_iter().collect())
778			);
779			assert_eq!(connected.peer_info(&first_peer), None);
780			assert_eq!(connected.peer_info(&fourth_peer), None);
781
782			assert_eq!(connected.peer_info(&new_peer).unwrap(), &default_connected_state());
783
784			assert_eq!(connected.peer_score(&first_peer, &para_1), None);
785			assert_eq!(
786				connected.peer_score(&second_peer, &para_1).unwrap(),
787				Score::new(20).unwrap()
788			);
789			assert_eq!(connected.peer_score(&third_peer, &para_1), None);
790			assert_eq!(connected.peer_score(&fourth_peer, &para_1), None);
791			assert_eq!(connected.peer_score(&new_peer, &para_1).unwrap(), Score::new(30).unwrap());
792
793			assert_eq!(connected.peer_score(&first_peer, &para_2), None);
794			assert_eq!(connected.peer_score(&second_peer, &para_2), None);
795			assert_eq!(
796				connected.peer_score(&third_peer, &para_2).unwrap(),
797				Score::new(20).unwrap()
798			);
799			assert_eq!(connected.peer_score(&fourth_peer, &para_2), None);
800			assert_eq!(connected.peer_score(&new_peer, &para_2).unwrap(), Score::new(30).unwrap());
801		}
802
803		// Trying to accept a declared peer when all other peers have lower reputations ->
804		// Replace the one with the lowest rep.
805		// Because new_peer is already declared for para_1, it will only kick out first_peer's slot
806		// on para_1.
807		{
808			let mut connected = connected.clone();
809			let rep_query_fn = |peer_id, para_id| async move {
810				match (peer_id, para_id) {
811					(peer_id, para_id) if peer_id == new_peer => Score::new(30).unwrap(),
812					(_, _) => Score::default(),
813				}
814			};
815			assert_eq!(
816				connected
817					.try_accept(
818						rep_query_fn,
819						new_peer,
820						PeerInfo {
821							version: CollationVersion::V2,
822							state: PeerState::Collating(para_1)
823						}
824					)
825					.await,
826				TryAcceptOutcome::Replaced(HashSet::new())
827			);
828			assert_eq!(
829				connected.peer_info(&new_peer).unwrap(),
830				&PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) }
831			);
832			assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
833			assert_eq!(connected.peer_score(&first_peer, &para_1), None);
834			assert_eq!(
835				connected.peer_score(&second_peer, &para_1).unwrap(),
836				Score::new(20).unwrap()
837			);
838			assert_eq!(
839				connected.peer_score(&first_peer, &para_2).unwrap(),
840				Score::new(10).unwrap()
841			);
842			assert_eq!(
843				connected.peer_score(&third_peer, &para_2).unwrap(),
844				Score::new(20).unwrap()
845			);
846			assert_eq!(connected.peer_score(&second_peer, &para_2), None);
847			assert_eq!(connected.peer_score(&new_peer, &para_1).unwrap(), Score::new(30).unwrap());
848			assert_eq!(connected.peer_score(&new_peer, &para_2), None);
849		}
850
851		// Trying to accept a declared/undeclared peer when only one peer has lower reputation ->
852		// Replace the one with the lowest rep.
853		for peer_info in [
854			default_connected_state(),
855			PeerInfo { version: CollationVersion::V2, state: PeerState::Collating(para_1) },
856		] {
857			let mut connected = ConnectedPeers::new(
858				(1..=2).map(ParaId::from).collect(),
859				NonZeroU16::new(50).unwrap(),
860				NonZeroU16::new(2).unwrap(),
861			);
862
863			let rep_query_fn = |peer_id, para_id| async move {
864				match (peer_id, para_id) {
865					(peer_id, para_id) if peer_id == first_peer => Score::new(10).unwrap(),
866					(peer_id, para_id) if peer_id == second_peer && para_id == para_1 =>
867						Score::new(5).unwrap(),
868					(peer_id, para_id) if peer_id == third_peer && para_id == para_2 =>
869						Score::new(5).unwrap(),
870					(peer_id, para_id) if peer_id == new_peer && para_id == para_1 =>
871						Score::new(8).unwrap(),
872
873					(_, _) => Score::default(),
874				}
875			};
876			assert_eq!(
877				connected.try_accept(rep_query_fn, first_peer, default_connected_state()).await,
878				TryAcceptOutcome::Added
879			);
880			assert_eq!(
881				connected
882					.try_accept(
883						rep_query_fn,
884						second_peer,
885						PeerInfo {
886							version: CollationVersion::V2,
887							state: PeerState::Collating(para_1)
888						}
889					)
890					.await,
891				TryAcceptOutcome::Added
892			);
893			assert_eq!(
894				connected
895					.try_accept(
896						rep_query_fn,
897						third_peer,
898						PeerInfo {
899							version: CollationVersion::V2,
900							state: PeerState::Collating(para_2)
901						}
902					)
903					.await,
904				TryAcceptOutcome::Added
905			);
906
907			assert_eq!(
908				connected.try_accept(rep_query_fn, new_peer, peer_info.clone()).await,
909				TryAcceptOutcome::Replaced([second_peer].into_iter().collect())
910			);
911			assert_eq!(connected.peer_info(&new_peer).unwrap(), &peer_info);
912
913			assert_eq!(
914				connected.peer_score(&first_peer, &para_1).unwrap(),
915				Score::new(10).unwrap()
916			);
917			assert_eq!(connected.peer_score(&second_peer, &para_1), None);
918			assert_eq!(
919				connected.peer_score(&first_peer, &para_2).unwrap(),
920				Score::new(10).unwrap()
921			);
922			assert_eq!(connected.peer_score(&third_peer, &para_2).unwrap(), Score::new(5).unwrap());
923			assert_eq!(connected.peer_score(&second_peer, &para_2), None);
924			assert_eq!(connected.peer_score(&new_peer, &para_1).unwrap(), Score::new(8).unwrap());
925			assert_eq!(connected.peer_score(&new_peer, &para_2), None);
926		}
927	}
928
929	#[tokio::test]
930	// Test the handling of a Declare message in different scenarios.
931	async fn test_declare() {
932		let mut connected = ConnectedPeers::new(
933			(0..5).map(ParaId::from).collect(),
934			NonZeroU16::new(50).unwrap(),
935			NonZeroU16::new(15).unwrap(),
936		);
937		let first_peer = PeerId::random();
938
939		assert_eq!(connected.peer_info(&first_peer), None);
940
941		// Try handling a Declare statement from a non-existant peer. Should be a no-op
942		assert_eq!(connected.declared(first_peer, ParaId::from(1)), DeclarationOutcome::Rejected);
943
944		assert_eq!(connected.peer_info(&first_peer), None);
945
946		assert_eq!(
947			connected
948				.try_accept(
949					|_, _| async { Score::default() },
950					first_peer,
951					default_connected_state()
952				)
953				.await,
954			TryAcceptOutcome::Added
955		);
956		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
957		for per_para in connected.per_para.values() {
958			assert!(per_para.contains(&first_peer));
959			assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
960		}
961
962		// Declare coming for a para that is not scheduled.
963		{
964			let mut connected = connected.clone();
965			assert_eq!(
966				connected.declared(first_peer, ParaId::from(100)),
967				DeclarationOutcome::Rejected
968			);
969
970			assert_eq!(connected.peer_info(&first_peer), None);
971
972			for (para_id, per_para) in connected.per_para.iter() {
973				assert!(!per_para.contains(&first_peer));
974				assert_eq!(per_para.get_score(&first_peer), None);
975			}
976		}
977
978		// Declare coming for a peer that is in undeclared state on multiple paras.
979		assert_eq!(connected.declared(first_peer, ParaId::from(1)), DeclarationOutcome::Accepted);
980
981		assert_eq!(
982			connected.peer_info(&first_peer).unwrap(),
983			&PeerInfo {
984				version: CollationVersion::V2,
985				state: PeerState::Collating(ParaId::from(1))
986			}
987		);
988
989		for (para_id, per_para) in connected.per_para.iter() {
990			if para_id == &ParaId::from(1) {
991				assert!(per_para.contains(&first_peer));
992				assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
993			} else {
994				assert!(!per_para.contains(&first_peer));
995				assert_eq!(per_para.get_score(&first_peer), None);
996			}
997		}
998
999		// Test a redundant declare message, for the same para.
1000		assert_eq!(connected.declared(first_peer, ParaId::from(1)), DeclarationOutcome::Accepted);
1001		assert_eq!(
1002			connected.peer_info(&first_peer).unwrap(),
1003			&PeerInfo {
1004				version: CollationVersion::V2,
1005				state: PeerState::Collating(ParaId::from(1))
1006			}
1007		);
1008
1009		// Peer already declared. New declare for a different, unscheduled para.
1010		{
1011			let mut connected = connected.clone();
1012			assert_eq!(
1013				connected.declared(first_peer, ParaId::from(100)),
1014				DeclarationOutcome::Rejected
1015			);
1016			assert_eq!(connected.peer_info(&first_peer), None);
1017
1018			for (para_id, per_para) in connected.per_para.iter() {
1019				assert!(!per_para.contains(&first_peer));
1020				assert_eq!(per_para.get_score(&first_peer), None);
1021			}
1022		}
1023
1024		// Peer already declared. New declare for a different para. The paraid switch is just like a
1025		// rejection, the peer manager then needs to retry accepting the connection on the new para.
1026		assert_eq!(
1027			connected.peer_info(&first_peer).unwrap(),
1028			&PeerInfo {
1029				version: CollationVersion::V2,
1030				state: PeerState::Collating(ParaId::from(1))
1031			}
1032		);
1033		assert_eq!(
1034			connected.declared(first_peer, ParaId::from(2)),
1035			DeclarationOutcome::Switched(ParaId::from(1))
1036		);
1037		assert_eq!(connected.peer_info(&first_peer), None);
1038
1039		for (para_id, per_para) in connected.per_para.iter() {
1040			assert!(!per_para.contains(&first_peer));
1041			assert_eq!(per_para.get_score(&first_peer), None);
1042		}
1043	}
1044
1045	#[tokio::test]
1046	// Test the removal of disconnected peers.
1047	async fn test_remove() {
1048		let mut connected = ConnectedPeers::new(
1049			(0..5).map(ParaId::from).collect(),
1050			NonZeroU16::new(50).unwrap(),
1051			NonZeroU16::new(15).unwrap(),
1052		);
1053		let first_peer = PeerId::random();
1054
1055		assert_eq!(connected.peer_info(&first_peer), None);
1056
1057		// Try removing a non-existant peer. Should be a no-op
1058		connected.remove(&first_peer);
1059
1060		assert_eq!(connected.peer_info(&first_peer), None);
1061
1062		for per_para in connected.per_para.values() {
1063			assert!(!per_para.contains(&first_peer));
1064			assert_eq!(per_para.get_score(&first_peer), None);
1065		}
1066
1067		// Add a peer in undeclared state and remove it. It will be removed from all paras.
1068		{
1069			assert_eq!(
1070				connected
1071					.try_accept(
1072						|_, _| async { Score::default() },
1073						first_peer,
1074						default_connected_state()
1075					)
1076					.await,
1077				TryAcceptOutcome::Added
1078			);
1079			assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
1080			for per_para in connected.per_para.values() {
1081				assert!(per_para.contains(&first_peer));
1082				assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
1083			}
1084
1085			connected.remove(&first_peer);
1086
1087			assert_eq!(connected.peer_info(&first_peer), None);
1088
1089			for per_para in connected.per_para.values() {
1090				assert!(!per_para.contains(&first_peer));
1091				assert_eq!(per_para.get_score(&first_peer), None);
1092			}
1093		}
1094
1095		// Add a peer in declared state and remove it. It will be from the declared para.
1096		{
1097			assert_eq!(
1098				connected
1099					.try_accept(
1100						|_, _| async { Score::default() },
1101						first_peer,
1102						PeerInfo {
1103							version: CollationVersion::V2,
1104							state: PeerState::Collating(ParaId::from(1))
1105						}
1106					)
1107					.await,
1108				TryAcceptOutcome::Added
1109			);
1110			assert_eq!(
1111				connected.peer_info(&first_peer).unwrap(),
1112				&PeerInfo {
1113					version: CollationVersion::V2,
1114					state: PeerState::Collating(ParaId::from(1))
1115				}
1116			);
1117			for (para_id, per_para) in connected.per_para.iter() {
1118				if para_id == &ParaId::from(1) {
1119					assert!(per_para.contains(&first_peer));
1120					assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::default());
1121				} else {
1122					assert!(!per_para.contains(&first_peer));
1123					assert_eq!(per_para.get_score(&first_peer), None);
1124				}
1125			}
1126
1127			connected.remove(&first_peer);
1128
1129			assert_eq!(connected.peer_info(&first_peer), None);
1130
1131			for per_para in connected.per_para.values() {
1132				assert!(!per_para.contains(&first_peer));
1133				assert_eq!(per_para.get_score(&first_peer), None);
1134			}
1135		}
1136	}
1137
1138	#[tokio::test]
1139	// Test different scenarios for reputation updates.
1140	async fn test_update_reputation() {
1141		let mut connected = ConnectedPeers::new(
1142			(0..6).map(ParaId::from).collect(),
1143			NonZeroU16::new(50).unwrap(),
1144			NonZeroU16::new(15).unwrap(),
1145		);
1146		let first_peer = PeerId::random();
1147
1148		assert_eq!(connected.peer_info(&first_peer), None);
1149		for per_para in connected.per_para.values() {
1150			assert!(!per_para.contains(&first_peer));
1151			assert_eq!(per_para.get_score(&first_peer), None);
1152		}
1153
1154		// Update for a non-existant peer. No-op.
1155		connected.update_reputation(ReputationUpdate {
1156			peer_id: first_peer,
1157			para_id: ParaId::from(1),
1158			value: Score::new(100).unwrap(),
1159			kind: ReputationUpdateKind::Slash,
1160		});
1161
1162		assert_eq!(connected.peer_info(&first_peer), None);
1163		for per_para in connected.per_para.values() {
1164			assert!(!per_para.contains(&first_peer));
1165			assert_eq!(per_para.get_score(&first_peer), None);
1166		}
1167
1168		// Peer exists, but this para is not scheduled.
1169		assert_eq!(
1170			connected
1171				.try_accept(
1172					|peer_id, _| async move {
1173						if peer_id == first_peer {
1174							Score::new(10).unwrap()
1175						} else {
1176							Score::default()
1177						}
1178					},
1179					first_peer,
1180					default_connected_state()
1181				)
1182				.await,
1183			TryAcceptOutcome::Added
1184		);
1185		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
1186		for per_para in connected.per_para.values() {
1187			assert!(per_para.contains(&first_peer));
1188			assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(10).unwrap());
1189		}
1190
1191		connected.update_reputation(ReputationUpdate {
1192			peer_id: first_peer,
1193			para_id: ParaId::from(100),
1194			value: Score::new(100).unwrap(),
1195			kind: ReputationUpdateKind::Slash,
1196		});
1197		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
1198		for per_para in connected.per_para.values() {
1199			assert!(per_para.contains(&first_peer));
1200			assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(10).unwrap());
1201		}
1202
1203		// Test a slash for only one para, even though peer has reputation for all.
1204		connected.update_reputation(ReputationUpdate {
1205			peer_id: first_peer,
1206			para_id: ParaId::from(1),
1207			value: Score::new(100).unwrap(),
1208			kind: ReputationUpdateKind::Slash,
1209		});
1210		assert_eq!(connected.peer_info(&first_peer).unwrap(), &default_connected_state());
1211		for (para_id, per_para) in connected.per_para.iter() {
1212			assert!(per_para.contains(&first_peer));
1213
1214			if para_id == &ParaId::from(1) {
1215				assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(0).unwrap());
1216			} else {
1217				assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(10).unwrap());
1218			}
1219		}
1220
1221		// Test a bump after the peer declared for one para. First test a bump for the wrong para.
1222		// Then a bump for the declared para.
1223		assert_eq!(connected.declared(first_peer, ParaId::from(5)), DeclarationOutcome::Accepted);
1224		assert_eq!(
1225			connected.peer_info(&first_peer).unwrap(),
1226			&PeerInfo {
1227				version: CollationVersion::V2,
1228				state: PeerState::Collating(ParaId::from(5))
1229			}
1230		);
1231
1232		connected.update_reputation(ReputationUpdate {
1233			peer_id: first_peer,
1234			para_id: ParaId::from(1),
1235			value: Score::new(100).unwrap(),
1236			kind: ReputationUpdateKind::Bump,
1237		});
1238		assert_eq!(
1239			connected.peer_info(&first_peer).unwrap(),
1240			&PeerInfo {
1241				version: CollationVersion::V2,
1242				state: PeerState::Collating(ParaId::from(5))
1243			}
1244		);
1245
1246		for (para_id, per_para) in connected.per_para.iter() {
1247			if para_id == &ParaId::from(5) {
1248				assert!(per_para.contains(&first_peer));
1249				assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(10).unwrap());
1250			} else {
1251				assert!(!per_para.contains(&first_peer));
1252				assert_eq!(per_para.get_score(&first_peer), None);
1253			}
1254		}
1255
1256		connected.update_reputation(ReputationUpdate {
1257			peer_id: first_peer,
1258			para_id: ParaId::from(5),
1259			value: Score::new(50).unwrap(),
1260			kind: ReputationUpdateKind::Bump,
1261		});
1262		assert_eq!(
1263			connected.peer_info(&first_peer).unwrap(),
1264			&PeerInfo {
1265				version: CollationVersion::V2,
1266				state: PeerState::Collating(ParaId::from(5))
1267			}
1268		);
1269
1270		for (para_id, per_para) in connected.per_para.iter() {
1271			if para_id == &ParaId::from(5) {
1272				assert!(per_para.contains(&first_peer));
1273				assert_eq!(per_para.get_score(&first_peer).unwrap(), Score::new(60).unwrap());
1274			} else {
1275				assert!(!per_para.contains(&first_peer));
1276				assert_eq!(per_para.get_score(&first_peer), None);
1277			}
1278		}
1279	}
1280}