referrerpolicy=no-referrer-when-downgrade

polkadot_statement_distribution/v2/
grid.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Utilities for handling distribution of backed candidates along the grid (outside the group to
18//! the rest of the network).
19//!
20//! The grid uses the gossip topology defined in [`polkadot_node_network_protocol::grid_topology`].
21//! It defines how messages and statements are forwarded between validators.
22//!
23//! # Protocol
24//!
25//! - Once the candidate is backed, produce a 'backed candidate packet' `(CommittedCandidateReceipt,
26//!   Statements)`.
27//! - Members of a backing group produce an announcement of a fully-backed candidate (aka "full
28//!   manifest") when they are finished.
29//!   - `BackedCandidateManifest`
30//!   - Manifests are sent along the grid topology to peers who have the relay-parent in their
31//!     implicit view.
32//!   - Only sent by 1st-hop nodes after downloading the backed candidate packet.
33//!     - The grid topology is a 2-dimensional grid that provides either a 1 or 2-hop path from any
34//!       originator to any recipient - 1st-hop nodes are those which share either a row or column
35//!       with the originator, and 2nd-hop nodes are those which share a column or row with that
36//!       1st-hop node.
37//!     - Note that for the purposes of statement distribution, we actually take the union of the
38//!       routing paths from each validator in a group to the local node to determine the sending
39//!       and receiving paths.
40//!   - Ignored when received out-of-topology
41//! - On every local view change, members of the backing group rebroadcast the manifest for all
42//!   candidates under every new relay-parent across the grid.
43//! - Nodes should send a `BackedCandidateAcknowledgement(CandidateHash, StatementFilter)`
44//!   notification to any peer which has sent a manifest, and the candidate has been acquired by
45//!   other means.
46//! - Request/response for the candidate + votes.
47//!   - Ignore if they are inconsistent with the manifest.
48//!   - A malicious backing group is capable of producing an unbounded number of backed candidates.
//!     - We request the candidate only if the candidate is a hypothetical member in any of our
//!       fragment chains.
51//! - All members of the group attempt to circulate all statements (in compact form) from the rest
52//!   of the group on candidates that have already been backed.
53//!   - They do this via the grid topology.
54//!   - They add the statements to their backed candidate packet for future requestors, and also:
55//!     - send the statement to any peer, which:
56//!       - we advertised the backed candidate to (sent manifest), and:
57//!         - has previously & successfully requested the backed candidate packet, or:
58//!         - which has sent a `BackedCandidateAcknowledgement`
59//!   - 1st-hop nodes do the same thing
60
61use polkadot_node_network_protocol::{grid_topology::SessionGridTopology, v3::StatementFilter};
62use polkadot_primitives::{CandidateHash, CompactStatement, GroupIndex, Hash, ValidatorIndex};
63
64use std::collections::{
65	hash_map::{Entry, HashMap},
66	HashSet,
67};
68
69use bitvec::{order::Lsb0, slice::BitSlice};
70
71use super::{groups::Groups, LOG_TARGET};
72
/// Our local view of a subset of the grid topology organized around a specific validator
/// group.
///
/// This tracks which authorities we expect to communicate with concerning
/// candidates from the group. This includes both the authorities we are
/// expected to send to as well as the authorities we expect to receive from.
///
/// In the case that this group is the group that we are locally assigned to,
/// the 'receiving' side will be empty.
#[derive(Debug, PartialEq)]
struct GroupSubView {
	// validators we are 'sending' to: they get `Full` manifests from us and may
	// answer with `Acknowledgement` manifests.
	sending: HashSet<ValidatorIndex>,
	// validators we are 'receiving' from: they may send us `Full` manifests.
	receiving: HashSet<ValidatorIndex>,
}
89
/// Our local view of the topology for a session, as it pertains to backed
/// candidate distribution.
///
/// Built by [`build_session_topology`]; holds one sending/receiving sub-view per
/// validator group.
#[derive(Debug)]
pub struct SessionTopologyView {
	// per-group sending/receiving sets, keyed by the group's index.
	group_views: HashMap<GroupIndex, GroupSubView>,
}
96
97impl SessionTopologyView {
98	/// Returns an iterator over all validator indices from the group who are allowed to
99	/// send us manifests of the given kind.
100	pub fn iter_sending_for_group(
101		&self,
102		group: GroupIndex,
103		kind: ManifestKind,
104	) -> impl Iterator<Item = ValidatorIndex> + '_ {
105		self.group_views.get(&group).into_iter().flat_map(move |sub| match kind {
106			ManifestKind::Full => sub.receiving.iter().cloned(),
107			ManifestKind::Acknowledgement => sub.sending.iter().cloned(),
108		})
109	}
110}
111
112/// Build a view of the topology for the session.
113/// For groups that we are part of: we receive from nobody and send to our X/Y peers.
114/// For groups that we are not part of: we receive from any validator in the group we share a slice
115/// with    and send to the corresponding X/Y slice in the other dimension.
116///    For any validators we don't share a slice with, we receive from the nodes
117///    which share a slice with them.
118pub fn build_session_topology<'a>(
119	groups: impl IntoIterator<Item = &'a Vec<ValidatorIndex>>,
120	topology: &SessionGridTopology,
121	our_index: Option<ValidatorIndex>,
122) -> SessionTopologyView {
123	let mut view = SessionTopologyView { group_views: HashMap::new() };
124
125	let our_index = match our_index {
126		None => return view,
127		Some(i) => i,
128	};
129
130	let our_neighbors = match topology.compute_grid_neighbors_for(our_index) {
131		None => {
132			gum::warn!(target: LOG_TARGET, ?our_index, "our index unrecognized in topology?");
133
134			return view
135		},
136		Some(n) => n,
137	};
138
139	for (i, group) in groups.into_iter().enumerate() {
140		let mut sub_view = GroupSubView { sending: HashSet::new(), receiving: HashSet::new() };
141
142		if group.contains(&our_index) {
143			sub_view.sending.extend(our_neighbors.validator_indices_x.iter().cloned());
144			sub_view.sending.extend(our_neighbors.validator_indices_y.iter().cloned());
145
146			// remove all other same-group validators from this set, they are
147			// in the cluster.
148			// TODO [now]: test this behavior.
149			for v in group {
150				sub_view.sending.remove(v);
151			}
152		} else {
153			for &group_val in group {
154				// If the validator shares a slice with us, we expect to
155				// receive from them and send to our neighbors in the other
156				// dimension.
157
158				if our_neighbors.validator_indices_x.contains(&group_val) {
159					sub_view.receiving.insert(group_val);
160					sub_view.sending.extend(
161						our_neighbors
162							.validator_indices_y
163							.iter()
164							.filter(|v| !group.contains(v))
165							.cloned(),
166					);
167
168					continue
169				}
170
171				if our_neighbors.validator_indices_y.contains(&group_val) {
172					sub_view.receiving.insert(group_val);
173					sub_view.sending.extend(
174						our_neighbors
175							.validator_indices_x
176							.iter()
177							.filter(|v| !group.contains(v))
178							.cloned(),
179					);
180
181					continue
182				}
183
184				// If they don't share a slice with us, we don't send to anybody
185				// but receive from any peers sharing a dimension with both of us
186				let their_neighbors = match topology.compute_grid_neighbors_for(group_val) {
187					None => {
188						gum::warn!(
189							target: LOG_TARGET,
190							index = ?group_val,
191							"validator index unrecognized in topology?"
192						);
193
194						continue
195					},
196					Some(n) => n,
197				};
198
199				// their X, our Y
200				for potential_link in &their_neighbors.validator_indices_x {
201					if our_neighbors.validator_indices_y.contains(potential_link) {
202						sub_view.receiving.insert(*potential_link);
203						break // one max
204					}
205				}
206
207				// their Y, our X
208				for potential_link in &their_neighbors.validator_indices_y {
209					if our_neighbors.validator_indices_x.contains(potential_link) {
210						sub_view.receiving.insert(*potential_link);
211						break // one max
212					}
213				}
214			}
215		}
216
217		view.group_views.insert(GroupIndex(i as _), sub_view);
218	}
219
220	view
221}
222
/// The kind of backed candidate manifest we should send to a remote peer.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ManifestKind {
	/// Full manifests contain information about the candidate and should be sent
	/// to peers which aren't guaranteed to have the candidate already.
	Full,
	/// Acknowledgement manifests omit information which is implicit in the candidate
	/// itself, and should be sent to peers which are guaranteed to have the candidate
	/// already.
	Acknowledgement,
}
234
/// A tracker of knowledge from authorities within the grid for a particular
/// relay-parent.
#[derive(Default)]
pub struct GridTracker {
	// manifests successfully imported from each validator.
	received: HashMap<ValidatorIndex, ReceivedManifests>,
	// candidates added via `add_backed_candidate`, with per-peer knowledge.
	confirmed_backed: HashMap<CandidateHash, KnownBackedCandidate>,
	// `(sender, claimed group)` pairs of manifests imported for candidates which
	// were not yet in `confirmed_backed` at import time.
	unconfirmed: HashMap<CandidateHash, Vec<(ValidatorIndex, GroupIndex)>>,
	// maps target to the candidates (and manifest kind) we still have to send them.
	pending_manifests: HashMap<ValidatorIndex, HashMap<CandidateHash, ManifestKind>>,

	// maps target to (originator, statement) pairs.
	pending_statements: HashMap<ValidatorIndex, HashSet<(ValidatorIndex, CompactStatement)>>,
}
247
248impl GridTracker {
249	/// Attempt to import a manifest advertised by a remote peer.
250	///
251	/// This checks whether the peer is allowed to send us manifests
252	/// about this group at this relay-parent. This also does sanity
253	/// checks on the format of the manifest and the amount of votes
254	/// it contains. It assumes that the votes from disabled validators
255	/// are already filtered out.
256	/// It has effects on the stored state only when successful.
257	///
258	/// This returns a `bool` on success, which if true indicates that an acknowledgement is
259	/// to be sent in response to the received manifest. This only occurs when the
260	/// candidate is already known to be confirmed and backed.
261	pub fn import_manifest(
262		&mut self,
263		session_topology: &SessionTopologyView,
264		groups: &Groups,
265		candidate_hash: CandidateHash,
266		seconding_limit: usize,
267		manifest: ManifestSummary,
268		kind: ManifestKind,
269		sender: ValidatorIndex,
270	) -> Result<bool, ManifestImportError> {
271		let claimed_group_index = manifest.claimed_group_index;
272
273		let group_topology = match session_topology.group_views.get(&manifest.claimed_group_index) {
274			None => return Err(ManifestImportError::Disallowed),
275			Some(g) => g,
276		};
277
278		let receiving_from = group_topology.receiving.contains(&sender);
279		let sending_to = group_topology.sending.contains(&sender);
280		let manifest_allowed = match kind {
281			// Peers can send manifests _if_:
282			//   * They are in the receiving set for the group AND the manifest is full OR
283			//   * They are in the sending set for the group AND we have sent them a manifest AND
284			//     the received manifest is partial.
285			ManifestKind::Full => receiving_from,
286			ManifestKind::Acknowledgement =>
287				sending_to &&
288					self.confirmed_backed
289						.get(&candidate_hash)
290						.map_or(false, |c| c.has_sent_manifest_to(sender)),
291		};
292
293		if !manifest_allowed {
294			return Err(ManifestImportError::Disallowed)
295		}
296
297		let (group_size, backing_threshold) =
298			match groups.get_size_and_backing_threshold(manifest.claimed_group_index) {
299				Some(x) => x,
300				None => return Err(ManifestImportError::Malformed),
301			};
302
303		let remote_knowledge = manifest.statement_knowledge.clone();
304
305		if !remote_knowledge.has_len(group_size) {
306			return Err(ManifestImportError::Malformed)
307		}
308
309		if !remote_knowledge.has_seconded() {
310			return Err(ManifestImportError::Malformed)
311		}
312
313		// ensure votes are sufficient to back.
314		let votes = remote_knowledge.backing_validators();
315
316		if votes < backing_threshold {
317			return Err(ManifestImportError::Insufficient)
318		}
319
320		self.received.entry(sender).or_default().import_received(
321			group_size,
322			seconding_limit,
323			candidate_hash,
324			manifest,
325		)?;
326
327		let mut ack = false;
328		if let Some(confirmed) = self.confirmed_backed.get_mut(&candidate_hash) {
329			if receiving_from && !confirmed.has_sent_manifest_to(sender) {
330				// due to checks above, the manifest `kind` is guaranteed to be `Full`
331				self.pending_manifests
332					.entry(sender)
333					.or_default()
334					.insert(candidate_hash, ManifestKind::Acknowledgement);
335
336				ack = true;
337			}
338
339			// add all statements in local_knowledge & !remote_knowledge
340			// to `pending_statements` for this validator.
341			confirmed.manifest_received_from(sender, remote_knowledge);
342			if let Some(pending_statements) = confirmed.pending_statements(sender) {
343				self.pending_statements.entry(sender).or_default().extend(
344					decompose_statement_filter(
345						groups,
346						claimed_group_index,
347						candidate_hash,
348						&pending_statements,
349					),
350				);
351			}
352		} else {
353			// `received` prevents conflicting manifests so this is max 1 per validator.
354			self.unconfirmed
355				.entry(candidate_hash)
356				.or_default()
357				.push((sender, claimed_group_index))
358		}
359
360		Ok(ack)
361	}
362
363	/// Add a new backed candidate to the tracker. This yields
364	/// a list of validators which we should either advertise to
365	/// or signal that we know the candidate, along with the corresponding
366	/// type of manifest we should send.
367	pub fn add_backed_candidate(
368		&mut self,
369		session_topology: &SessionTopologyView,
370		candidate_hash: CandidateHash,
371		group_index: GroupIndex,
372		local_knowledge: StatementFilter,
373	) -> Vec<(ValidatorIndex, ManifestKind)> {
374		let c = match self.confirmed_backed.entry(candidate_hash) {
375			Entry::Occupied(_) => return Vec::new(),
376			Entry::Vacant(v) => v.insert(KnownBackedCandidate {
377				group_index,
378				mutual_knowledge: HashMap::new(),
379				local_knowledge,
380			}),
381		};
382
383		// Populate the entry with previously unconfirmed manifests.
384		for (v, claimed_group_index) in
385			self.unconfirmed.remove(&candidate_hash).into_iter().flatten()
386		{
387			if claimed_group_index != group_index {
388				// This is misbehavior, but is handled more comprehensively elsewhere
389				continue
390			}
391
392			let statement_filter = self
393				.received
394				.get(&v)
395				.and_then(|r| r.candidate_statement_filter(&candidate_hash))
396				.expect("unconfirmed is only populated by validators who have sent manifest; qed");
397
398			// No need to send direct statements, because our local knowledge is `None`
399			c.manifest_received_from(v, statement_filter);
400		}
401
402		let group_topology = match session_topology.group_views.get(&group_index) {
403			None => return Vec::new(),
404			Some(g) => g,
405		};
406
407		// advertise onwards and accept received advertisements
408
409		let sending_group_manifests =
410			group_topology.sending.iter().map(|v| (*v, ManifestKind::Full));
411
412		let receiving_group_manifests = group_topology.receiving.iter().filter_map(|v| {
413			if c.has_received_manifest_from(*v) {
414				Some((*v, ManifestKind::Acknowledgement))
415			} else {
416				None
417			}
418		});
419
420		// Note that order is important: if a validator is part of both the sending
421		// and receiving groups, we may overwrite a `Full` manifest with a `Acknowledgement`
422		// one.
423		for (v, manifest_mode) in sending_group_manifests.chain(receiving_group_manifests) {
424			gum::trace!(
425				target: LOG_TARGET,
426				validator_index = ?v,
427				?manifest_mode,
428				"Preparing to send manifest/acknowledgement"
429			);
430
431			self.pending_manifests
432				.entry(v)
433				.or_default()
434				.insert(candidate_hash, manifest_mode);
435		}
436
437		self.pending_manifests
438			.iter()
439			.filter_map(|(v, x)| x.get(&candidate_hash).map(|k| (*v, *k)))
440			.collect()
441	}
442
443	/// Note that a backed candidate has been advertised to a
444	/// given validator.
445	pub fn manifest_sent_to(
446		&mut self,
447		groups: &Groups,
448		validator_index: ValidatorIndex,
449		candidate_hash: CandidateHash,
450		local_knowledge: StatementFilter,
451	) {
452		if let Some(c) = self.confirmed_backed.get_mut(&candidate_hash) {
453			c.manifest_sent_to(validator_index, local_knowledge);
454
455			if let Some(pending_statements) = c.pending_statements(validator_index) {
456				self.pending_statements.entry(validator_index).or_default().extend(
457					decompose_statement_filter(
458						groups,
459						c.group_index,
460						candidate_hash,
461						&pending_statements,
462					),
463				);
464			}
465		}
466
467		if let Some(x) = self.pending_manifests.get_mut(&validator_index) {
468			x.remove(&candidate_hash);
469		}
470	}
471
472	/// Returns a vector of all candidates pending manifests for the specific validator, and
473	/// the type of manifest we should send.
474	pub fn pending_manifests_for(
475		&self,
476		validator_index: ValidatorIndex,
477	) -> Vec<(CandidateHash, ManifestKind)> {
478		self.pending_manifests
479			.get(&validator_index)
480			.into_iter()
481			.flat_map(|pending| pending.iter().map(|(c, m)| (*c, *m)))
482			.collect()
483	}
484
485	/// Returns a statement filter indicating statements that a given peer
486	/// is awaiting concerning the given candidate, constrained by the statements
487	/// we have ourselves.
488	pub fn pending_statements_for(
489		&self,
490		validator_index: ValidatorIndex,
491		candidate_hash: CandidateHash,
492	) -> Option<StatementFilter> {
493		self.confirmed_backed
494			.get(&candidate_hash)
495			.and_then(|x| x.pending_statements(validator_index))
496	}
497
498	/// Returns a vector of all pending statements to the validator, sorted with
499	/// `Seconded` statements at the front.
500	///
501	/// Statements are in the form `(Originator, Statement Kind)`.
502	pub fn all_pending_statements_for(
503		&self,
504		validator_index: ValidatorIndex,
505	) -> Vec<(ValidatorIndex, CompactStatement)> {
506		let mut v = self
507			.pending_statements
508			.get(&validator_index)
509			.map(|x| x.iter().cloned().collect())
510			.unwrap_or(Vec::new());
511
512		v.sort_by_key(|(_, s)| match s {
513			CompactStatement::Seconded(_) => 0u32,
514			CompactStatement::Valid(_) => 1u32,
515		});
516
517		v
518	}
519
520	/// Whether a validator can request a manifest from us.
521	pub fn can_request(&self, validator: ValidatorIndex, candidate_hash: CandidateHash) -> bool {
522		self.confirmed_backed.get(&candidate_hash).map_or(false, |c| {
523			c.has_sent_manifest_to(validator) && !c.has_received_manifest_from(validator)
524		})
525	}
526
527	/// Determine the validators which can send a statement to us by direct broadcast.
528	///
529	/// Returns a list of tuples representing each potential sender(ValidatorIndex)
530	/// and if the sender should already know about the statement, because we just
531	/// sent it to it.
532	pub fn direct_statement_providers(
533		&self,
534		groups: &Groups,
535		originator: ValidatorIndex,
536		statement: &CompactStatement,
537	) -> Vec<(ValidatorIndex, bool)> {
538		let (g, c_h, kind, in_group) =
539			match extract_statement_and_group_info(groups, originator, statement) {
540				None => return Vec::new(),
541				Some(x) => x,
542			};
543
544		self.confirmed_backed
545			.get(&c_h)
546			.map(|k| k.direct_statement_senders(g, in_group, kind))
547			.unwrap_or_default()
548	}
549
550	/// Determine the validators which can receive a statement from us by direct
551	/// broadcast.
552	pub fn direct_statement_targets(
553		&self,
554		groups: &Groups,
555		originator: ValidatorIndex,
556		statement: &CompactStatement,
557	) -> Vec<ValidatorIndex> {
558		let (g, c_h, kind, in_group) =
559			match extract_statement_and_group_info(groups, originator, statement) {
560				None => return Vec::new(),
561				Some(x) => x,
562			};
563
564		self.confirmed_backed
565			.get(&c_h)
566			.map(|k| k.direct_statement_recipients(g, in_group, kind))
567			.unwrap_or_default()
568	}
569
570	/// Note that we have learned about a statement. This will update
571	/// `pending_statements_for` for any relevant validators if actually
572	/// fresh.
573	pub fn learned_fresh_statement(
574		&mut self,
575		groups: &Groups,
576		session_topology: &SessionTopologyView,
577		originator: ValidatorIndex,
578		statement: &CompactStatement,
579	) {
580		let (g, c_h, kind, in_group) =
581			match extract_statement_and_group_info(groups, originator, statement) {
582				None => return,
583				Some(x) => x,
584			};
585
586		let known = match self.confirmed_backed.get_mut(&c_h) {
587			None => return,
588			Some(x) => x,
589		};
590
591		if !known.note_fresh_statement(in_group, kind) {
592			return
593		}
594
595		// Add to `pending_statements` for all validators we communicate with
596		// who have exchanged manifests.
597		let all_group_validators = session_topology
598			.group_views
599			.get(&g)
600			.into_iter()
601			.flat_map(|g| g.sending.iter().chain(g.receiving.iter()));
602
603		for v in all_group_validators {
604			if known.is_pending_statement(*v, in_group, kind) {
605				self.pending_statements
606					.entry(*v)
607					.or_default()
608					.insert((originator, statement.clone()));
609			}
610		}
611	}
612
613	/// Note that a direct statement about a given candidate was sent to or
614	/// received from the given validator.
615	pub fn sent_or_received_direct_statement(
616		&mut self,
617		groups: &Groups,
618		originator: ValidatorIndex,
619		counterparty: ValidatorIndex,
620		statement: &CompactStatement,
621		received: bool,
622	) {
623		if let Some((_, c_h, kind, in_group)) =
624			extract_statement_and_group_info(groups, originator, statement)
625		{
626			if let Some(known) = self.confirmed_backed.get_mut(&c_h) {
627				known.sent_or_received_direct_statement(counterparty, in_group, kind, received);
628
629				if let Some(pending) = self.pending_statements.get_mut(&counterparty) {
630					pending.remove(&(originator, statement.clone()));
631				}
632			}
633		}
634	}
635
636	/// Get the advertised statement filter of a validator for a candidate.
637	pub fn advertised_statements(
638		&self,
639		validator: ValidatorIndex,
640		candidate_hash: &CandidateHash,
641	) -> Option<StatementFilter> {
642		self.received.get(&validator)?.candidate_statement_filter(candidate_hash)
643	}
644
645	#[cfg(test)]
646	fn is_manifest_pending_for(
647		&self,
648		validator: ValidatorIndex,
649		candidate_hash: &CandidateHash,
650	) -> Option<ManifestKind> {
651		self.pending_manifests
652			.get(&validator)
653			.and_then(|m| m.get(candidate_hash))
654			.map(|x| *x)
655	}
656}
657
658fn extract_statement_and_group_info(
659	groups: &Groups,
660	originator: ValidatorIndex,
661	statement: &CompactStatement,
662) -> Option<(GroupIndex, CandidateHash, StatementKind, usize)> {
663	let (statement_kind, candidate_hash) = match statement {
664		CompactStatement::Seconded(h) => (StatementKind::Seconded, h),
665		CompactStatement::Valid(h) => (StatementKind::Valid, h),
666	};
667
668	let group = match groups.by_validator_index(originator) {
669		None => return None,
670		Some(g) => g,
671	};
672
673	let index_in_group = groups.get(group)?.iter().position(|v| v == &originator)?;
674
675	Some((group, *candidate_hash, statement_kind, index_in_group))
676}
677
678fn decompose_statement_filter<'a>(
679	groups: &'a Groups,
680	group_index: GroupIndex,
681	candidate_hash: CandidateHash,
682	statement_filter: &'a StatementFilter,
683) -> impl Iterator<Item = (ValidatorIndex, CompactStatement)> + 'a {
684	groups.get(group_index).into_iter().flat_map(move |g| {
685		let s = statement_filter
686			.seconded_in_group
687			.iter_ones()
688			.map(|i| g[i])
689			.map(move |i| (i, CompactStatement::Seconded(candidate_hash)));
690
691		let v = statement_filter
692			.validated_in_group
693			.iter_ones()
694			.map(|i| g[i])
695			.map(move |i| (i, CompactStatement::Valid(candidate_hash)));
696
697		s.chain(v)
698	})
699}
700
/// A summary of a manifest being sent by a counterparty.
#[derive(Debug, Clone)]
pub struct ManifestSummary {
	/// The claimed parent head data hash of the candidate.
	pub claimed_parent_hash: Hash,
	/// The claimed group index assigned to the candidate.
	pub claimed_group_index: GroupIndex,
	/// A statement filter sent alongside the candidate, communicating
	/// knowledge.
	pub statement_knowledge: StatementFilter,
}
712
/// Errors in importing a manifest.
#[derive(Debug, Clone)]
pub enum ManifestImportError {
	/// The manifest conflicts with another, previously sent manifest
	/// (different group, different parent hash, or incompatible statement filters).
	Conflicting,
	/// The manifest has overflowed beyond the limits of what the
	/// counterparty was allowed to send us.
	Overflow,
	/// The manifest claims insufficient attestations to achieve the backing
	/// threshold.
	Insufficient,
	/// The manifest is malformed.
	Malformed,
	/// The manifest was not allowed to be sent.
	Disallowed,
}
729
/// The knowledge we are aware of counterparties having of manifests.
#[derive(Default)]
struct ReceivedManifests {
	// candidate -> latest manifest summary accepted from this counterparty.
	received: HashMap<CandidateHash, ManifestSummary>,
	// group -> seconded counts, one counter per validator position in the group.
	seconded_counts: HashMap<GroupIndex, Vec<usize>>,
}
737
impl ReceivedManifests {
	/// Returns a copy of the statement filter this counterparty advertised for
	/// the candidate, or `None` if no manifest for it has been imported.
	fn candidate_statement_filter(
		&self,
		candidate_hash: &CandidateHash,
	) -> Option<StatementFilter> {
		self.received.get(candidate_hash).map(|m| m.statement_knowledge.clone())
	}

	/// Attempt to import a received manifest from a counterparty.
	///
	/// This will reject manifests which are either duplicate, conflicting,
	/// or imply an irrational amount of `Seconded` statements.
	///
	/// This assumes that the manifest has already been checked for
	/// validity - i.e. that the bitvecs match the claimed group in size
	/// and that the manifest includes at least one `Seconded`
	/// attestation and includes enough attestations for the candidate
	/// to be backed.
	///
	/// This also should only be invoked when we are intended to track
	/// the knowledge of this peer as determined by the [`SessionTopologyView`].
	///
	/// Returns `Conflicting` if a previously imported manifest for the same
	/// candidate disagrees with this one, and `Overflow` if the seconding limit
	/// would be exceeded.
	fn import_received(
		&mut self,
		group_size: usize,
		seconding_limit: usize,
		candidate_hash: CandidateHash,
		manifest_summary: ManifestSummary,
	) -> Result<(), ManifestImportError> {
		match self.received.entry(candidate_hash) {
			Entry::Occupied(mut e) => {
				// occupied entry.

				// filter out clearly conflicting data.
				{
					let prev = e.get();
					if prev.claimed_group_index != manifest_summary.claimed_group_index {
						return Err(ManifestImportError::Conflicting)
					}

					if prev.claimed_parent_hash != manifest_summary.claimed_parent_hash {
						return Err(ManifestImportError::Conflicting)
					}

					// NOTE(review): this `contains` appears to resolve to bitvec's
					// `BitSlice::contains`, which is a sub-sequence search rather than a
					// bitwise superset test; for equal-length filters that would only
					// accept identical bitfields. Confirm which semantics are intended.
					if !manifest_summary
						.statement_knowledge
						.seconded_in_group
						.contains(&prev.statement_knowledge.seconded_in_group)
					{
						return Err(ManifestImportError::Conflicting)
					}

					if !manifest_summary
						.statement_knowledge
						.validated_in_group
						.contains(&prev.statement_knowledge.validated_in_group)
					{
						return Err(ManifestImportError::Conflicting)
					}

					// NOTE(review): this is the union of the new and previous `Seconded`
					// bits, so validators already counted for this candidate are counted
					// again below. If only newly-set bits were meant ("fresh"), this
					// would be `new & !prev` - confirm before changing, since duplicate
					// handling may currently rely on the re-count hitting the limit.
					let mut fresh_seconded =
						manifest_summary.statement_knowledge.seconded_in_group.clone();
					fresh_seconded |= &prev.statement_knowledge.seconded_in_group;

					let within_limits = updating_ensure_within_seconding_limit(
						&mut self.seconded_counts,
						manifest_summary.claimed_group_index,
						group_size,
						seconding_limit,
						&fresh_seconded,
					);

					if !within_limits {
						return Err(ManifestImportError::Overflow)
					}
				}

				// All checks passed. Overwrite: guaranteed to be
				// superset.
				*e.get_mut() = manifest_summary;
				Ok(())
			},
			Entry::Vacant(e) => {
				// First manifest for this candidate from this counterparty: only the
				// seconding limit needs checking.
				let within_limits = updating_ensure_within_seconding_limit(
					&mut self.seconded_counts,
					manifest_summary.claimed_group_index,
					group_size,
					seconding_limit,
					&manifest_summary.statement_knowledge.seconded_in_group,
				);

				if within_limits {
					e.insert(manifest_summary);
					Ok(())
				} else {
					Err(ManifestImportError::Overflow)
				}
			},
		}
	}
}
838
839// updates validator-seconded records but only if the new statements
840// are OK. returns `true` if alright and `false` otherwise.
841//
842// The seconding limit is a per-validator limit. It ensures an upper bound on the total number of
843// candidates entering the system.
844fn updating_ensure_within_seconding_limit(
845	seconded_counts: &mut HashMap<GroupIndex, Vec<usize>>,
846	group_index: GroupIndex,
847	group_size: usize,
848	seconding_limit: usize,
849	new_seconded: &BitSlice<u8, Lsb0>,
850) -> bool {
851	if seconding_limit == 0 {
852		return false
853	}
854
855	// due to the check above, if this was non-existent this function will
856	// always return `true`.
857	let counts = seconded_counts.entry(group_index).or_insert_with(|| vec![0; group_size]);
858
859	for i in new_seconded.iter_ones() {
860		if counts[i] == seconding_limit {
861			return false
862		}
863	}
864
865	for i in new_seconded.iter_ones() {
866		counts[i] += 1;
867	}
868
869	true
870}
871
// The two kinds of compact statement, without the candidate hash attached.
#[derive(Debug, Clone, Copy)]
enum StatementKind {
	// Corresponds to `CompactStatement::Seconded`.
	Seconded,
	// Corresponds to `CompactStatement::Valid`.
	Valid,
}
877
// Per-validator, per-statement-kind access to a statement filter.
trait FilterQuery {
	// Whether the statement of the given kind is marked for the validator at `index`.
	fn contains(&self, index: usize, statement_kind: StatementKind) -> bool;
	// Marks the statement of the given kind for the validator at `index`.
	fn set(&mut self, index: usize, statement_kind: StatementKind);
}
882
883impl FilterQuery for StatementFilter {
884	fn contains(&self, index: usize, statement_kind: StatementKind) -> bool {
885		match statement_kind {
886			StatementKind::Seconded => self.seconded_in_group.get(index).map_or(false, |x| *x),
887			StatementKind::Valid => self.validated_in_group.get(index).map_or(false, |x| *x),
888		}
889	}
890
891	fn set(&mut self, index: usize, statement_kind: StatementKind) {
892		let b = match statement_kind {
893			StatementKind::Seconded => self.seconded_in_group.get_mut(index),
894			StatementKind::Valid => self.validated_in_group.get_mut(index),
895		};
896
897		if let Some(mut b) = b {
898			*b = true;
899		}
900	}
901}
902
/// Knowledge that we have about a remote peer concerning a candidate, and that they have about us
/// concerning the candidate.
#[derive(Debug, Clone)]
struct MutualKnowledge {
	/// Knowledge the remote peer has about the candidate, as far as we're aware.
	/// `Some` only if they have advertised, acknowledged, or requested the candidate.
	remote_knowledge: Option<StatementFilter>,
	/// Knowledge we have indicated to the remote peer about the candidate.
	/// `Some` only if we have advertised, acknowledged, or requested the candidate
	/// from them.
	local_knowledge: Option<StatementFilter>,
	/// Statements the peer has circulated to us. Unlike `local_knowledge` and
	/// `remote_knowledge`, which after the manifest exchange include both what we sent to
	/// the peer and what we received from it, this includes only statements that we
	/// received from the peer. See `sent_or_received_direct_statement` for more details.
	received_knowledge: Option<StatementFilter>,
}
921
/// A utility struct for keeping track of metadata about candidates
/// we have confirmed as having been backed.
#[derive(Debug, Clone)]
struct KnownBackedCandidate {
	/// Index of the group this candidate is tracked under; direct-statement queries for any
	/// other group yield no validators.
	group_index: GroupIndex,
	/// Our own full knowledge of the statements about this candidate, independent of any peer.
	local_knowledge: StatementFilter,
	/// Per-validator record of the knowledge exchanged with that validator about this
	/// candidate.
	mutual_knowledge: HashMap<ValidatorIndex, MutualKnowledge>,
}
930
931impl KnownBackedCandidate {
932	fn has_received_manifest_from(&self, validator: ValidatorIndex) -> bool {
933		self.mutual_knowledge
934			.get(&validator)
935			.map_or(false, |k| k.remote_knowledge.is_some())
936	}
937
938	fn has_sent_manifest_to(&self, validator: ValidatorIndex) -> bool {
939		self.mutual_knowledge
940			.get(&validator)
941			.map_or(false, |k| k.local_knowledge.is_some())
942	}
943
944	fn manifest_sent_to(&mut self, validator: ValidatorIndex, local_knowledge: StatementFilter) {
945		let k = self.mutual_knowledge.entry(validator).or_insert_with(|| MutualKnowledge {
946			remote_knowledge: None,
947			local_knowledge: None,
948			received_knowledge: None,
949		});
950		k.received_knowledge =
951			Some(StatementFilter::blank(local_knowledge.seconded_in_group.len()));
952
953		k.local_knowledge = Some(local_knowledge);
954	}
955
956	fn manifest_received_from(
957		&mut self,
958		validator: ValidatorIndex,
959		remote_knowledge: StatementFilter,
960	) {
961		let k = self.mutual_knowledge.entry(validator).or_insert_with(|| MutualKnowledge {
962			remote_knowledge: None,
963			local_knowledge: None,
964			received_knowledge: None,
965		});
966
967		k.remote_knowledge = Some(remote_knowledge);
968	}
969
970	/// Returns a list of tuples representing each potential sender(ValidatorIndex)
971	/// and if the sender should already know about the statement, because we just
972	/// sent it to it.
973	fn direct_statement_senders(
974		&self,
975		group_index: GroupIndex,
976		originator_index_in_group: usize,
977		statement_kind: StatementKind,
978	) -> Vec<(ValidatorIndex, bool)> {
979		if group_index != self.group_index {
980			return Vec::new()
981		}
982
983		self.mutual_knowledge
984			.iter()
985			.filter(|(_, k)| k.remote_knowledge.is_some())
986			.filter(|(_, k)| {
987				k.received_knowledge
988					.as_ref()
989					.map_or(false, |r| !r.contains(originator_index_in_group, statement_kind))
990			})
991			.map(|(v, k)| {
992				(
993					*v,
994					k.local_knowledge
995						.as_ref()
996						.map_or(false, |r| r.contains(originator_index_in_group, statement_kind)),
997				)
998			})
999			.collect()
1000	}
1001
1002	fn direct_statement_recipients(
1003		&self,
1004		group_index: GroupIndex,
1005		originator_index_in_group: usize,
1006		statement_kind: StatementKind,
1007	) -> Vec<ValidatorIndex> {
1008		if group_index != self.group_index {
1009			return Vec::new()
1010		}
1011
1012		self.mutual_knowledge
1013			.iter()
1014			.filter(|(_, k)| k.local_knowledge.is_some())
1015			.filter(|(_, k)| {
1016				k.remote_knowledge
1017					.as_ref()
1018					.map_or(false, |r| !r.contains(originator_index_in_group, statement_kind))
1019			})
1020			.map(|(v, _)| *v)
1021			.collect()
1022	}
1023
1024	fn note_fresh_statement(
1025		&mut self,
1026		statement_index_in_group: usize,
1027		statement_kind: StatementKind,
1028	) -> bool {
1029		let really_fresh = !self.local_knowledge.contains(statement_index_in_group, statement_kind);
1030		self.local_knowledge.set(statement_index_in_group, statement_kind);
1031
1032		really_fresh
1033	}
1034
1035	fn sent_or_received_direct_statement(
1036		&mut self,
1037		validator: ValidatorIndex,
1038		statement_index_in_group: usize,
1039		statement_kind: StatementKind,
1040		received: bool,
1041	) {
1042		if let Some(k) = self.mutual_knowledge.get_mut(&validator) {
1043			if let (Some(r), Some(l)) = (k.remote_knowledge.as_mut(), k.local_knowledge.as_mut()) {
1044				r.set(statement_index_in_group, statement_kind);
1045				l.set(statement_index_in_group, statement_kind);
1046			}
1047
1048			if received {
1049				k.received_knowledge
1050					.as_mut()
1051					.map(|knowledge| knowledge.set(statement_index_in_group, statement_kind));
1052			}
1053		}
1054	}
1055
1056	fn is_pending_statement(
1057		&self,
1058		validator: ValidatorIndex,
1059		statement_index_in_group: usize,
1060		statement_kind: StatementKind,
1061	) -> bool {
1062		// existence of both remote & local knowledge indicate we have exchanged
1063		// manifests.
1064		// then, everything that is not in the remote knowledge is pending
1065		self.mutual_knowledge
1066			.get(&validator)
1067			.filter(|k| k.local_knowledge.is_some())
1068			.and_then(|k| k.remote_knowledge.as_ref())
1069			.map(|k| !k.contains(statement_index_in_group, statement_kind))
1070			.unwrap_or(false)
1071	}
1072
1073	fn pending_statements(&self, validator: ValidatorIndex) -> Option<StatementFilter> {
1074		// existence of both remote & local knowledge indicate we have exchanged
1075		// manifests.
1076		// then, everything that is not in the remote knowledge is pending, and we
1077		// further limit this by what is in the local knowledge itself. we use the
1078		// full local knowledge, as the local knowledge stored here may be outdated.
1079		let full_local = &self.local_knowledge;
1080
1081		self.mutual_knowledge
1082			.get(&validator)
1083			.filter(|k| k.local_knowledge.is_some())
1084			.and_then(|k| k.remote_knowledge.as_ref())
1085			.map(|remote| StatementFilter {
1086				seconded_in_group: full_local.seconded_in_group.clone() &
1087					!remote.seconded_in_group.clone(),
1088				validated_in_group: full_local.validated_in_group.clone() &
1089					!remote.validated_in_group.clone(),
1090			})
1091	}
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096	use super::*;
1097	use assert_matches::assert_matches;
1098	use polkadot_node_network_protocol::grid_topology::TopologyPeerInfo;
1099	use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair;
1100	use sp_core::crypto::Pair as PairT;
1101
1102	fn dummy_groups(group_size: usize) -> Groups {
1103		let groups = vec![(0..(group_size as u32)).map(ValidatorIndex).collect()].into();
1104
1105		Groups::new(groups, 2)
1106	}
1107
1108	#[test]
1109	fn topology_empty_for_no_index() {
1110		let base_topology = SessionGridTopology::new(
1111			vec![0, 1, 2],
1112			vec![
1113				TopologyPeerInfo {
1114					peer_ids: Vec::new(),
1115					validator_index: ValidatorIndex(0),
1116					discovery_id: AuthorityDiscoveryPair::generate().0.public(),
1117				},
1118				TopologyPeerInfo {
1119					peer_ids: Vec::new(),
1120					validator_index: ValidatorIndex(1),
1121					discovery_id: AuthorityDiscoveryPair::generate().0.public(),
1122				},
1123				TopologyPeerInfo {
1124					peer_ids: Vec::new(),
1125					validator_index: ValidatorIndex(2),
1126					discovery_id: AuthorityDiscoveryPair::generate().0.public(),
1127				},
1128			],
1129		);
1130
1131		let t = build_session_topology(
1132			&[vec![ValidatorIndex(0)], vec![ValidatorIndex(1)], vec![ValidatorIndex(2)]],
1133			&base_topology,
1134			None,
1135		);
1136
1137		assert!(t.group_views.is_empty());
1138	}
1139
1140	#[test]
1141	fn topology_setup() {
1142		let base_topology = SessionGridTopology::new(
1143			(0..9).collect(),
1144			(0..9)
1145				.map(|i| TopologyPeerInfo {
1146					peer_ids: Vec::new(),
1147					validator_index: ValidatorIndex(i),
1148					discovery_id: AuthorityDiscoveryPair::generate().0.public(),
1149				})
1150				.collect(),
1151		);
1152
1153		let t = build_session_topology(
1154			&[
1155				vec![ValidatorIndex(0), ValidatorIndex(3), ValidatorIndex(6)],
1156				vec![ValidatorIndex(4), ValidatorIndex(2), ValidatorIndex(7)],
1157				vec![ValidatorIndex(8), ValidatorIndex(5), ValidatorIndex(1)],
1158			],
1159			&base_topology,
1160			Some(ValidatorIndex(0)),
1161		);
1162
1163		assert_eq!(t.group_views.len(), 3);
1164
1165		// 0 1 2
1166		// 3 4 5
1167		// 6 7 8
1168
1169		// our group: we send to all row/column neighbors which are not in our
1170		// group and receive nothing.
1171		assert_eq!(
1172			t.group_views.get(&GroupIndex(0)).unwrap().sending,
1173			vec![1, 2].into_iter().map(ValidatorIndex).collect::<HashSet<_>>(),
1174		);
1175		assert_eq!(t.group_views.get(&GroupIndex(0)).unwrap().receiving, HashSet::new(),);
1176
1177		// we share a row with '2' and have indirect connections to '4' and '7'.
1178
1179		assert_eq!(
1180			t.group_views.get(&GroupIndex(1)).unwrap().sending,
1181			vec![3, 6].into_iter().map(ValidatorIndex).collect::<HashSet<_>>(),
1182		);
1183		assert_eq!(
1184			t.group_views.get(&GroupIndex(1)).unwrap().receiving,
1185			vec![1, 2, 3, 6].into_iter().map(ValidatorIndex).collect::<HashSet<_>>(),
1186		);
1187
1188		// we share a row with '1' and have indirect connections to '5' and '8'.
1189
1190		assert_eq!(
1191			t.group_views.get(&GroupIndex(2)).unwrap().sending,
1192			vec![3, 6].into_iter().map(ValidatorIndex).collect::<HashSet<_>>(),
1193		);
1194		assert_eq!(
1195			t.group_views.get(&GroupIndex(2)).unwrap().receiving,
1196			vec![1, 2, 3, 6].into_iter().map(ValidatorIndex).collect::<HashSet<_>>(),
1197		);
1198	}
1199
1200	#[test]
1201	fn knowledge_rejects_conflicting_manifest() {
1202		let mut knowledge = ReceivedManifests::default();
1203
1204		let expected_manifest_summary = ManifestSummary {
1205			claimed_parent_hash: Hash::repeat_byte(2),
1206			claimed_group_index: GroupIndex(0),
1207			statement_knowledge: StatementFilter {
1208				seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 0],
1209				validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1],
1210			},
1211		};
1212
1213		knowledge
1214			.import_received(
1215				3,
1216				2,
1217				CandidateHash(Hash::repeat_byte(1)),
1218				expected_manifest_summary.clone(),
1219			)
1220			.unwrap();
1221
1222		// conflicting group
1223
1224		let mut s = expected_manifest_summary.clone();
1225		s.claimed_group_index = GroupIndex(1);
1226		assert_matches!(
1227			knowledge.import_received(3, 2, CandidateHash(Hash::repeat_byte(1)), s,),
1228			Err(ManifestImportError::Conflicting)
1229		);
1230
1231		// conflicting parent hash
1232
1233		let mut s = expected_manifest_summary.clone();
1234		s.claimed_parent_hash = Hash::repeat_byte(3);
1235		assert_matches!(
1236			knowledge.import_received(3, 2, CandidateHash(Hash::repeat_byte(1)), s,),
1237			Err(ManifestImportError::Conflicting)
1238		);
1239
1240		// conflicting seconded statements bitfield
1241
1242		let mut s = expected_manifest_summary.clone();
1243		s.statement_knowledge.seconded_in_group = bitvec::bitvec![u8, Lsb0; 0, 1, 0];
1244		assert_matches!(
1245			knowledge.import_received(3, 2, CandidateHash(Hash::repeat_byte(1)), s,),
1246			Err(ManifestImportError::Conflicting)
1247		);
1248
1249		// conflicting valid statements bitfield
1250
1251		let mut s = expected_manifest_summary.clone();
1252		s.statement_knowledge.validated_in_group = bitvec::bitvec![u8, Lsb0; 0, 1, 0];
1253		assert_matches!(
1254			knowledge.import_received(3, 2, CandidateHash(Hash::repeat_byte(1)), s,),
1255			Err(ManifestImportError::Conflicting)
1256		);
1257	}
1258
1259	// Make sure we don't import manifests that would put a validator in a group over the limit of
1260	// candidates they are allowed to second (aka seconding limit).
1261	#[test]
1262	fn reject_overflowing_manifests() {
1263		let mut knowledge = ReceivedManifests::default();
1264		knowledge
1265			.import_received(
1266				3,
1267				2,
1268				CandidateHash(Hash::repeat_byte(1)),
1269				ManifestSummary {
1270					claimed_parent_hash: Hash::repeat_byte(0xA),
1271					claimed_group_index: GroupIndex(0),
1272					statement_knowledge: StatementFilter {
1273						seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 0],
1274						validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1],
1275					},
1276				},
1277			)
1278			.unwrap();
1279
1280		knowledge
1281			.import_received(
1282				3,
1283				2,
1284				CandidateHash(Hash::repeat_byte(2)),
1285				ManifestSummary {
1286					claimed_parent_hash: Hash::repeat_byte(0xB),
1287					claimed_group_index: GroupIndex(0),
1288					statement_knowledge: StatementFilter {
1289						seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
1290						validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1],
1291					},
1292				},
1293			)
1294			.unwrap();
1295
1296		// Reject a seconding validator that is already at the seconding limit. Seconding counts for
1297		// the validators should not be applied.
1298		assert_matches!(
1299			knowledge.import_received(
1300				3,
1301				2,
1302				CandidateHash(Hash::repeat_byte(3)),
1303				ManifestSummary {
1304					claimed_parent_hash: Hash::repeat_byte(0xC),
1305					claimed_group_index: GroupIndex(0),
1306					statement_knowledge: StatementFilter {
1307						seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1],
1308						validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1],
1309					}
1310				},
1311			),
1312			Err(ManifestImportError::Overflow)
1313		);
1314
1315		// Don't reject validators that have seconded less than the limit so far.
1316		knowledge
1317			.import_received(
1318				3,
1319				2,
1320				CandidateHash(Hash::repeat_byte(3)),
1321				ManifestSummary {
1322					claimed_parent_hash: Hash::repeat_byte(0xC),
1323					claimed_group_index: GroupIndex(0),
1324					statement_knowledge: StatementFilter {
1325						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1],
1326						validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1],
1327					},
1328				},
1329			)
1330			.unwrap();
1331	}
1332
1333	#[test]
1334	fn reject_disallowed_manifest() {
1335		let mut tracker = GridTracker::default();
1336		let session_topology = SessionTopologyView {
1337			group_views: vec![(
1338				GroupIndex(0),
1339				GroupSubView {
1340					sending: HashSet::new(),
1341					receiving: vec![ValidatorIndex(0)].into_iter().collect(),
1342				},
1343			)]
1344			.into_iter()
1345			.collect(),
1346		};
1347
1348		let groups = dummy_groups(3);
1349
1350		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1351
1352		assert_eq!(groups.get_size_and_backing_threshold(GroupIndex(0)), Some((3, 2)),);
1353
1354		// Known group, disallowed receiving validator.
1355
1356		assert_matches!(
1357			tracker.import_manifest(
1358				&session_topology,
1359				&groups,
1360				candidate_hash,
1361				3,
1362				ManifestSummary {
1363					claimed_parent_hash: Hash::repeat_byte(0),
1364					claimed_group_index: GroupIndex(0),
1365					statement_knowledge: StatementFilter {
1366						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1367						validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
1368					}
1369				},
1370				ManifestKind::Full,
1371				ValidatorIndex(1),
1372			),
1373			Err(ManifestImportError::Disallowed)
1374		);
1375
1376		// Unknown group
1377
1378		assert_matches!(
1379			tracker.import_manifest(
1380				&session_topology,
1381				&groups,
1382				candidate_hash,
1383				3,
1384				ManifestSummary {
1385					claimed_parent_hash: Hash::repeat_byte(0),
1386					claimed_group_index: GroupIndex(1),
1387					statement_knowledge: StatementFilter {
1388						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1389						validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
1390					}
1391				},
1392				ManifestKind::Full,
1393				ValidatorIndex(0),
1394			),
1395			Err(ManifestImportError::Disallowed)
1396		);
1397	}
1398
1399	#[test]
1400	fn reject_malformed_wrong_group_size() {
1401		let mut tracker = GridTracker::default();
1402		let session_topology = SessionTopologyView {
1403			group_views: vec![(
1404				GroupIndex(0),
1405				GroupSubView {
1406					sending: HashSet::new(),
1407					receiving: vec![ValidatorIndex(0)].into_iter().collect(),
1408				},
1409			)]
1410			.into_iter()
1411			.collect(),
1412		};
1413
1414		let groups = dummy_groups(3);
1415
1416		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1417
1418		assert_eq!(groups.get_size_and_backing_threshold(GroupIndex(0)), Some((3, 2)),);
1419
1420		assert_matches!(
1421			tracker.import_manifest(
1422				&session_topology,
1423				&groups,
1424				candidate_hash,
1425				3,
1426				ManifestSummary {
1427					claimed_parent_hash: Hash::repeat_byte(0),
1428					claimed_group_index: GroupIndex(0),
1429					statement_knowledge: StatementFilter {
1430						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0, 1],
1431						validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
1432					}
1433				},
1434				ManifestKind::Full,
1435				ValidatorIndex(0),
1436			),
1437			Err(ManifestImportError::Malformed)
1438		);
1439
1440		assert_matches!(
1441			tracker.import_manifest(
1442				&session_topology,
1443				&groups,
1444				candidate_hash,
1445				3,
1446				ManifestSummary {
1447					claimed_parent_hash: Hash::repeat_byte(0),
1448					claimed_group_index: GroupIndex(0),
1449					statement_knowledge: StatementFilter {
1450						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1451						validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1, 0],
1452					}
1453				},
1454				ManifestKind::Full,
1455				ValidatorIndex(0),
1456			),
1457			Err(ManifestImportError::Malformed)
1458		);
1459	}
1460
1461	#[test]
1462	fn reject_malformed_no_seconders() {
1463		let mut tracker = GridTracker::default();
1464		let session_topology = SessionTopologyView {
1465			group_views: vec![(
1466				GroupIndex(0),
1467				GroupSubView {
1468					sending: HashSet::new(),
1469					receiving: vec![ValidatorIndex(0)].into_iter().collect(),
1470				},
1471			)]
1472			.into_iter()
1473			.collect(),
1474		};
1475
1476		let groups = dummy_groups(3);
1477
1478		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1479
1480		assert_eq!(groups.get_size_and_backing_threshold(GroupIndex(0)), Some((3, 2)),);
1481
1482		assert_matches!(
1483			tracker.import_manifest(
1484				&session_topology,
1485				&groups,
1486				candidate_hash,
1487				3,
1488				ManifestSummary {
1489					claimed_parent_hash: Hash::repeat_byte(0),
1490					claimed_group_index: GroupIndex(0),
1491					statement_knowledge: StatementFilter {
1492						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
1493						validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1],
1494					}
1495				},
1496				ManifestKind::Full,
1497				ValidatorIndex(0),
1498			),
1499			Err(ManifestImportError::Malformed)
1500		);
1501	}
1502
1503	#[test]
1504	fn reject_insufficient_below_threshold() {
1505		let mut tracker = GridTracker::default();
1506		let session_topology = SessionTopologyView {
1507			group_views: vec![(
1508				GroupIndex(0),
1509				GroupSubView {
1510					sending: HashSet::new(),
1511					receiving: HashSet::from([ValidatorIndex(0)]),
1512				},
1513			)]
1514			.into_iter()
1515			.collect(),
1516		};
1517
1518		let groups = dummy_groups(3);
1519
1520		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1521
1522		assert_eq!(groups.get_size_and_backing_threshold(GroupIndex(0)), Some((3, 2)),);
1523
1524		// only one vote
1525
1526		assert_matches!(
1527			tracker.import_manifest(
1528				&session_topology,
1529				&groups,
1530				candidate_hash,
1531				3,
1532				ManifestSummary {
1533					claimed_parent_hash: Hash::repeat_byte(0),
1534					claimed_group_index: GroupIndex(0),
1535					statement_knowledge: StatementFilter {
1536						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
1537						validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
1538					}
1539				},
1540				ManifestKind::Full,
1541				ValidatorIndex(0),
1542			),
1543			Err(ManifestImportError::Insufficient)
1544		);
1545
1546		// seconding + validating still not enough to reach '2' threshold
1547
1548		assert_matches!(
1549			tracker.import_manifest(
1550				&session_topology,
1551				&groups,
1552				candidate_hash,
1553				3,
1554				ManifestSummary {
1555					claimed_parent_hash: Hash::repeat_byte(0),
1556					claimed_group_index: GroupIndex(0),
1557					statement_knowledge: StatementFilter {
1558						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
1559						validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
1560					}
1561				},
1562				ManifestKind::Full,
1563				ValidatorIndex(0),
1564			),
1565			Err(ManifestImportError::Insufficient)
1566		);
1567
1568		// finally good.
1569
1570		assert_matches!(
1571			tracker.import_manifest(
1572				&session_topology,
1573				&groups,
1574				candidate_hash,
1575				3,
1576				ManifestSummary {
1577					claimed_parent_hash: Hash::repeat_byte(0),
1578					claimed_group_index: GroupIndex(0),
1579					statement_knowledge: StatementFilter {
1580						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
1581						validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1582					}
1583				},
1584				ManifestKind::Full,
1585				ValidatorIndex(0),
1586			),
1587			Ok(false)
1588		);
1589	}
1590
1591	// Test that when we add a candidate as backed and advertise it to the sending group, they can
1592	// provide an acknowledgement manifest in response.
1593	#[test]
1594	fn senders_can_provide_manifests_in_acknowledgement() {
1595		let validator_index = ValidatorIndex(0);
1596
1597		let mut tracker = GridTracker::default();
1598		let session_topology = SessionTopologyView {
1599			group_views: vec![(
1600				GroupIndex(0),
1601				GroupSubView {
1602					sending: HashSet::from([validator_index]),
1603					receiving: HashSet::from([ValidatorIndex(1)]),
1604				},
1605			)]
1606			.into_iter()
1607			.collect(),
1608		};
1609
1610		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1611		let group_index = GroupIndex(0);
1612		let group_size = 3;
1613		let local_knowledge = StatementFilter::blank(group_size);
1614
1615		let groups = dummy_groups(group_size);
1616
1617		// Add the candidate as backed.
1618		let receivers = tracker.add_backed_candidate(
1619			&session_topology,
1620			candidate_hash,
1621			group_index,
1622			local_knowledge.clone(),
1623		);
1624		// Validator 0 is in the sending group. Advertise onward to it.
1625		//
1626		// Validator 1 is in the receiving group, but we have not received from it, so we're not
1627		// expected to send it an acknowledgement.
1628		assert_eq!(receivers, vec![(validator_index, ManifestKind::Full)]);
1629
1630		// Note the manifest as 'sent' to validator 0.
1631		tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge);
1632
1633		// Import manifest of kind `Acknowledgement` from validator 0.
1634		let ack = tracker.import_manifest(
1635			&session_topology,
1636			&groups,
1637			candidate_hash,
1638			3,
1639			ManifestSummary {
1640				claimed_parent_hash: Hash::repeat_byte(0),
1641				claimed_group_index: group_index,
1642				statement_knowledge: StatementFilter {
1643					seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1644					validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
1645				},
1646			},
1647			ManifestKind::Acknowledgement,
1648			validator_index,
1649		);
1650		assert_matches!(ack, Ok(false));
1651	}
1652
1653	// Check that pending communication is set correctly when receiving a manifest on a confirmed
1654	// candidate.
1655	//
1656	// It should also overwrite any existing `Full` ManifestKind.
1657	#[test]
1658	fn pending_communication_receiving_manifest_on_confirmed_candidate() {
1659		let validator_index = ValidatorIndex(0);
1660
1661		let mut tracker = GridTracker::default();
1662		let session_topology = SessionTopologyView {
1663			group_views: vec![(
1664				GroupIndex(0),
1665				GroupSubView {
1666					sending: HashSet::from([validator_index]),
1667					receiving: HashSet::from([ValidatorIndex(1)]),
1668				},
1669			)]
1670			.into_iter()
1671			.collect(),
1672		};
1673
1674		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1675		let group_index = GroupIndex(0);
1676		let group_size = 3;
1677		let local_knowledge = StatementFilter::blank(group_size);
1678
1679		let groups = dummy_groups(group_size);
1680
1681		// Manifest should not be pending yet.
1682		let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash);
1683		assert_eq!(pending_manifest, None);
1684
1685		// Add the candidate as backed.
1686		tracker.add_backed_candidate(
1687			&session_topology,
1688			candidate_hash,
1689			group_index,
1690			local_knowledge.clone(),
1691		);
1692
1693		// Manifest should be pending as `Full`.
1694		let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash);
1695		assert_eq!(pending_manifest, Some(ManifestKind::Full));
1696
1697		// Note the manifest as 'sent' to validator 0.
1698		tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge);
1699
1700		// Import manifest.
1701		//
1702		// Should overwrite existing `Full` manifest.
1703		let ack = tracker.import_manifest(
1704			&session_topology,
1705			&groups,
1706			candidate_hash,
1707			3,
1708			ManifestSummary {
1709				claimed_parent_hash: Hash::repeat_byte(0),
1710				claimed_group_index: group_index,
1711				statement_knowledge: StatementFilter {
1712					seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1713					validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
1714				},
1715			},
1716			ManifestKind::Acknowledgement,
1717			validator_index,
1718		);
1719		assert_matches!(ack, Ok(false));
1720
1721		let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash);
1722		assert_eq!(pending_manifest, None);
1723	}
1724
1725	// Check that pending communication is cleared correctly in `manifest_sent_to`
1726	//
1727	// Also test a scenario where manifest import returns `Ok(true)` (should acknowledge).
1728	#[test]
1729	fn pending_communication_is_cleared() {
1730		let validator_index = ValidatorIndex(0);
1731
1732		let mut tracker = GridTracker::default();
1733		let session_topology = SessionTopologyView {
1734			group_views: vec![(
1735				GroupIndex(0),
1736				GroupSubView {
1737					sending: HashSet::new(),
1738					receiving: HashSet::from([validator_index]),
1739				},
1740			)]
1741			.into_iter()
1742			.collect(),
1743		};
1744
1745		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1746		let group_index = GroupIndex(0);
1747		let group_size = 3;
1748		let local_knowledge = StatementFilter::blank(group_size);
1749
1750		let groups = dummy_groups(group_size);
1751
1752		// Add the candidate as backed.
1753		tracker.add_backed_candidate(
1754			&session_topology,
1755			candidate_hash,
1756			group_index,
1757			local_knowledge.clone(),
1758		);
1759
1760		// Manifest should not be pending yet.
1761		let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash);
1762		assert_eq!(pending_manifest, None);
1763
1764		// Import manifest. The candidate is confirmed backed and we are expected to receive from
1765		// validator 0, so send it an acknowledgement.
1766		let ack = tracker.import_manifest(
1767			&session_topology,
1768			&groups,
1769			candidate_hash,
1770			3,
1771			ManifestSummary {
1772				claimed_parent_hash: Hash::repeat_byte(0),
1773				claimed_group_index: group_index,
1774				statement_knowledge: StatementFilter {
1775					seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1776					validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
1777				},
1778			},
1779			ManifestKind::Full,
1780			validator_index,
1781		);
1782		assert_matches!(ack, Ok(true));
1783
1784		// Acknowledgement manifest should be pending.
1785		let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash);
1786		assert_eq!(pending_manifest, Some(ManifestKind::Acknowledgement));
1787
1788		// Note the candidate as advertised.
1789		tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge);
1790
1791		// Pending manifest should be cleared.
1792		let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash);
1793		assert_eq!(pending_manifest, None);
1794	}
1795
1796	/// A manifest exchange means that both `manifest_sent_to` and `manifest_received_from` have
1797	/// been invoked.
1798	///
1799	/// In practice, it means that one of three things have happened:
1800	///
1801	/// - They announced, we acknowledged
1802	///
1803	/// - We announced, they acknowledged
1804	///
1805	/// - We announced, they announced (not sure if this can actually happen; it would happen if 2
1806	///   nodes had each other in their sending set and they sent manifests at the same time. The
1807	///   code accounts for this anyway)
1808	#[test]
1809	fn pending_statements_are_updated_after_manifest_exchange() {
1810		let send_to = ValidatorIndex(0);
1811		let receive_from = ValidatorIndex(1);
1812
1813		let mut tracker = GridTracker::default();
1814		let session_topology = SessionTopologyView {
1815			group_views: vec![(
1816				GroupIndex(0),
1817				GroupSubView {
1818					sending: HashSet::from([send_to]),
1819					receiving: HashSet::from([receive_from]),
1820				},
1821			)]
1822			.into_iter()
1823			.collect(),
1824		};
1825
1826		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1827		let group_index = GroupIndex(0);
1828		let group_size = 3;
1829		let local_knowledge = StatementFilter::blank(group_size);
1830
1831		let groups = dummy_groups(group_size);
1832
1833		// Confirm the candidate.
1834		let receivers = tracker.add_backed_candidate(
1835			&session_topology,
1836			candidate_hash,
1837			group_index,
1838			local_knowledge.clone(),
1839		);
1840		assert_eq!(receivers, vec![(send_to, ManifestKind::Full)]);
1841
1842		// Learn a statement from a different validator.
1843		tracker.learned_fresh_statement(
1844			&groups,
1845			&session_topology,
1846			ValidatorIndex(2),
1847			&CompactStatement::Seconded(candidate_hash),
1848		);
1849
1850		// Test receiving followed by sending an ack.
1851		{
1852			// Should start with no pending statements.
1853			assert_eq!(tracker.pending_statements_for(receive_from, candidate_hash), None);
1854			assert_eq!(tracker.all_pending_statements_for(receive_from), vec![]);
1855			let ack = tracker.import_manifest(
1856				&session_topology,
1857				&groups,
1858				candidate_hash,
1859				3,
1860				ManifestSummary {
1861					claimed_parent_hash: Hash::repeat_byte(0),
1862					claimed_group_index: group_index,
1863					statement_knowledge: StatementFilter {
1864						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1865						validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
1866					},
1867				},
1868				ManifestKind::Full,
1869				receive_from,
1870			);
1871			assert_matches!(ack, Ok(true));
1872
1873			// Send ack now.
1874			tracker.manifest_sent_to(
1875				&groups,
1876				receive_from,
1877				candidate_hash,
1878				local_knowledge.clone(),
1879			);
1880
1881			// There should be pending statements now.
1882			assert_eq!(
1883				tracker.pending_statements_for(receive_from, candidate_hash),
1884				Some(StatementFilter {
1885					seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
1886					validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
1887				})
1888			);
1889			assert_eq!(
1890				tracker.all_pending_statements_for(receive_from),
1891				vec![(ValidatorIndex(2), CompactStatement::Seconded(candidate_hash))]
1892			);
1893		}
1894
1895		// Test sending followed by receiving an ack.
1896		{
1897			// Should start with no pending statements.
1898			assert_eq!(tracker.pending_statements_for(send_to, candidate_hash), None);
1899			assert_eq!(tracker.all_pending_statements_for(send_to), vec![]);
1900
1901			tracker.manifest_sent_to(&groups, send_to, candidate_hash, local_knowledge.clone());
1902			let ack = tracker.import_manifest(
1903				&session_topology,
1904				&groups,
1905				candidate_hash,
1906				3,
1907				ManifestSummary {
1908					claimed_parent_hash: Hash::repeat_byte(0),
1909					claimed_group_index: group_index,
1910					statement_knowledge: StatementFilter {
1911						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
1912						validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
1913					},
1914				},
1915				ManifestKind::Acknowledgement,
1916				send_to,
1917			);
1918			assert_matches!(ack, Ok(false));
1919
1920			// There should be pending statements now.
1921			assert_eq!(
1922				tracker.pending_statements_for(send_to, candidate_hash),
1923				Some(StatementFilter {
1924					seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1],
1925					validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
1926				})
1927			);
1928			assert_eq!(
1929				tracker.all_pending_statements_for(send_to),
1930				vec![(ValidatorIndex(2), CompactStatement::Seconded(candidate_hash))]
1931			);
1932		}
1933	}
1934
1935	#[test]
1936	fn invalid_fresh_statement_import() {
1937		let validator_index = ValidatorIndex(0);
1938
1939		let mut tracker = GridTracker::default();
1940		let session_topology = SessionTopologyView {
1941			group_views: vec![(
1942				GroupIndex(0),
1943				GroupSubView {
1944					sending: HashSet::new(),
1945					receiving: HashSet::from([validator_index]),
1946				},
1947			)]
1948			.into_iter()
1949			.collect(),
1950		};
1951
1952		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
1953		let group_index = GroupIndex(0);
1954		let group_size = 3;
1955		let local_knowledge = StatementFilter::blank(group_size);
1956
1957		let groups = dummy_groups(group_size);
1958
1959		// Should start with no pending statements.
1960		assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None);
1961		assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]);
1962
1963		// Try to import fresh statement. Candidate not backed.
1964		let statement = CompactStatement::Seconded(candidate_hash);
1965		tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement);
1966
1967		assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None);
1968		assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]);
1969
1970		// Add the candidate as backed.
1971		tracker.add_backed_candidate(
1972			&session_topology,
1973			candidate_hash,
1974			group_index,
1975			local_knowledge.clone(),
1976		);
1977
1978		// Try to import fresh statement. Unknown group for validator index.
1979		let statement = CompactStatement::Seconded(candidate_hash);
1980		tracker.learned_fresh_statement(&groups, &session_topology, ValidatorIndex(1), &statement);
1981
1982		assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None);
1983		assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]);
1984	}
1985
1986	#[test]
1987	fn pending_statements_updated_when_importing_fresh_statement() {
1988		let validator_index = ValidatorIndex(0);
1989
1990		let mut tracker = GridTracker::default();
1991		let session_topology = SessionTopologyView {
1992			group_views: vec![(
1993				GroupIndex(0),
1994				GroupSubView {
1995					sending: HashSet::new(),
1996					receiving: HashSet::from([validator_index]),
1997				},
1998			)]
1999			.into_iter()
2000			.collect(),
2001		};
2002
2003		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
2004		let group_index = GroupIndex(0);
2005		let group_size = 3;
2006		let local_knowledge = StatementFilter::blank(group_size);
2007
2008		let groups = dummy_groups(group_size);
2009
2010		// Should start with no pending statements.
2011		assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None);
2012		assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]);
2013
2014		// Add the candidate as backed.
2015		tracker.add_backed_candidate(
2016			&session_topology,
2017			candidate_hash,
2018			group_index,
2019			local_knowledge.clone(),
2020		);
2021
2022		// Import fresh statement.
2023
2024		let ack = tracker.import_manifest(
2025			&session_topology,
2026			&groups,
2027			candidate_hash,
2028			3,
2029			ManifestSummary {
2030				claimed_parent_hash: Hash::repeat_byte(0),
2031				claimed_group_index: group_index,
2032				statement_knowledge: StatementFilter {
2033					seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
2034					validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
2035				},
2036			},
2037			ManifestKind::Full,
2038			validator_index,
2039		);
2040		assert_matches!(ack, Ok(true));
2041		tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge);
2042		let statement = CompactStatement::Seconded(candidate_hash);
2043		tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement);
2044
2045		// There should be pending statements now.
2046		let statements = StatementFilter {
2047			seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 0],
2048			validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
2049		};
2050		assert_eq!(
2051			tracker.pending_statements_for(validator_index, candidate_hash),
2052			Some(statements.clone())
2053		);
2054		assert_eq!(
2055			tracker.all_pending_statements_for(validator_index),
2056			vec![(ValidatorIndex(0), CompactStatement::Seconded(candidate_hash))]
2057		);
2058
2059		// After successful import, try importing again. Nothing should change.
2060
2061		tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement);
2062		assert_eq!(
2063			tracker.pending_statements_for(validator_index, candidate_hash),
2064			Some(statements)
2065		);
2066		assert_eq!(
2067			tracker.all_pending_statements_for(validator_index),
2068			vec![(ValidatorIndex(0), CompactStatement::Seconded(candidate_hash))]
2069		);
2070	}
2071
2072	// After learning fresh statements, we should not generate pending statements for knowledge that
2073	// the validator already has.
2074	#[test]
2075	fn pending_statements_respect_remote_knowledge() {
2076		let validator_index = ValidatorIndex(0);
2077
2078		let mut tracker = GridTracker::default();
2079		let session_topology = SessionTopologyView {
2080			group_views: vec![(
2081				GroupIndex(0),
2082				GroupSubView {
2083					sending: HashSet::new(),
2084					receiving: HashSet::from([validator_index]),
2085				},
2086			)]
2087			.into_iter()
2088			.collect(),
2089		};
2090
2091		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
2092		let group_index = GroupIndex(0);
2093		let group_size = 3;
2094		let local_knowledge = StatementFilter::blank(group_size);
2095
2096		let groups = dummy_groups(group_size);
2097
2098		// Should start with no pending statements.
2099		assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None);
2100		assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]);
2101
2102		// Add the candidate as backed.
2103		tracker.add_backed_candidate(
2104			&session_topology,
2105			candidate_hash,
2106			group_index,
2107			local_knowledge.clone(),
2108		);
2109
2110		// Import fresh statement.
2111		let ack = tracker.import_manifest(
2112			&session_topology,
2113			&groups,
2114			candidate_hash,
2115			3,
2116			ManifestSummary {
2117				claimed_parent_hash: Hash::repeat_byte(0),
2118				claimed_group_index: group_index,
2119				statement_knowledge: StatementFilter {
2120					seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
2121					validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
2122				},
2123			},
2124			ManifestKind::Full,
2125			validator_index,
2126		);
2127		assert_matches!(ack, Ok(true));
2128		tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge);
2129		tracker.learned_fresh_statement(
2130			&groups,
2131			&session_topology,
2132			validator_index,
2133			&CompactStatement::Seconded(candidate_hash),
2134		);
2135		tracker.learned_fresh_statement(
2136			&groups,
2137			&session_topology,
2138			validator_index,
2139			&CompactStatement::Valid(candidate_hash),
2140		);
2141
2142		// The pending statements should respect the remote knowledge (meaning the Seconded
2143		// statement is ignored, but not the Valid statement).
2144		let statements = StatementFilter {
2145			seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
2146			validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 0],
2147		};
2148		assert_eq!(
2149			tracker.pending_statements_for(validator_index, candidate_hash),
2150			Some(statements.clone())
2151		);
2152		assert_eq!(
2153			tracker.all_pending_statements_for(validator_index),
2154			vec![(ValidatorIndex(0), CompactStatement::Valid(candidate_hash))]
2155		);
2156	}
2157
2158	#[test]
2159	fn pending_statements_cleared_when_sending() {
2160		let validator_index = ValidatorIndex(0);
2161		let counterparty = ValidatorIndex(1);
2162
2163		let mut tracker = GridTracker::default();
2164		let session_topology = SessionTopologyView {
2165			group_views: vec![(
2166				GroupIndex(0),
2167				GroupSubView {
2168					sending: HashSet::new(),
2169					receiving: HashSet::from([validator_index, counterparty]),
2170				},
2171			)]
2172			.into_iter()
2173			.collect(),
2174		};
2175
2176		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
2177		let group_index = GroupIndex(0);
2178		let group_size = 3;
2179		let local_knowledge = StatementFilter::blank(group_size);
2180
2181		let groups = dummy_groups(group_size);
2182
2183		// Should start with no pending statements.
2184		assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None);
2185		assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]);
2186
2187		// Add the candidate as backed.
2188		tracker.add_backed_candidate(
2189			&session_topology,
2190			candidate_hash,
2191			group_index,
2192			local_knowledge.clone(),
2193		);
2194
2195		// Import statement for originator.
2196		tracker
2197			.import_manifest(
2198				&session_topology,
2199				&groups,
2200				candidate_hash,
2201				3,
2202				ManifestSummary {
2203					claimed_parent_hash: Hash::repeat_byte(0),
2204					claimed_group_index: group_index,
2205					statement_knowledge: StatementFilter {
2206						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
2207						validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
2208					},
2209				},
2210				ManifestKind::Full,
2211				validator_index,
2212			)
2213			.ok()
2214			.unwrap();
2215		tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge.clone());
2216		let statement = CompactStatement::Seconded(candidate_hash);
2217		tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement);
2218
2219		// Import statement for counterparty.
2220		tracker
2221			.import_manifest(
2222				&session_topology,
2223				&groups,
2224				candidate_hash,
2225				3,
2226				ManifestSummary {
2227					claimed_parent_hash: Hash::repeat_byte(0),
2228					claimed_group_index: group_index,
2229					statement_knowledge: StatementFilter {
2230						seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0],
2231						validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1],
2232					},
2233				},
2234				ManifestKind::Full,
2235				counterparty,
2236			)
2237			.ok()
2238			.unwrap();
2239		tracker.manifest_sent_to(&groups, counterparty, candidate_hash, local_knowledge);
2240		let statement = CompactStatement::Seconded(candidate_hash);
2241		tracker.learned_fresh_statement(&groups, &session_topology, counterparty, &statement);
2242
2243		// There should be pending statements now.
2244		let statements = StatementFilter {
2245			seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 0],
2246			validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0],
2247		};
2248		assert_eq!(
2249			tracker.pending_statements_for(validator_index, candidate_hash),
2250			Some(statements.clone())
2251		);
2252		assert_eq!(
2253			tracker.all_pending_statements_for(validator_index),
2254			vec![(ValidatorIndex(0), CompactStatement::Seconded(candidate_hash))]
2255		);
2256		assert_eq!(
2257			tracker.pending_statements_for(counterparty, candidate_hash),
2258			Some(statements.clone())
2259		);
2260		assert_eq!(
2261			tracker.all_pending_statements_for(counterparty),
2262			vec![(ValidatorIndex(0), CompactStatement::Seconded(candidate_hash))]
2263		);
2264
2265		tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement);
2266		tracker.sent_or_received_direct_statement(
2267			&groups,
2268			validator_index,
2269			counterparty,
2270			&statement,
2271			false,
2272		);
2273
2274		// There should be no pending statements now (for the counterparty).
2275		assert_eq!(
2276			tracker.pending_statements_for(counterparty, candidate_hash),
2277			Some(StatementFilter::blank(group_size))
2278		);
2279		assert_eq!(tracker.all_pending_statements_for(counterparty), vec![]);
2280	}
2281
2282	#[test]
2283	fn session_grid_topology_consistent() {
2284		let n_validators = 300;
2285		let group_size = 5;
2286
2287		let validator_indices =
2288			(0..n_validators).map(|i| ValidatorIndex(i as u32)).collect::<Vec<_>>();
2289		let groups = validator_indices.chunks(group_size).map(|x| x.to_vec()).collect::<Vec<_>>();
2290
2291		let topology = SessionGridTopology::new(
2292			(0..n_validators).collect::<Vec<_>>(),
2293			(0..n_validators)
2294				.map(|i| TopologyPeerInfo {
2295					peer_ids: Vec::new(),
2296					validator_index: ValidatorIndex(i as u32),
2297					discovery_id: AuthorityDiscoveryPair::generate().0.public(),
2298				})
2299				.collect(),
2300		);
2301
2302		let computed_topologies = validator_indices
2303			.iter()
2304			.cloned()
2305			.map(|v| build_session_topology(groups.iter(), &topology, Some(v)))
2306			.collect::<Vec<_>>();
2307
2308		let pairwise_check_topologies = |i, j| {
2309			let v_i = ValidatorIndex(i);
2310			let v_j = ValidatorIndex(j);
2311
2312			for group in (0..groups.len()).map(|i| GroupIndex(i as u32)) {
2313				let g_i = computed_topologies[i as usize].group_views.get(&group).unwrap();
2314				let g_j = computed_topologies[j as usize].group_views.get(&group).unwrap();
2315
2316				if g_i.sending.contains(&v_j) {
2317					assert!(
2318						g_j.receiving.contains(&v_i),
2319						"{:?}: {:?}, sending but not receiving",
2320						group,
2321						&(i, j)
2322					);
2323				}
2324
2325				if g_j.sending.contains(&v_i) {
2326					assert!(
2327						g_i.receiving.contains(&v_j),
2328						"{:?}: {:?}, sending but not receiving",
2329						group,
2330						&(j, i)
2331					);
2332				}
2333
2334				if g_i.receiving.contains(&v_j) {
2335					assert!(g_j.sending.contains(&v_i), "{:?}, receiving but not sending", &(i, j));
2336				}
2337
2338				if g_j.receiving.contains(&v_i) {
2339					assert!(g_i.sending.contains(&v_j), "{:?}, receiving but not sending", &(j, i));
2340				}
2341			}
2342		};
2343
2344		for i in 0..n_validators {
2345			for j in (i + 1)..n_validators {
2346				pairwise_check_topologies(i as u32, j as u32);
2347			}
2348		}
2349	}
2350}