referrerpolicy=no-referrer-when-downgrade

polkadot_collator_protocol/validator_side_experimental/peer_manager/
db.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::validator_side_experimental::{
18	common::Score,
19	peer_manager::{backend::Backend, ReputationUpdate, ReputationUpdateKind},
20};
21use async_trait::async_trait;
22use polkadot_node_network_protocol::PeerId;
23use polkadot_primitives::{BlockNumber, Hash, Id as ParaId};
24use std::{
25	collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap},
26	time::{SystemTime, UNIX_EPOCH},
27};
28
29/// This is an in-memory temporary implementation for the DB, to be used only for prototyping and
30/// testing purposes.
31pub struct Db {
32	db: BTreeMap<ParaId, HashMap<PeerId, ScoreEntry>>,
33	last_finalized: Option<BlockNumber>,
34	stored_limit_per_para: u8,
35}
36
37impl Db {
38	/// Create a new instance of the in-memory DB.
39	///
40	/// `stored_limit_per_para` is the maximum number of reputations that can be stored per para.
41	pub async fn new(stored_limit_per_para: u8) -> Self {
42		Self { db: BTreeMap::new(), last_finalized: None, stored_limit_per_para }
43	}
44}
45
46type Timestamp = u128;
47
48#[derive(Clone, Debug)]
49struct ScoreEntry {
50	score: Score,
51	last_bumped: Timestamp,
52}
53
54#[async_trait]
55impl Backend for Db {
56	async fn processed_finalized_block_number(&self) -> Option<BlockNumber> {
57		self.last_finalized
58	}
59
60	async fn query(&self, peer_id: &PeerId, para_id: &ParaId) -> Option<Score> {
61		self.db.get(para_id).and_then(|per_para| per_para.get(peer_id).map(|e| e.score))
62	}
63
64	async fn slash(&mut self, peer_id: &PeerId, para_id: &ParaId, value: Score) {
65		if let btree_map::Entry::Occupied(mut per_para_entry) = self.db.entry(*para_id) {
66			if let hash_map::Entry::Occupied(mut e) = per_para_entry.get_mut().entry(*peer_id) {
67				let score = e.get_mut().score;
68				// Remove the entry if it goes to zero.
69				if score <= value {
70					e.remove();
71				} else {
72					e.get_mut().score.saturating_sub(value.into());
73				}
74			}
75
76			// If the per_para length went to 0, remove it completely
77			if per_para_entry.get().is_empty() {
78				per_para_entry.remove();
79			}
80		}
81	}
82
83	async fn prune_paras(&mut self, registered_paras: BTreeSet<ParaId>) {
84		self.db.retain(|para, _| registered_paras.contains(&para));
85	}
86
87	async fn process_bumps(
88		&mut self,
89		leaf_number: BlockNumber,
90		bumps: BTreeMap<ParaId, HashMap<PeerId, Score>>,
91		decay_value: Option<Score>,
92	) -> Vec<ReputationUpdate> {
93		if self.last_finalized.unwrap_or(0) >= leaf_number {
94			return vec![]
95		}
96
97		self.last_finalized = Some(leaf_number);
98		self.bump_reputations(bumps, decay_value)
99	}
100}
101
102impl Db {
103	fn bump_reputations(
104		&mut self,
105		bumps: BTreeMap<ParaId, HashMap<PeerId, Score>>,
106		maybe_decay_value: Option<Score>,
107	) -> Vec<ReputationUpdate> {
108		let mut reported_updates = vec![];
109		let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
110
111		for (para, bumps_per_para) in bumps {
112			reported_updates.reserve(bumps_per_para.len());
113
114			for (peer_id, bump) in bumps_per_para.iter() {
115				if u16::from(*bump) == 0 {
116					continue
117				}
118
119				self.db
120					.entry(para)
121					.or_default()
122					.entry(*peer_id)
123					.and_modify(|e| {
124						e.score.saturating_add(u16::from(*bump));
125						e.last_bumped = now;
126					})
127					.or_insert(ScoreEntry { score: *bump, last_bumped: now });
128
129				reported_updates.push(ReputationUpdate {
130					peer_id: *peer_id,
131					para_id: para,
132					value: *bump,
133					kind: ReputationUpdateKind::Bump,
134				});
135			}
136
137			if let btree_map::Entry::Occupied(mut per_para_entry) = self.db.entry(para) {
138				if let Some(decay_value) = maybe_decay_value {
139					let peers_to_slash = per_para_entry
140						.get()
141						.keys()
142						.filter(|peer_id| !bumps_per_para.contains_key(peer_id))
143						.copied()
144						.collect::<Vec<PeerId>>();
145
146					for peer_id in peers_to_slash {
147						if let hash_map::Entry::Occupied(mut e) =
148							per_para_entry.get_mut().entry(peer_id)
149						{
150							// Remove the entry if it goes to zero.
151							if e.get_mut().score <= decay_value {
152								let score = e.remove().score;
153								reported_updates.push(ReputationUpdate {
154									peer_id,
155									para_id: para,
156									value: score,
157									kind: ReputationUpdateKind::Slash,
158								});
159							} else {
160								e.get_mut().score.saturating_sub(decay_value.into());
161								reported_updates.push(ReputationUpdate {
162									peer_id,
163									para_id: para,
164									value: decay_value,
165									kind: ReputationUpdateKind::Slash,
166								});
167							}
168						}
169					}
170				}
171
172				let per_para_limit = self.stored_limit_per_para as usize;
173				if per_para_entry.get().is_empty() {
174					// If the per_para length went to 0, remove it completely
175					per_para_entry.remove();
176				} else if per_para_entry.get().len() > per_para_limit {
177					// We have exceeded the maximum capacity, in which case we need to prune
178					// the least recently bumped values
179					let diff = per_para_entry.get().len() - per_para_limit;
180					Self::prune_for_para(&para, &mut per_para_entry, diff, &mut reported_updates);
181				}
182			}
183		}
184
185		reported_updates
186	}
187
188	fn prune_for_para(
189		para_id: &ParaId,
190		per_para: &mut btree_map::OccupiedEntry<ParaId, HashMap<PeerId, ScoreEntry>>,
191		diff: usize,
192		reported_updates: &mut Vec<ReputationUpdate>,
193	) {
194		for _ in 0..diff {
195			let (peer_id_to_remove, score) = per_para
196				.get()
197				.iter()
198				.min_by_key(|(_peer, entry)| entry.last_bumped)
199				.map(|(peer, entry)| (*peer, entry.score))
200				.expect("We know there are enough reps over the limit");
201
202			per_para.get_mut().remove(&peer_id_to_remove);
203
204			reported_updates.push(ReputationUpdate {
205				peer_id: peer_id_to_remove,
206				para_id: *para_id,
207				value: score,
208				kind: ReputationUpdateKind::Slash,
209			});
210		}
211	}
212
213	#[cfg(test)]
214	fn len(&self) -> usize {
215		self.db.len()
216	}
217}
218
219#[cfg(test)]
220mod tests {
221	use std::time::Duration;
222
223	use super::*;
224
225	#[tokio::test]
226	// Test different types of reputation updates and their effects.
227	async fn test_reputation_updates() {
228		let mut db = Db::new(10).await;
229		assert_eq!(db.processed_finalized_block_number().await, None);
230		assert_eq!(db.len(), 0);
231
232		// Test empty update with no decay.
233		assert!(db.process_bumps(10, Default::default(), None).await.is_empty());
234		assert_eq!(db.processed_finalized_block_number().await, Some(10));
235		assert_eq!(db.len(), 0);
236
237		// Test a query on a non-existant entry.
238		assert_eq!(db.query(&PeerId::random(), &ParaId::from(1000)).await, None);
239
240		// Test empty update with decay.
241		assert!(db
242			.process_bumps(11, Default::default(), Some(Score::new(1).unwrap()))
243			.await
244			.is_empty());
245		assert_eq!(db.processed_finalized_block_number().await, Some(11));
246		assert_eq!(db.len(), 0);
247
248		// Test empty update with a leaf number smaller than the latest one.
249		assert!(db
250			.process_bumps(5, Default::default(), Some(Score::new(1).unwrap()))
251			.await
252			.is_empty());
253		assert_eq!(db.processed_finalized_block_number().await, Some(11));
254		assert_eq!(db.len(), 0);
255
256		// Test an update with zeroed score.
257		assert!(db
258			.process_bumps(
259				12,
260				[(
261					ParaId::from(100),
262					[(PeerId::random(), Score::new(0).unwrap())].into_iter().collect()
263				)]
264				.into_iter()
265				.collect(),
266				Some(Score::new(1).unwrap())
267			)
268			.await
269			.is_empty());
270		assert_eq!(db.processed_finalized_block_number().await, Some(12));
271		assert_eq!(db.len(), 0);
272
273		// Reuse the same 12 block height, it should not be taken into consideration.
274		let first_peer_id = PeerId::random();
275		let first_para_id = ParaId::from(100);
276		assert!(db
277			.process_bumps(
278				12,
279				[(first_para_id, [(first_peer_id, Score::new(10).unwrap())].into_iter().collect())]
280					.into_iter()
281					.collect(),
282				Some(Score::new(1).unwrap())
283			)
284			.await
285			.is_empty());
286		assert_eq!(db.processed_finalized_block_number().await, Some(12));
287		assert_eq!(db.len(), 0);
288		assert_eq!(db.query(&first_peer_id, &first_para_id).await, None);
289
290		// Test a non-zero update on an empty DB.
291		assert_eq!(
292			db.process_bumps(
293				13,
294				[(first_para_id, [(first_peer_id, Score::new(10).unwrap())].into_iter().collect())]
295					.into_iter()
296					.collect(),
297				Some(Score::new(1).unwrap())
298			)
299			.await,
300			vec![ReputationUpdate {
301				peer_id: first_peer_id,
302				para_id: first_para_id,
303				kind: ReputationUpdateKind::Bump,
304				value: Score::new(10).unwrap()
305			}]
306		);
307		assert_eq!(db.processed_finalized_block_number().await, Some(13));
308		assert_eq!(db.len(), 1);
309		assert_eq!(
310			db.query(&first_peer_id, &first_para_id).await.unwrap(),
311			Score::new(10).unwrap()
312		);
313		// Query a non-existant peer_id for this para.
314		assert_eq!(db.query(&PeerId::random(), &first_para_id).await, None);
315		// Query this peer's rep for a different para.
316		assert_eq!(db.query(&first_peer_id, &ParaId::from(200)).await, None);
317
318		// Test a subsequent update with a lower block height. Will be ignored.
319		assert!(db
320			.process_bumps(
321				10,
322				[(first_para_id, [(first_peer_id, Score::new(10).unwrap())].into_iter().collect())]
323					.into_iter()
324					.collect(),
325				Some(Score::new(1).unwrap())
326			)
327			.await
328			.is_empty());
329		assert_eq!(db.processed_finalized_block_number().await, Some(13));
330		assert_eq!(db.len(), 1);
331		assert_eq!(
332			db.query(&first_peer_id, &first_para_id).await.unwrap(),
333			Score::new(10).unwrap()
334		);
335
336		let second_para_id = ParaId::from(200);
337		let second_peer_id = PeerId::random();
338		// Test a subsequent update with no decay.
339		assert_eq!(
340			db.process_bumps(
341				14,
342				[
343					(
344						first_para_id,
345						[(second_peer_id, Score::new(10).unwrap())].into_iter().collect()
346					),
347					(
348						second_para_id,
349						[(first_peer_id, Score::new(5).unwrap())].into_iter().collect()
350					)
351				]
352				.into_iter()
353				.collect(),
354				None
355			)
356			.await,
357			vec![
358				ReputationUpdate {
359					peer_id: second_peer_id,
360					para_id: first_para_id,
361					kind: ReputationUpdateKind::Bump,
362					value: Score::new(10).unwrap()
363				},
364				ReputationUpdate {
365					peer_id: first_peer_id,
366					para_id: second_para_id,
367					kind: ReputationUpdateKind::Bump,
368					value: Score::new(5).unwrap()
369				}
370			]
371		);
372		assert_eq!(db.len(), 2);
373		assert_eq!(db.processed_finalized_block_number().await, Some(14));
374		assert_eq!(
375			db.query(&first_peer_id, &first_para_id).await.unwrap(),
376			Score::new(10).unwrap()
377		);
378		assert_eq!(
379			db.query(&second_peer_id, &first_para_id).await.unwrap(),
380			Score::new(10).unwrap()
381		);
382		assert_eq!(
383			db.query(&first_peer_id, &second_para_id).await.unwrap(),
384			Score::new(5).unwrap()
385		);
386
387		// Empty update with decay has no effect.
388		assert!(db
389			.process_bumps(15, Default::default(), Some(Score::new(1).unwrap()))
390			.await
391			.is_empty());
392		assert_eq!(db.processed_finalized_block_number().await, Some(15));
393		assert_eq!(db.len(), 2);
394		assert_eq!(
395			db.query(&first_peer_id, &first_para_id).await.unwrap(),
396			Score::new(10).unwrap()
397		);
398		assert_eq!(
399			db.query(&second_peer_id, &first_para_id).await.unwrap(),
400			Score::new(10).unwrap()
401		);
402		assert_eq!(
403			db.query(&first_peer_id, &second_para_id).await.unwrap(),
404			Score::new(5).unwrap()
405		);
406
407		// Test a subsequent update with decay.
408		assert_eq!(
409			db.process_bumps(
410				16,
411				[
412					(
413						first_para_id,
414						[(first_peer_id, Score::new(10).unwrap())].into_iter().collect()
415					),
416					(
417						second_para_id,
418						[(second_peer_id, Score::new(10).unwrap())].into_iter().collect()
419					),
420				]
421				.into_iter()
422				.collect(),
423				Some(Score::new(1).unwrap())
424			)
425			.await,
426			vec![
427				ReputationUpdate {
428					peer_id: first_peer_id,
429					para_id: first_para_id,
430					kind: ReputationUpdateKind::Bump,
431					value: Score::new(10).unwrap()
432				},
433				ReputationUpdate {
434					peer_id: second_peer_id,
435					para_id: first_para_id,
436					kind: ReputationUpdateKind::Slash,
437					value: Score::new(1).unwrap()
438				},
439				ReputationUpdate {
440					peer_id: second_peer_id,
441					para_id: second_para_id,
442					kind: ReputationUpdateKind::Bump,
443					value: Score::new(10).unwrap()
444				},
445				ReputationUpdate {
446					peer_id: first_peer_id,
447					para_id: second_para_id,
448					kind: ReputationUpdateKind::Slash,
449					value: Score::new(1).unwrap()
450				},
451			]
452		);
453		assert_eq!(db.processed_finalized_block_number().await, Some(16));
454		assert_eq!(db.len(), 2);
455		assert_eq!(
456			db.query(&first_peer_id, &first_para_id).await.unwrap(),
457			Score::new(20).unwrap()
458		);
459		assert_eq!(
460			db.query(&second_peer_id, &first_para_id).await.unwrap(),
461			Score::new(9).unwrap()
462		);
463		assert_eq!(
464			db.query(&first_peer_id, &second_para_id).await.unwrap(),
465			Score::new(4).unwrap()
466		);
467		assert_eq!(
468			db.query(&second_peer_id, &second_para_id).await.unwrap(),
469			Score::new(10).unwrap()
470		);
471
472		// Test a decay that makes the reputation go to 0 (The peer's entry will be removed)
473		assert_eq!(
474			db.process_bumps(
475				17,
476				[(
477					second_para_id,
478					[(second_peer_id, Score::new(10).unwrap())].into_iter().collect()
479				),]
480				.into_iter()
481				.collect(),
482				Some(Score::new(5).unwrap())
483			)
484			.await,
485			vec![
486				ReputationUpdate {
487					peer_id: second_peer_id,
488					para_id: second_para_id,
489					kind: ReputationUpdateKind::Bump,
490					value: Score::new(10).unwrap()
491				},
492				ReputationUpdate {
493					peer_id: first_peer_id,
494					para_id: second_para_id,
495					kind: ReputationUpdateKind::Slash,
496					value: Score::new(4).unwrap()
497				}
498			]
499		);
500		assert_eq!(db.processed_finalized_block_number().await, Some(17));
501		assert_eq!(db.len(), 2);
502		assert_eq!(
503			db.query(&first_peer_id, &first_para_id).await.unwrap(),
504			Score::new(20).unwrap()
505		);
506		assert_eq!(
507			db.query(&second_peer_id, &first_para_id).await.unwrap(),
508			Score::new(9).unwrap()
509		);
510		assert_eq!(db.query(&first_peer_id, &second_para_id).await, None);
511		assert_eq!(
512			db.query(&second_peer_id, &second_para_id).await.unwrap(),
513			Score::new(20).unwrap()
514		);
515
516		// Test an update which ends up pruning least recently used entries. The per-para limit is
517		// 10.
518		let mut db = Db::new(10).await;
519		let peer_ids = (0..10).map(|_| PeerId::random()).collect::<Vec<_>>();
520
521		// Add an equal reputation for all peers.
522		assert_eq!(
523			db.process_bumps(
524				1,
525				[(
526					first_para_id,
527					peer_ids.iter().map(|peer_id| (*peer_id, Score::new(10).unwrap())).collect()
528				)]
529				.into_iter()
530				.collect(),
531				None,
532			)
533			.await
534			.len(),
535			10
536		);
537		assert_eq!(db.len(), 1);
538
539		for peer_id in peer_ids.iter() {
540			assert_eq!(db.query(peer_id, &first_para_id).await.unwrap(), Score::new(10).unwrap());
541		}
542
543		// Now sleep for one second and then bump the reputations of all peers except for the one
544		// with 4th index. We need to sleep so that the update time of the 4th peer is older than
545		// the rest.
546		tokio::time::sleep(Duration::from_millis(100)).await;
547		assert_eq!(
548			db.process_bumps(
549				2,
550				[(
551					first_para_id,
552					peer_ids
553						.iter()
554						.enumerate()
555						.filter_map(
556							|(i, peer_id)| (i != 4).then_some((*peer_id, Score::new(10).unwrap()))
557						)
558						.collect()
559				)]
560				.into_iter()
561				.collect(),
562				Some(Score::new(5).unwrap()),
563			)
564			.await
565			.len(),
566			10
567		);
568
569		for (i, peer_id) in peer_ids.iter().enumerate() {
570			if i == 4 {
571				assert_eq!(
572					db.query(peer_id, &first_para_id).await.unwrap(),
573					Score::new(5).unwrap()
574				);
575			} else {
576				assert_eq!(
577					db.query(peer_id, &first_para_id).await.unwrap(),
578					Score::new(20).unwrap()
579				);
580			}
581		}
582
583		// Now add a 11th peer. It should evict the 4th peer.
584		let new_peer = PeerId::random();
585		tokio::time::sleep(Duration::from_millis(100)).await;
586		assert_eq!(
587			db.process_bumps(
588				3,
589				[(first_para_id, [(new_peer, Score::new(10).unwrap())].into_iter().collect())]
590					.into_iter()
591					.collect(),
592				Some(Score::new(5).unwrap()),
593			)
594			.await
595			.len(),
596			11
597		);
598		for (i, peer_id) in peer_ids.iter().enumerate() {
599			if i == 4 {
600				assert_eq!(db.query(peer_id, &first_para_id).await, None);
601			} else {
602				assert_eq!(
603					db.query(peer_id, &first_para_id).await.unwrap(),
604					Score::new(15).unwrap()
605				);
606			}
607		}
608		assert_eq!(db.query(&new_peer, &first_para_id).await.unwrap(), Score::new(10).unwrap());
609
610		// Now try adding yet another peer. The decay would naturally evict the new peer so no need
611		// to evict the least recently bumped.
612		let yet_another_peer = PeerId::random();
613		assert_eq!(
614			db.process_bumps(
615				4,
616				[(
617					first_para_id,
618					[(yet_another_peer, Score::new(10).unwrap())].into_iter().collect()
619				)]
620				.into_iter()
621				.collect(),
622				Some(Score::new(10).unwrap()),
623			)
624			.await
625			.len(),
626			11
627		);
628		for (i, peer_id) in peer_ids.iter().enumerate() {
629			if i == 4 {
630				assert_eq!(db.query(peer_id, &first_para_id).await, None);
631			} else {
632				assert_eq!(
633					db.query(peer_id, &first_para_id).await.unwrap(),
634					Score::new(5).unwrap()
635				);
636			}
637		}
638		assert_eq!(db.query(&new_peer, &first_para_id).await, None);
639		assert_eq!(
640			db.query(&yet_another_peer, &first_para_id).await,
641			Some(Score::new(10).unwrap())
642		);
643	}
644
645	#[tokio::test]
646	// Test reputation slashes.
647	async fn test_slash() {
648		let mut db = Db::new(10).await;
649
650		// Test slash on empty DB
651		let peer_id = PeerId::random();
652		db.slash(&peer_id, &ParaId::from(100), Score::new(50).unwrap()).await;
653		assert_eq!(db.query(&peer_id, &ParaId::from(100)).await, None);
654
655		// Test slash on non-existent para
656		let another_peer_id = PeerId::random();
657		assert_eq!(
658			db.process_bumps(
659				1,
660				[
661					(ParaId::from(100), [(peer_id, Score::new(10).unwrap())].into_iter().collect()),
662					(
663						ParaId::from(200),
664						[(another_peer_id, Score::new(12).unwrap())].into_iter().collect()
665					),
666					(ParaId::from(300), [(peer_id, Score::new(15).unwrap())].into_iter().collect())
667				]
668				.into_iter()
669				.collect(),
670				Some(Score::new(10).unwrap()),
671			)
672			.await
673			.len(),
674			3
675		);
676		assert_eq!(db.query(&peer_id, &ParaId::from(100)).await.unwrap(), Score::new(10).unwrap());
677		assert_eq!(
678			db.query(&another_peer_id, &ParaId::from(200)).await.unwrap(),
679			Score::new(12).unwrap()
680		);
681		assert_eq!(db.query(&peer_id, &ParaId::from(300)).await.unwrap(), Score::new(15).unwrap());
682
683		db.slash(&peer_id, &ParaId::from(200), Score::new(4).unwrap()).await;
684		assert_eq!(db.query(&peer_id, &ParaId::from(100)).await.unwrap(), Score::new(10).unwrap());
685		assert_eq!(
686			db.query(&another_peer_id, &ParaId::from(200)).await.unwrap(),
687			Score::new(12).unwrap()
688		);
689		assert_eq!(db.query(&peer_id, &ParaId::from(300)).await.unwrap(), Score::new(15).unwrap());
690
691		// Test regular slash
692		db.slash(&peer_id, &ParaId::from(100), Score::new(4).unwrap()).await;
693		assert_eq!(db.query(&peer_id, &ParaId::from(100)).await.unwrap(), Score::new(6).unwrap());
694
695		// Test slash which removes the entry altogether
696		db.slash(&peer_id, &ParaId::from(100), Score::new(8).unwrap()).await;
697		assert_eq!(db.query(&peer_id, &ParaId::from(100)).await, None);
698		assert_eq!(db.len(), 2);
699	}
700
701	#[tokio::test]
702	// Test para pruning.
703	async fn test_prune_paras() {
704		let mut db = Db::new(10).await;
705
706		db.prune_paras(BTreeSet::new()).await;
707		assert_eq!(db.len(), 0);
708
709		db.prune_paras([ParaId::from(100), ParaId::from(200)].into_iter().collect())
710			.await;
711		assert_eq!(db.len(), 0);
712
713		let peer_id = PeerId::random();
714		let another_peer_id = PeerId::random();
715
716		assert_eq!(
717			db.process_bumps(
718				1,
719				[
720					(ParaId::from(100), [(peer_id, Score::new(10).unwrap())].into_iter().collect()),
721					(
722						ParaId::from(200),
723						[(another_peer_id, Score::new(12).unwrap())].into_iter().collect()
724					),
725					(ParaId::from(300), [(peer_id, Score::new(15).unwrap())].into_iter().collect())
726				]
727				.into_iter()
728				.collect(),
729				Some(Score::new(10).unwrap()),
730			)
731			.await
732			.len(),
733			3
734		);
735		assert_eq!(db.len(), 3);
736
737		// Registered paras include the existing ones. Does nothing
738		db.prune_paras(
739			[ParaId::from(100), ParaId::from(200), ParaId::from(300), ParaId::from(400)]
740				.into_iter()
741				.collect(),
742		)
743		.await;
744		assert_eq!(db.len(), 3);
745
746		assert_eq!(db.query(&peer_id, &ParaId::from(100)).await.unwrap(), Score::new(10).unwrap());
747		assert_eq!(
748			db.query(&another_peer_id, &ParaId::from(200)).await.unwrap(),
749			Score::new(12).unwrap()
750		);
751		assert_eq!(db.query(&peer_id, &ParaId::from(300)).await.unwrap(), Score::new(15).unwrap());
752
753		// Prunes multiple paras.
754		db.prune_paras([ParaId::from(300)].into_iter().collect()).await;
755		assert_eq!(db.len(), 1);
756		assert_eq!(db.query(&peer_id, &ParaId::from(100)).await, None);
757		assert_eq!(db.query(&another_peer_id, &ParaId::from(200)).await, None);
758		assert_eq!(db.query(&peer_id, &ParaId::from(300)).await.unwrap(), Score::new(15).unwrap());
759
760		// Prunes all paras.
761		db.prune_paras(BTreeSet::new()).await;
762		assert_eq!(db.len(), 0);
763		assert_eq!(db.query(&peer_id, &ParaId::from(300)).await, None);
764	}
765}