referrerpolicy=no-referrer-when-downgrade

polkadot_statement_distribution/v2/
cluster.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Direct distribution of statements within a cluster,
18//! even those concerning candidates which are not yet backed.
19//!
20//! Members of a validation group assigned to a para at a given relay-parent
21//! always distribute statements directly to each other.
22//!
//! The main way we limit the number of candidates that have to be handled by
//! the system is to limit the number of `Seconded` messages that we allow
//! each validator to issue at each relay-parent. Since the number of relay-parents
//! that we have to deal with at any time is itself bounded, this lets us bound
//! the memory and work that we have here. Bounding `Seconded` statements is enough
//! because they imply a bounded number of `Valid` statements about the same candidate
//! which may follow.
30//!
31//! The motivation for this piece of code is that the statements that each validator
32//! sees may differ. i.e. even though a validator is allowed to issue X `Seconded`
33//! statements at a relay-parent, they may in fact issue X*2 and issue one set to
34//! one partition of the backing group and one set to another. Of course, in practice
35//! these types of partitions will not exist, but in the worst case each validator in the
36//! group would see an entirely different set of X `Seconded` statements from some validator
37//! and each validator is in its own partition. After that partition resolves, we'd have to
38//! deal with up to `limit*group_size` `Seconded` statements from that validator. And then
39//! if every validator in the group does the same thing, we're dealing with something like
40//! `limit*group_size^2` `Seconded` statements in total.
41//!
42//! Given that both our group sizes and our limits per relay-parent are small, this is
43//! quite manageable, and the utility here lets us deal with it in only a few kilobytes
44//! of memory.
45//!
46//! It's also worth noting that any case where a validator issues more than the legal limit
47//! of `Seconded` statements at a relay parent is trivially slashable on-chain, which means
48//! the 'worst case' adversary that this code defends against is effectively lighting money
49//! on fire. Nevertheless, we handle the case here to ensure that the behavior of the
50//! system is well-defined even if an adversary is willing to be slashed.
51//!
52//! More concretely, this module exposes a [`ClusterTracker`] utility which allows us to determine
53//! whether to accept or reject messages from other validators in the same group as we
54//! are in, based on _the most charitable possible interpretation of our protocol rules_,
55//! and to keep track of what we have sent to other validators in the group and what we may
56//! continue to send them.
57
58use polkadot_primitives::{CandidateHash, CompactStatement, Hash, ValidatorIndex};
59
60use crate::LOG_TARGET;
61use std::collections::{HashMap, HashSet};
62
63// A piece of knowledge about a candidate
64#[derive(Hash, Clone, PartialEq, Eq)]
65enum Knowledge {
66	// General knowledge.
67	General(CandidateHash),
68	// Specific knowledge of a given statement (with its originator)
69	Specific(CompactStatement, ValidatorIndex),
70}
71
72// Knowledge paired with its source.
73#[derive(Hash, Clone, PartialEq, Eq)]
74enum TaggedKnowledge {
75	// Knowledge we have received from the validator on the p2p layer.
76	IncomingP2P(Knowledge),
77	// Knowledge we have sent to the validator on the p2p layer.
78	OutgoingP2P(Knowledge),
79	// Knowledge of candidates the validator has seconded.
80	// This is limited only to `Seconded` statements we have accepted
81	// _without prejudice_.
82	Seconded(CandidateHash),
83}
84
85/// Utility for keeping track of limits on direct statements within a group.
86///
87/// See module docs for more details.
88pub struct ClusterTracker {
89	validators: Vec<ValidatorIndex>,
90	seconding_limit: usize,
91	knowledge: HashMap<ValidatorIndex, HashSet<TaggedKnowledge>>,
92	// Statements known locally which haven't been sent to particular validators.
93	// maps target validator to (originator, statement) pairs.
94	pending: HashMap<ValidatorIndex, HashSet<(ValidatorIndex, CompactStatement)>>,
95}
96
97impl ClusterTracker {
98	/// Instantiate a new `ClusterTracker` tracker. Fails if `cluster_validators` is empty
99	pub fn new(cluster_validators: Vec<ValidatorIndex>, seconding_limit: usize) -> Option<Self> {
100		if cluster_validators.is_empty() {
101			return None
102		}
103		Some(ClusterTracker {
104			validators: cluster_validators,
105			seconding_limit,
106			knowledge: HashMap::new(),
107			pending: HashMap::new(),
108		})
109	}
110
111	/// Query whether we can receive some statement from the given validator.
112	///
113	/// This does no deduplication of `Valid` statements.
114	pub fn can_receive(
115		&self,
116		sender: ValidatorIndex,
117		originator: ValidatorIndex,
118		statement: CompactStatement,
119	) -> Result<Accept, RejectIncoming> {
120		if !self.is_in_group(sender) || !self.is_in_group(originator) {
121			return Err(RejectIncoming::NotInGroup)
122		}
123
124		if self.they_sent(sender, Knowledge::Specific(statement.clone(), originator)) {
125			return Err(RejectIncoming::Duplicate)
126		}
127
128		match statement {
129			CompactStatement::Seconded(candidate_hash) => {
130				// check whether the sender has not sent too many seconded statements for the
131				// originator. we know by the duplicate check above that this iterator doesn't
132				// include the statement itself.
133				let other_seconded_for_orig_from_remote = self
134					.knowledge
135					.get(&sender)
136					.into_iter()
137					.flat_map(|v_knowledge| v_knowledge.iter())
138					.filter(|k| match k {
139						TaggedKnowledge::IncomingP2P(Knowledge::Specific(
140							CompactStatement::Seconded(_),
141							orig,
142						)) if orig == &originator => true,
143						_ => false,
144					})
145					.count();
146
147				if other_seconded_for_orig_from_remote == self.seconding_limit {
148					return Err(RejectIncoming::ExcessiveSeconded)
149				}
150
151				// at this point, it doesn't seem like the remote has done anything wrong.
152				if self.seconded_already_or_within_limit(originator, candidate_hash) {
153					Ok(Accept::Ok)
154				} else {
155					Ok(Accept::WithPrejudice)
156				}
157			},
158			CompactStatement::Valid(candidate_hash) => {
159				if !self.knows_candidate(sender, candidate_hash) {
160					return Err(RejectIncoming::CandidateUnknown)
161				}
162
163				Ok(Accept::Ok)
164			},
165		}
166	}
167
168	/// Note that we issued a statement. This updates internal structures.
169	pub fn note_issued(&mut self, originator: ValidatorIndex, statement: CompactStatement) {
170		for cluster_member in &self.validators {
171			if !self.they_know_statement(*cluster_member, originator, statement.clone()) {
172				// add the statement to pending knowledge for all peers
173				// which don't know the statement.
174				self.pending
175					.entry(*cluster_member)
176					.or_default()
177					.insert((originator, statement.clone()));
178			}
179		}
180	}
181
182	/// Note that we accepted an incoming statement. This updates internal structures.
183	///
184	/// Should only be called after a successful `can_receive` call.
185	pub fn note_received(
186		&mut self,
187		sender: ValidatorIndex,
188		originator: ValidatorIndex,
189		statement: CompactStatement,
190	) {
191		for cluster_member in &self.validators {
192			if cluster_member == &sender {
193				if let Some(pending) = self.pending.get_mut(&sender) {
194					pending.remove(&(originator, statement.clone()));
195				}
196			} else if !self.they_know_statement(*cluster_member, originator, statement.clone()) {
197				// add the statement to pending knowledge for all peers
198				// which don't know the statement.
199				self.pending
200					.entry(*cluster_member)
201					.or_default()
202					.insert((originator, statement.clone()));
203			}
204		}
205
206		{
207			let sender_knowledge = self.knowledge.entry(sender).or_default();
208			sender_knowledge.insert(TaggedKnowledge::IncomingP2P(Knowledge::Specific(
209				statement.clone(),
210				originator,
211			)));
212
213			if let CompactStatement::Seconded(candidate_hash) = statement.clone() {
214				sender_knowledge
215					.insert(TaggedKnowledge::IncomingP2P(Knowledge::General(candidate_hash)));
216			}
217		}
218
219		if let CompactStatement::Seconded(candidate_hash) = statement {
220			// since we accept additional `Seconded` statements beyond the limits
221			// 'with prejudice', we must respect the limit here.
222			if self.seconded_already_or_within_limit(originator, candidate_hash) {
223				let originator_knowledge = self.knowledge.entry(originator).or_default();
224				originator_knowledge.insert(TaggedKnowledge::Seconded(candidate_hash));
225			}
226		}
227	}
228
229	/// Query whether we can send a statement to a given validator.
230	pub fn can_send(
231		&self,
232		target: ValidatorIndex,
233		originator: ValidatorIndex,
234		statement: CompactStatement,
235	) -> Result<(), RejectOutgoing> {
236		if !self.is_in_group(target) || !self.is_in_group(originator) {
237			return Err(RejectOutgoing::NotInGroup)
238		}
239
240		if self.they_know_statement(target, originator, statement.clone()) {
241			return Err(RejectOutgoing::Known)
242		}
243
244		match statement {
245			CompactStatement::Seconded(candidate_hash) => {
246				// we send the same `Seconded` statements to all our peers, and only the first `k`
247				// from each originator.
248				if !self.seconded_already_or_within_limit(originator, candidate_hash) {
249					return Err(RejectOutgoing::ExcessiveSeconded)
250				}
251
252				Ok(())
253			},
254			CompactStatement::Valid(candidate_hash) => {
255				if !self.knows_candidate(target, candidate_hash) {
256					return Err(RejectOutgoing::CandidateUnknown)
257				}
258
259				Ok(())
260			},
261		}
262	}
263
264	/// Note that we sent an outgoing statement to a peer in the group.
265	/// This must be preceded by a successful `can_send` call.
266	pub fn note_sent(
267		&mut self,
268		target: ValidatorIndex,
269		originator: ValidatorIndex,
270		statement: CompactStatement,
271	) {
272		{
273			let target_knowledge = self.knowledge.entry(target).or_default();
274			target_knowledge.insert(TaggedKnowledge::OutgoingP2P(Knowledge::Specific(
275				statement.clone(),
276				originator,
277			)));
278
279			if let CompactStatement::Seconded(candidate_hash) = statement.clone() {
280				target_knowledge
281					.insert(TaggedKnowledge::OutgoingP2P(Knowledge::General(candidate_hash)));
282			}
283		}
284
285		if let CompactStatement::Seconded(candidate_hash) = statement {
286			let originator_knowledge = self.knowledge.entry(originator).or_default();
287			originator_knowledge.insert(TaggedKnowledge::Seconded(candidate_hash));
288		}
289
290		if let Some(pending) = self.pending.get_mut(&target) {
291			pending.remove(&(originator, statement));
292		}
293	}
294
295	/// Get all targets as validator-indices. This doesn't attempt to filter
296	/// out the local validator index.
297	pub fn targets(&self) -> &[ValidatorIndex] {
298		&self.validators
299	}
300
301	/// Get all possible senders for the given originator.
302	/// Returns the empty slice in the case that the originator
303	/// is not part of the cluster.
304	// note: this API is future-proofing for a case where we may
305	// extend clusters beyond just the assigned group, for optimization
306	// purposes.
307	pub fn senders_for_originator(&self, originator: ValidatorIndex) -> &[ValidatorIndex] {
308		if self.validators.contains(&originator) {
309			&self.validators[..]
310		} else {
311			&[]
312		}
313	}
314
315	/// Whether a validator knows the candidate is `Seconded`.
316	pub fn knows_candidate(
317		&self,
318		validator: ValidatorIndex,
319		candidate_hash: CandidateHash,
320	) -> bool {
321		// we sent, they sent, or they signed and we received from someone else.
322
323		self.we_sent_seconded(validator, candidate_hash) ||
324			self.they_sent_seconded(validator, candidate_hash) ||
325			self.validator_seconded(validator, candidate_hash)
326	}
327
328	/// Whether a validator can request a candidate from us.
329	pub fn can_request(&self, target: ValidatorIndex, candidate_hash: CandidateHash) -> bool {
330		self.validators.contains(&target) &&
331			self.we_sent_seconded(target, candidate_hash) &&
332			!self.they_sent_seconded(target, candidate_hash)
333	}
334
335	/// Returns a Vec of pending statements to be sent to a particular validator
336	/// index. `Seconded` statements are sorted to the front of the vector.
337	///
338	/// Pending statements have the form (originator, compact statement).
339	pub fn pending_statements_for(
340		&self,
341		target: ValidatorIndex,
342	) -> Vec<(ValidatorIndex, CompactStatement)> {
343		let mut v = self
344			.pending
345			.get(&target)
346			.map(|x| x.iter().cloned().collect::<Vec<_>>())
347			.unwrap_or_default();
348
349		v.sort_by_key(|(_, s)| match s {
350			CompactStatement::Seconded(_) => 0u8,
351			CompactStatement::Valid(_) => 1u8,
352		});
353
354		v
355	}
356
357	// returns true if it's legal to accept a new `Seconded` message from this validator.
358	// This is either
359	//   1. because we've already accepted it.
360	//   2. because there's space for more seconding.
361	fn seconded_already_or_within_limit(
362		&self,
363		validator: ValidatorIndex,
364		candidate_hash: CandidateHash,
365	) -> bool {
366		let seconded_other_candidates = self
367			.knowledge
368			.get(&validator)
369			.into_iter()
370			.flat_map(|v_knowledge| v_knowledge.iter())
371			.filter(|k| match k {
372				TaggedKnowledge::Seconded(c) if c != &candidate_hash => true,
373				_ => false,
374			})
375			.count();
376
377		// This fulfills both properties by under-counting when the validator is at the limit
378		// but _has_ seconded the candidate already.
379		seconded_other_candidates < self.seconding_limit
380	}
381
382	fn they_know_statement(
383		&self,
384		validator: ValidatorIndex,
385		originator: ValidatorIndex,
386		statement: CompactStatement,
387	) -> bool {
388		let knowledge = Knowledge::Specific(statement, originator);
389		self.we_sent(validator, knowledge.clone()) || self.they_sent(validator, knowledge)
390	}
391
392	fn they_sent(&self, validator: ValidatorIndex, knowledge: Knowledge) -> bool {
393		self.knowledge
394			.get(&validator)
395			.map_or(false, |k| k.contains(&TaggedKnowledge::IncomingP2P(knowledge)))
396	}
397
398	fn we_sent(&self, validator: ValidatorIndex, knowledge: Knowledge) -> bool {
399		self.knowledge
400			.get(&validator)
401			.map_or(false, |k| k.contains(&TaggedKnowledge::OutgoingP2P(knowledge)))
402	}
403
404	fn we_sent_seconded(&self, validator: ValidatorIndex, candidate_hash: CandidateHash) -> bool {
405		self.we_sent(validator, Knowledge::General(candidate_hash))
406	}
407
408	fn they_sent_seconded(&self, validator: ValidatorIndex, candidate_hash: CandidateHash) -> bool {
409		self.they_sent(validator, Knowledge::General(candidate_hash))
410	}
411
412	fn validator_seconded(&self, validator: ValidatorIndex, candidate_hash: CandidateHash) -> bool {
413		self.knowledge
414			.get(&validator)
415			.map_or(false, |k| k.contains(&TaggedKnowledge::Seconded(candidate_hash)))
416	}
417
418	fn is_in_group(&self, validator: ValidatorIndex) -> bool {
419		self.validators.contains(&validator)
420	}
421
422	/// Dumps pending statement for this cluster.
423	///
424	/// Normally we should not have pending statements to validators in our cluster,
425	/// but if we do for all validators in our cluster, then we don't participate
426	/// in backing. Occasional pending statements are expected if two authorities
427	/// can't detect each other or after restart, where it takes a while to discover
428	/// the whole network.
429
430	pub fn warn_if_too_many_pending_statements(&self, parent_hash: Hash) {
431		if self.pending.iter().filter(|pending| !pending.1.is_empty()).count() >=
432			self.validators.len() &&
433			// No reason to warn if we are the only node in the cluster.
434			self.validators.len() > 1
435		{
436			gum::warn!(
437				target: LOG_TARGET,
438				pending_statements  = ?self.pending,
439				?parent_hash,
440				"Cluster has too many pending statements, something wrong with our connection to our group peers
441				Restart might be needed if validator gets 0 backing rewards for more than 3-4 consecutive sessions"
442			);
443		}
444	}
445}
446
/// The ways in which an incoming statement can be accepted.
#[derive(Debug, PartialEq)]
pub enum Accept {
	/// Neither the sending peer nor the originator appear to have exceeded
	/// their limits. The candidate or statement may already be known.
	Ok,
	/// The message is accepted: the sending peer is within its limits, but the
	/// originator has exceeded theirs.
	WithPrejudice,
}
456
/// The reasons for which an incoming statement can be rejected.
#[derive(Debug, PartialEq)]
pub enum RejectIncoming {
	/// The sending peer has sent too many `Seconded` statements for the originator.
	ExcessiveSeconded,
	/// Either the sender or the originator is not a member of the group.
	NotInGroup,
	/// The sender is not known to be aware of the candidate.
	/// Only applies to `Valid` statements.
	CandidateUnknown,
	/// The same statement was already received from this sender.
	Duplicate,
}
469
/// The reasons for which an outgoing statement can be rejected.
#[derive(Debug, PartialEq)]
pub enum RejectOutgoing {
	/// The target is not known to be aware of the candidate.
	/// Only applies to `Valid` statements.
	CandidateUnknown,
	/// We attempted to send excessive `Seconded` statements.
	/// Indicates a bug in the local node's code.
	ExcessiveSeconded,
	/// The peer already knows the statement.
	Known,
	/// Either the target or the originator is not a member of the group.
	NotInGroup,
}
483
484#[cfg(test)]
485mod tests {
486	use super::*;
487	use polkadot_primitives::Hash;
488
489	#[test]
490	fn rejects_incoming_outside_of_group() {
491		let group =
492			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
493
494		let seconding_limit = 2;
495
496		let tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
497
498		assert_eq!(
499			tracker.can_receive(
500				ValidatorIndex(100),
501				ValidatorIndex(5),
502				CompactStatement::Seconded(CandidateHash(Hash::repeat_byte(1))),
503			),
504			Err(RejectIncoming::NotInGroup),
505		);
506
507		assert_eq!(
508			tracker.can_receive(
509				ValidatorIndex(5),
510				ValidatorIndex(100),
511				CompactStatement::Seconded(CandidateHash(Hash::repeat_byte(1))),
512			),
513			Err(RejectIncoming::NotInGroup),
514		);
515	}
516
517	#[test]
518	fn begrudgingly_accepts_too_many_seconded_from_multiple_peers() {
519		let group =
520			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
521
522		let seconding_limit = 2;
523		let hash_a = CandidateHash(Hash::repeat_byte(1));
524		let hash_b = CandidateHash(Hash::repeat_byte(2));
525		let hash_c = CandidateHash(Hash::repeat_byte(3));
526
527		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
528
529		assert_eq!(
530			tracker.can_receive(
531				ValidatorIndex(5),
532				ValidatorIndex(5),
533				CompactStatement::Seconded(hash_a),
534			),
535			Ok(Accept::Ok),
536		);
537		tracker.note_received(
538			ValidatorIndex(5),
539			ValidatorIndex(5),
540			CompactStatement::Seconded(hash_a),
541		);
542
543		assert_eq!(
544			tracker.can_receive(
545				ValidatorIndex(5),
546				ValidatorIndex(5),
547				CompactStatement::Seconded(hash_b),
548			),
549			Ok(Accept::Ok),
550		);
551		tracker.note_received(
552			ValidatorIndex(5),
553			ValidatorIndex(5),
554			CompactStatement::Seconded(hash_b),
555		);
556
557		assert_eq!(
558			tracker.can_receive(
559				ValidatorIndex(5),
560				ValidatorIndex(5),
561				CompactStatement::Seconded(hash_c),
562			),
563			Err(RejectIncoming::ExcessiveSeconded),
564		);
565	}
566
567	#[test]
568	fn rejects_too_many_seconded_from_sender() {
569		let group =
570			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
571
572		let seconding_limit = 2;
573		let hash_a = CandidateHash(Hash::repeat_byte(1));
574		let hash_b = CandidateHash(Hash::repeat_byte(2));
575		let hash_c = CandidateHash(Hash::repeat_byte(3));
576
577		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
578
579		assert_eq!(
580			tracker.can_receive(
581				ValidatorIndex(5),
582				ValidatorIndex(5),
583				CompactStatement::Seconded(hash_a),
584			),
585			Ok(Accept::Ok),
586		);
587		tracker.note_received(
588			ValidatorIndex(5),
589			ValidatorIndex(5),
590			CompactStatement::Seconded(hash_a),
591		);
592
593		assert_eq!(
594			tracker.can_receive(
595				ValidatorIndex(5),
596				ValidatorIndex(5),
597				CompactStatement::Seconded(hash_b),
598			),
599			Ok(Accept::Ok),
600		);
601		tracker.note_received(
602			ValidatorIndex(5),
603			ValidatorIndex(5),
604			CompactStatement::Seconded(hash_b),
605		);
606
607		assert_eq!(
608			tracker.can_receive(
609				ValidatorIndex(200),
610				ValidatorIndex(5),
611				CompactStatement::Seconded(hash_c),
612			),
613			Ok(Accept::WithPrejudice),
614		);
615	}
616
617	#[test]
618	fn rejects_duplicates() {
619		let group =
620			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
621
622		let seconding_limit = 2;
623		let hash_a = CandidateHash(Hash::repeat_byte(1));
624
625		let mut tracker = ClusterTracker::new(group, seconding_limit).expect("not empty");
626
627		tracker.note_received(
628			ValidatorIndex(5),
629			ValidatorIndex(5),
630			CompactStatement::Seconded(hash_a),
631		);
632
633		tracker.note_received(
634			ValidatorIndex(5),
635			ValidatorIndex(200),
636			CompactStatement::Valid(hash_a),
637		);
638
639		assert_eq!(
640			tracker.can_receive(
641				ValidatorIndex(5),
642				ValidatorIndex(5),
643				CompactStatement::Seconded(hash_a),
644			),
645			Err(RejectIncoming::Duplicate),
646		);
647
648		assert_eq!(
649			tracker.can_receive(
650				ValidatorIndex(5),
651				ValidatorIndex(200),
652				CompactStatement::Valid(hash_a),
653			),
654			Err(RejectIncoming::Duplicate),
655		);
656	}
657
658	#[test]
659	fn rejects_incoming_valid_without_seconded() {
660		let group =
661			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
662
663		let seconding_limit = 2;
664
665		let tracker = ClusterTracker::new(group, seconding_limit).expect("not empty");
666
667		let hash_a = CandidateHash(Hash::repeat_byte(1));
668
669		assert_eq!(
670			tracker.can_receive(
671				ValidatorIndex(5),
672				ValidatorIndex(5),
673				CompactStatement::Valid(hash_a),
674			),
675			Err(RejectIncoming::CandidateUnknown),
676		);
677	}
678
679	#[test]
680	fn accepts_incoming_valid_after_receiving_seconded() {
681		let group =
682			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
683
684		let seconding_limit = 2;
685
686		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
687		let hash_a = CandidateHash(Hash::repeat_byte(1));
688
689		tracker.note_received(
690			ValidatorIndex(5),
691			ValidatorIndex(200),
692			CompactStatement::Seconded(hash_a),
693		);
694
695		assert_eq!(
696			tracker.can_receive(
697				ValidatorIndex(5),
698				ValidatorIndex(5),
699				CompactStatement::Valid(hash_a),
700			),
701			Ok(Accept::Ok)
702		);
703	}
704
705	#[test]
706	fn accepts_incoming_valid_after_outgoing_seconded() {
707		let group =
708			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
709
710		let seconding_limit = 2;
711
712		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
713		let hash_a = CandidateHash(Hash::repeat_byte(1));
714
715		tracker.note_sent(
716			ValidatorIndex(5),
717			ValidatorIndex(200),
718			CompactStatement::Seconded(hash_a),
719		);
720
721		assert_eq!(
722			tracker.can_receive(
723				ValidatorIndex(5),
724				ValidatorIndex(5),
725				CompactStatement::Valid(hash_a),
726			),
727			Ok(Accept::Ok)
728		);
729	}
730
731	#[test]
732	fn cannot_send_too_many_seconded_even_to_multiple_peers() {
733		let group =
734			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
735
736		let seconding_limit = 2;
737
738		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
739		let hash_a = CandidateHash(Hash::repeat_byte(1));
740		let hash_b = CandidateHash(Hash::repeat_byte(2));
741		let hash_c = CandidateHash(Hash::repeat_byte(3));
742
743		tracker.note_sent(
744			ValidatorIndex(200),
745			ValidatorIndex(5),
746			CompactStatement::Seconded(hash_a),
747		);
748
749		tracker.note_sent(
750			ValidatorIndex(200),
751			ValidatorIndex(5),
752			CompactStatement::Seconded(hash_b),
753		);
754
755		assert_eq!(
756			tracker.can_send(
757				ValidatorIndex(200),
758				ValidatorIndex(5),
759				CompactStatement::Seconded(hash_c),
760			),
761			Err(RejectOutgoing::ExcessiveSeconded),
762		);
763
764		assert_eq!(
765			tracker.can_send(
766				ValidatorIndex(24),
767				ValidatorIndex(5),
768				CompactStatement::Seconded(hash_c),
769			),
770			Err(RejectOutgoing::ExcessiveSeconded),
771		);
772	}
773
774	#[test]
775	fn cannot_send_duplicate() {
776		let group =
777			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
778
779		let seconding_limit = 2;
780
781		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
782		let hash_a = CandidateHash(Hash::repeat_byte(1));
783
784		tracker.note_sent(
785			ValidatorIndex(200),
786			ValidatorIndex(5),
787			CompactStatement::Seconded(hash_a),
788		);
789
790		assert_eq!(
791			tracker.can_send(
792				ValidatorIndex(200),
793				ValidatorIndex(5),
794				CompactStatement::Seconded(hash_a),
795			),
796			Err(RejectOutgoing::Known),
797		);
798	}
799
800	#[test]
801	fn cannot_send_what_was_received() {
802		let group =
803			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
804
805		let seconding_limit = 2;
806
807		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
808		let hash_a = CandidateHash(Hash::repeat_byte(1));
809
810		tracker.note_received(
811			ValidatorIndex(200),
812			ValidatorIndex(5),
813			CompactStatement::Seconded(hash_a),
814		);
815
816		assert_eq!(
817			tracker.can_send(
818				ValidatorIndex(200),
819				ValidatorIndex(5),
820				CompactStatement::Seconded(hash_a),
821			),
822			Err(RejectOutgoing::Known),
823		);
824	}
825
826	// Ensure statements received with prejudice don't prevent sending later.
827	#[test]
828	fn can_send_statements_received_with_prejudice() {
829		let group =
830			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
831
832		let seconding_limit = 1;
833
834		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
835		let hash_a = CandidateHash(Hash::repeat_byte(1));
836		let hash_b = CandidateHash(Hash::repeat_byte(2));
837
838		assert_eq!(
839			tracker.can_receive(
840				ValidatorIndex(200),
841				ValidatorIndex(5),
842				CompactStatement::Seconded(hash_a),
843			),
844			Ok(Accept::Ok),
845		);
846
847		tracker.note_received(
848			ValidatorIndex(200),
849			ValidatorIndex(5),
850			CompactStatement::Seconded(hash_a),
851		);
852
853		assert_eq!(
854			tracker.can_receive(
855				ValidatorIndex(24),
856				ValidatorIndex(5),
857				CompactStatement::Seconded(hash_b),
858			),
859			Ok(Accept::WithPrejudice),
860		);
861
862		tracker.note_received(
863			ValidatorIndex(24),
864			ValidatorIndex(5),
865			CompactStatement::Seconded(hash_b),
866		);
867
868		assert_eq!(
869			tracker.can_send(
870				ValidatorIndex(24),
871				ValidatorIndex(5),
872				CompactStatement::Seconded(hash_a),
873			),
874			Ok(()),
875		);
876	}
877
878	// Test that the `pending_statements` are set whenever we receive a fresh statement.
879	//
880	// Also test that pending statements are sorted, with `Seconded` statements in the front.
881	#[test]
882	fn pending_statements_set_when_receiving_fresh_statements() {
883		let group =
884			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
885
886		let seconding_limit = 1;
887
888		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
889		let hash_a = CandidateHash(Hash::repeat_byte(1));
890		let hash_b = CandidateHash(Hash::repeat_byte(2));
891
892		// Receive a 'Seconded' statement for candidate A.
893		{
894			assert_eq!(
895				tracker.can_receive(
896					ValidatorIndex(200),
897					ValidatorIndex(5),
898					CompactStatement::Seconded(hash_a),
899				),
900				Ok(Accept::Ok),
901			);
902			tracker.note_received(
903				ValidatorIndex(200),
904				ValidatorIndex(5),
905				CompactStatement::Seconded(hash_a),
906			);
907
908			assert_eq!(
909				tracker.pending_statements_for(ValidatorIndex(5)),
910				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
911			);
912			assert_eq!(tracker.pending_statements_for(ValidatorIndex(200)), vec![]);
913			assert_eq!(
914				tracker.pending_statements_for(ValidatorIndex(24)),
915				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
916			);
917			assert_eq!(
918				tracker.pending_statements_for(ValidatorIndex(146)),
919				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
920			);
921		}
922
923		// Receive a 'Valid' statement for candidate A.
924		{
925			// First, send a `Seconded` statement for the candidate.
926			assert_eq!(
927				tracker.can_send(
928					ValidatorIndex(24),
929					ValidatorIndex(200),
930					CompactStatement::Seconded(hash_a)
931				),
932				Ok(())
933			);
934			tracker.note_sent(
935				ValidatorIndex(24),
936				ValidatorIndex(200),
937				CompactStatement::Seconded(hash_a),
938			);
939
940			// We have to see that the candidate is known by the sender, e.g. we sent them
941			// 'Seconded' above.
942			assert_eq!(
943				tracker.can_receive(
944					ValidatorIndex(24),
945					ValidatorIndex(200),
946					CompactStatement::Valid(hash_a),
947				),
948				Ok(Accept::Ok),
949			);
950			tracker.note_received(
951				ValidatorIndex(24),
952				ValidatorIndex(200),
953				CompactStatement::Valid(hash_a),
954			);
955
956			assert_eq!(
957				tracker.pending_statements_for(ValidatorIndex(5)),
958				vec![
959					(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
960					(ValidatorIndex(200), CompactStatement::Valid(hash_a))
961				]
962			);
963			assert_eq!(
964				tracker.pending_statements_for(ValidatorIndex(200)),
965				vec![(ValidatorIndex(200), CompactStatement::Valid(hash_a))]
966			);
967			assert_eq!(
968				tracker.pending_statements_for(ValidatorIndex(24)),
969				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
970			);
971			assert_eq!(
972				tracker.pending_statements_for(ValidatorIndex(146)),
973				vec![
974					(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
975					(ValidatorIndex(200), CompactStatement::Valid(hash_a))
976				]
977			);
978		}
979
980		// Receive a 'Seconded' statement for candidate B.
981		{
982			assert_eq!(
983				tracker.can_receive(
984					ValidatorIndex(5),
985					ValidatorIndex(146),
986					CompactStatement::Seconded(hash_b),
987				),
988				Ok(Accept::Ok),
989			);
990			tracker.note_received(
991				ValidatorIndex(5),
992				ValidatorIndex(146),
993				CompactStatement::Seconded(hash_b),
994			);
995
996			assert_eq!(
997				tracker.pending_statements_for(ValidatorIndex(5)),
998				vec![
999					(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
1000					(ValidatorIndex(200), CompactStatement::Valid(hash_a))
1001				]
1002			);
1003			assert_eq!(
1004				tracker.pending_statements_for(ValidatorIndex(200)),
1005				vec![
1006					(ValidatorIndex(146), CompactStatement::Seconded(hash_b)),
1007					(ValidatorIndex(200), CompactStatement::Valid(hash_a)),
1008				]
1009			);
1010			{
1011				let mut pending_statements = tracker.pending_statements_for(ValidatorIndex(24));
1012				pending_statements.sort();
1013				assert_eq!(
1014					pending_statements,
1015					vec![
1016						(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
1017						(ValidatorIndex(146), CompactStatement::Seconded(hash_b))
1018					],
1019				);
1020			}
1021			{
1022				let mut pending_statements = tracker.pending_statements_for(ValidatorIndex(146));
1023				pending_statements.sort();
1024				assert_eq!(
1025					pending_statements,
1026					vec![
1027						(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
1028						(ValidatorIndex(146), CompactStatement::Seconded(hash_b)),
1029						(ValidatorIndex(200), CompactStatement::Valid(hash_a)),
1030					]
1031				);
1032			}
1033		}
1034	}
1035
1036	// Test that the `pending_statements` are updated when we send or receive statements from others
1037	// in the cluster.
1038	#[test]
1039	fn pending_statements_updated_when_sending_statements() {
1040		let group =
1041			vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];
1042
1043		let seconding_limit = 1;
1044
1045		let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
1046		let hash_a = CandidateHash(Hash::repeat_byte(1));
1047		let hash_b = CandidateHash(Hash::repeat_byte(2));
1048
1049		// Receive a 'Seconded' statement for candidate A.
1050		{
1051			assert_eq!(
1052				tracker.can_receive(
1053					ValidatorIndex(200),
1054					ValidatorIndex(5),
1055					CompactStatement::Seconded(hash_a),
1056				),
1057				Ok(Accept::Ok),
1058			);
1059			tracker.note_received(
1060				ValidatorIndex(200),
1061				ValidatorIndex(5),
1062				CompactStatement::Seconded(hash_a),
1063			);
1064
1065			// Pending statements should be updated.
1066			assert_eq!(
1067				tracker.pending_statements_for(ValidatorIndex(5)),
1068				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
1069			);
1070			assert_eq!(tracker.pending_statements_for(ValidatorIndex(200)), vec![]);
1071			assert_eq!(
1072				tracker.pending_statements_for(ValidatorIndex(24)),
1073				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
1074			);
1075			assert_eq!(
1076				tracker.pending_statements_for(ValidatorIndex(146)),
1077				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
1078			);
1079		}
1080
1081		// Receive a 'Valid' statement for candidate B.
1082		{
1083			// First, send a `Seconded` statement for the candidate.
1084			assert_eq!(
1085				tracker.can_send(
1086					ValidatorIndex(24),
1087					ValidatorIndex(200),
1088					CompactStatement::Seconded(hash_b)
1089				),
1090				Ok(())
1091			);
1092			tracker.note_sent(
1093				ValidatorIndex(24),
1094				ValidatorIndex(200),
1095				CompactStatement::Seconded(hash_b),
1096			);
1097
1098			// We have to see the candidate is known by the sender, e.g. we sent them 'Seconded'.
1099			assert_eq!(
1100				tracker.can_receive(
1101					ValidatorIndex(24),
1102					ValidatorIndex(200),
1103					CompactStatement::Valid(hash_b),
1104				),
1105				Ok(Accept::Ok),
1106			);
1107			tracker.note_received(
1108				ValidatorIndex(24),
1109				ValidatorIndex(200),
1110				CompactStatement::Valid(hash_b),
1111			);
1112
1113			// Pending statements should be updated.
1114			assert_eq!(
1115				tracker.pending_statements_for(ValidatorIndex(5)),
1116				vec![
1117					(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
1118					(ValidatorIndex(200), CompactStatement::Valid(hash_b))
1119				]
1120			);
1121			assert_eq!(
1122				tracker.pending_statements_for(ValidatorIndex(200)),
1123				vec![(ValidatorIndex(200), CompactStatement::Valid(hash_b))]
1124			);
1125			assert_eq!(
1126				tracker.pending_statements_for(ValidatorIndex(24)),
1127				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
1128			);
1129			assert_eq!(
1130				tracker.pending_statements_for(ValidatorIndex(146)),
1131				vec![
1132					(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
1133					(ValidatorIndex(200), CompactStatement::Valid(hash_b))
1134				]
1135			);
1136		}
1137
1138		// Send a 'Seconded' statement.
1139		{
1140			assert_eq!(
1141				tracker.can_send(
1142					ValidatorIndex(5),
1143					ValidatorIndex(5),
1144					CompactStatement::Seconded(hash_a)
1145				),
1146				Ok(())
1147			);
1148			tracker.note_sent(
1149				ValidatorIndex(5),
1150				ValidatorIndex(5),
1151				CompactStatement::Seconded(hash_a),
1152			);
1153
1154			// Pending statements should be updated.
1155			assert_eq!(
1156				tracker.pending_statements_for(ValidatorIndex(5)),
1157				vec![(ValidatorIndex(200), CompactStatement::Valid(hash_b))]
1158			);
1159			assert_eq!(
1160				tracker.pending_statements_for(ValidatorIndex(200)),
1161				vec![(ValidatorIndex(200), CompactStatement::Valid(hash_b))]
1162			);
1163			assert_eq!(
1164				tracker.pending_statements_for(ValidatorIndex(24)),
1165				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
1166			);
1167			assert_eq!(
1168				tracker.pending_statements_for(ValidatorIndex(146)),
1169				vec![
1170					(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
1171					(ValidatorIndex(200), CompactStatement::Valid(hash_b))
1172				]
1173			);
1174		}
1175
1176		// Send a 'Valid' statement.
1177		{
1178			// First, send a `Seconded` statement for the candidate.
1179			assert_eq!(
1180				tracker.can_send(
1181					ValidatorIndex(5),
1182					ValidatorIndex(200),
1183					CompactStatement::Seconded(hash_b)
1184				),
1185				Ok(())
1186			);
1187			tracker.note_sent(
1188				ValidatorIndex(5),
1189				ValidatorIndex(200),
1190				CompactStatement::Seconded(hash_b),
1191			);
1192
1193			// We have to see that the candidate is known by the sender, e.g. we sent them
1194			// 'Seconded' above.
1195			assert_eq!(
1196				tracker.can_send(
1197					ValidatorIndex(5),
1198					ValidatorIndex(200),
1199					CompactStatement::Valid(hash_b)
1200				),
1201				Ok(())
1202			);
1203			tracker.note_sent(
1204				ValidatorIndex(5),
1205				ValidatorIndex(200),
1206				CompactStatement::Valid(hash_b),
1207			);
1208
1209			// Pending statements should be updated.
1210			assert_eq!(tracker.pending_statements_for(ValidatorIndex(5)), vec![]);
1211			assert_eq!(
1212				tracker.pending_statements_for(ValidatorIndex(200)),
1213				vec![(ValidatorIndex(200), CompactStatement::Valid(hash_b))]
1214			);
1215			assert_eq!(
1216				tracker.pending_statements_for(ValidatorIndex(24)),
1217				vec![(ValidatorIndex(5), CompactStatement::Seconded(hash_a))]
1218			);
1219			assert_eq!(
1220				tracker.pending_statements_for(ValidatorIndex(146)),
1221				vec![
1222					(ValidatorIndex(5), CompactStatement::Seconded(hash_a)),
1223					(ValidatorIndex(200), CompactStatement::Valid(hash_b))
1224				]
1225			);
1226		}
1227	}
1228}