// polkadot_collator_protocol/collator_side/validators_buffer.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
//! Validator groups buffer for connection management.
//!
//! Solves 2 problems:
//! 	1. A collator may want to stay connected to multiple groups on rotation boundaries.
//! 	2. It's important to disconnect from a validator when there are no collations to be
//! 	   fetched.
//!
//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement,
//! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise.
//!
//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a
//! candidate hash, one can reset a bit back to 0 for a particular **validator**. For example,
//! if a collation was fetched or some timeout has been hit.
//!
//! The bitwise OR over known advertisements gives us validators indices for connection request.
31
32use std::{
33	collections::{HashMap, VecDeque},
34	future::Future,
35	num::NonZeroUsize,
36	ops::Range,
37	pin::Pin,
38	task::{Context, Poll},
39	time::Duration,
40};
41
42use bitvec::{bitvec, vec::BitVec};
43use futures::FutureExt;
44
45use polkadot_node_network_protocol::PeerId;
46use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, GroupIndex, SessionIndex};
47
/// Upper bound on how many candidates per relay chain block the collator supports building
/// when elastic scaling is used.
pub const MAX_CHAINED_CANDIDATES_PER_RCB: NonZeroUsize =
	if let Some(limit) = NonZeroUsize::new(3) {
		limit
	} else {
		panic!("max candidates per rcb cannot be zero")
	};
53
54/// The ring buffer stores at most this many unique validator groups.
55///
56/// This value should be chosen in way that all groups assigned to our para
57/// in the view can fit into the buffer multiplied by amount of candidates we support per relay
58/// chain block in the case of elastic scaling.
59pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize =
60	match NonZeroUsize::new(3 * MAX_CHAINED_CANDIDATES_PER_RCB.get()) {
61		Some(cap) => cap,
62		None => panic!("buffer capacity must be non-zero"),
63	};
64
/// Unique identifier of a validators group.
///
/// Two entries describe the same group when both `session_index` and `group_index` match;
/// `len` additionally records how many validators of the group are stored in the buffer.
#[derive(Debug)]
struct ValidatorsGroupInfo {
	/// Number of validators in the group.
	len: usize,
	/// Session index passed along with the advertisement that introduced the group.
	session_index: SessionIndex,
	/// Group index passed along with the advertisement that introduced the group.
	group_index: GroupIndex,
}
73
/// Ring buffer of validator groups.
///
/// Tracks which peers we want to be connected to with respect to advertised collations.
///
/// Groups are kept in insertion order, oldest first. The discovery keys of all tracked
/// groups are stored back to back in `validators` in that same order, so the first validator
/// of the i-th group sits at the sum of the lengths of all preceding groups.
#[derive(Debug)]
pub struct ValidatorGroupsBuffer {
	/// Validator groups identifiers we **had** advertisements for.
	/// New groups are appended at the back, the oldest group is evicted from the front.
	group_infos: VecDeque<ValidatorsGroupInfo>,
	/// Continuous buffer of validators discovery keys.
	/// Holds the validators of every group in `group_infos`, group after group.
	validators: VecDeque<AuthorityDiscoveryId>,
	/// Mapping from candidate hashes to bit-vectors with bits for all `validators`.
	/// Bit `i` set means we want to be connected to `validators[i]` for that candidate.
	/// Invariants kept: All bit-vectors are guaranteed to have the same size.
	should_be_connected: HashMap<CandidateHash, BitVec>,
	/// Buffer capacity, limits the number of **groups** tracked.
	cap: NonZeroUsize,
}
89
90impl ValidatorGroupsBuffer {
91	/// Creates a new buffer with a non-zero capacity.
92	pub fn with_capacity(cap: NonZeroUsize) -> Self {
93		Self {
94			group_infos: VecDeque::new(),
95			validators: VecDeque::new(),
96			should_be_connected: HashMap::new(),
97			cap,
98		}
99	}
100
101	/// Returns discovery ids of validators we are assigned to in this backing group window.
102	pub fn validators_to_connect(&self) -> Vec<AuthorityDiscoveryId> {
103		let validators_num = self.validators.len();
104		let bits = self
105			.should_be_connected
106			.values()
107			.fold(bitvec![0; validators_num], |acc, next| acc | next);
108
109		let mut should_be_connected: Vec<AuthorityDiscoveryId> = self
110			.validators
111			.iter()
112			.enumerate()
113			.filter_map(|(idx, authority_id)| bits[idx].then(|| authority_id.clone()))
114			.collect();
115
116		if let Some(last_group) = self.group_infos.iter().last() {
117			for validator in self.validators.iter().rev().take(last_group.len) {
118				if !should_be_connected.contains(validator) {
119					should_be_connected.push(validator.clone());
120				}
121			}
122		}
123
124		should_be_connected
125	}
126
127	/// Note a new advertisement, marking that we want to be connected to validators
128	/// from this group.
129	///
130	/// If max capacity is reached and the group is new, drops validators from the back
131	/// of the buffer.
132	pub fn note_collation_advertised(
133		&mut self,
134		candidate_hash: CandidateHash,
135		session_index: SessionIndex,
136		group_index: GroupIndex,
137		validators: &[AuthorityDiscoveryId],
138	) {
139		if validators.is_empty() {
140			return
141		}
142
143		match self.group_infos.iter().enumerate().find(|(_, group)| {
144			group.session_index == session_index && group.group_index == group_index
145		}) {
146			Some((idx, group)) => {
147				let group_start_idx = self.group_lengths_iter().take(idx).sum();
148				self.set_bits(candidate_hash, group_start_idx..(group_start_idx + group.len));
149			},
150			None => self.push(candidate_hash, session_index, group_index, validators),
151		}
152	}
153
154	/// Note that a validator is no longer interested in a given candidate.
155	pub fn reset_validator_interest(
156		&mut self,
157		candidate_hash: CandidateHash,
158		authority_id: &AuthorityDiscoveryId,
159	) {
160		let bits = match self.should_be_connected.get_mut(&candidate_hash) {
161			Some(bits) => bits,
162			None => return,
163		};
164
165		for (idx, auth_id) in self.validators.iter().enumerate() {
166			if auth_id == authority_id {
167				bits.set(idx, false);
168			}
169		}
170	}
171
172	/// Remove advertised candidate from the buffer.
173	///
174	/// The buffer will no longer track which validators are interested in a corresponding
175	/// advertisement.
176	pub fn remove_candidate(&mut self, candidate_hash: &CandidateHash) {
177		self.should_be_connected.remove(candidate_hash);
178	}
179
180	/// Pushes a new group to the buffer along with advertisement, setting all validators
181	/// bits to 1.
182	///
183	/// If the buffer is full, drops group from the tail.
184	fn push(
185		&mut self,
186		candidate_hash: CandidateHash,
187		session_index: SessionIndex,
188		group_index: GroupIndex,
189		validators: &[AuthorityDiscoveryId],
190	) {
191		let new_group_info =
192			ValidatorsGroupInfo { len: validators.len(), session_index, group_index };
193
194		let buf = &mut self.group_infos;
195		let cap = self.cap.get();
196
197		if buf.len() >= cap {
198			let pruned_group = buf.pop_front().expect("buf is not empty; qed");
199			self.validators.drain(..pruned_group.len);
200
201			self.should_be_connected.values_mut().for_each(|bits| {
202				bits.as_mut_bitslice().shift_left(pruned_group.len);
203			});
204		}
205
206		self.validators.extend(validators.iter().cloned());
207		buf.push_back(new_group_info);
208		let buf_len = buf.len();
209		let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum();
210
211		let new_len = self.validators.len();
212		self.should_be_connected
213			.values_mut()
214			.for_each(|bits| bits.resize(new_len, false));
215		self.set_bits(candidate_hash, group_start_idx..(group_start_idx + validators.len()));
216	}
217
218	/// Sets advertisement bits to 1 in a given range (usually corresponding to some group).
219	/// If the relay parent is unknown, inserts 0-initialized bitvec first.
220	///
221	/// The range must be ensured to be within bounds.
222	fn set_bits(&mut self, candidate_hash: CandidateHash, range: Range<usize>) {
223		let bits = self
224			.should_be_connected
225			.entry(candidate_hash)
226			.or_insert_with(|| bitvec![0; self.validators.len()]);
227
228		bits[range].fill(true);
229	}
230
231	/// Returns iterator over numbers of validators in groups.
232	///
233	/// Useful for getting an index of the first validator in i-th group.
234	fn group_lengths_iter(&self) -> impl Iterator<Item = usize> + '_ {
235		self.group_infos.iter().map(|group| group.len)
236	}
237}
238
/// A timeout (six seconds) for resetting validators' interests in collations.
pub const RESET_INTEREST_TIMEOUT: Duration = Duration::from_millis(6_000);
241
/// A future that returns a candidate hash along with the id of the peer it was created
/// for once a timeout is hit.
///
/// If a validator doesn't manage to fetch a collation within this timeout
/// we should reset its interest in this advertisement in a buffer. For example,
/// when the PoV was already requested from another peer.
pub struct ResetInterestTimeout {
	/// Timer created from the delay passed to `new`; the future resolves when it fires.
	fut: futures_timer::Delay,
	/// Candidate hash handed back when the timer fires.
	candidate_hash: CandidateHash,
	/// Peer id handed back when the timer fires.
	peer_id: PeerId,
}
253
254impl ResetInterestTimeout {
255	/// Returns new `ResetInterestTimeout` that resolves after given timeout.
256	pub fn new(candidate_hash: CandidateHash, peer_id: PeerId, delay: Duration) -> Self {
257		Self { fut: futures_timer::Delay::new(delay), candidate_hash, peer_id }
258	}
259}
260
261impl Future for ResetInterestTimeout {
262	type Output = (CandidateHash, PeerId);
263
264	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
265		self.fut.poll_unpin(cx).map(|_| (self.candidate_hash, self.peer_id))
266	}
267}
268
269#[cfg(test)]
270mod tests {
271	use super::*;
272	use polkadot_primitives::Hash;
273	use sp_keyring::Sr25519Keyring;
274
	// A buffer with room for a single group: pushing a second group evicts the first one,
	// and the only (hence newest) group is always reported for connection.
	#[test]
	fn one_capacity_buffer() {
		let cap = NonZeroUsize::new(1).unwrap();
		let mut buf = ValidatorGroupsBuffer::with_capacity(cap);

		let hash_a = CandidateHash(Hash::repeat_byte(0x1));
		let hash_b = CandidateHash(Hash::repeat_byte(0x2));

		let validators: Vec<_> = [
			Sr25519Keyring::Alice,
			Sr25519Keyring::Bob,
			Sr25519Keyring::Charlie,
			Sr25519Keyring::Dave,
			Sr25519Keyring::Ferdie,
		]
		.into_iter()
		.map(|key| AuthorityDiscoveryId::from(key.public()))
		.collect();

		// Nothing was advertised yet.
		assert!(buf.validators_to_connect().is_empty());

		// The first group is pushed with all of its bits set.
		buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]);
		assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());

		// Resetting a bit does not drop the validator: it belongs to the newest group,
		// which is always part of the result.
		buf.reset_validator_interest(hash_a, &validators[1]);
		assert_eq!(buf.validators_to_connect(), validators[0..2].to_vec());

		// A different group does not fit next to the first one, so group 0 is evicted.
		buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]);
		assert_eq!(buf.validators_to_connect(), validators[2..].to_vec());

		// Even with every bit reset the newest group is still returned. It is collected
		// from the back of the buffer, hence the order-insensitive comparison.
		for validator in &validators[2..] {
			buf.reset_validator_interest(hash_b, validator);
		}
		let mut expected = validators[2..].to_vec();
		expected.sort();
		let mut result = buf.validators_to_connect();
		result.sort();
		assert_eq!(result, expected);
	}
314
	// Exercises a buffer of three groups: repeated advertisements, resetting interest per
	// candidate, and eviction of the oldest group once a fourth group is advertised.
	#[test]
	fn buffer_works() {
		let cap = NonZeroUsize::new(3).unwrap();
		let mut buf = ValidatorGroupsBuffer::with_capacity(cap);

		let hashes: Vec<_> = (0..5).map(|i| CandidateHash(Hash::repeat_byte(i))).collect();

		let validators: Vec<_> = [
			Sr25519Keyring::Alice,
			Sr25519Keyring::Bob,
			Sr25519Keyring::Charlie,
			Sr25519Keyring::Dave,
			Sr25519Keyring::Ferdie,
		]
		.into_iter()
		.map(|key| AuthorityDiscoveryId::from(key.public()))
		.collect();

		// Two candidates advertised to group 0 and one candidate advertised twice to
		// group 1; the repeated advertisement only sets the same bits again.
		buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]);
		buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]);
		buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
		buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);

		assert_eq!(buf.validators_to_connect(), validators[..4].to_vec());

		// Group 1 is the newest group, so its validators stay in the result even after
		// their bits for `hashes[2]` are cleared.
		for validator in &validators[2..4] {
			buf.reset_validator_interest(hashes[2], validator);
		}

		// Validator 0 is still wanted for `hashes[0]`, so the result does not change.
		buf.reset_validator_interest(hashes[1], &validators[0]);
		let mut expected: Vec<_> = validators[..4].iter().cloned().collect();
		let mut result = buf.validators_to_connect();
		expected.sort();
		result.sort();
		assert_eq!(result, expected);

		// Now no candidate wants validator 0 anymore and it is dropped from the result.
		buf.reset_validator_interest(hashes[0], &validators[0]);
		let mut expected: Vec<_> = validators[1..4].iter().cloned().collect();
		expected.sort();
		let mut result = buf.validators_to_connect();
		result.sort();
		assert_eq!(result, expected);

		// Group 1 is already tracked, so only the bits for `hashes[3]` are set.
		// Group 2 is new and fills the buffer up to its capacity of three groups.
		buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]);
		buf.note_collation_advertised(
			hashes[4],
			0,
			GroupIndex(2),
			std::slice::from_ref(&validators[4]),
		);

		// Group 3 does not fit: the oldest group (group 0, validators 0 and 1) is evicted
		// and the remaining bits are shifted accordingly.
		buf.reset_validator_interest(hashes[3], &validators[2]);
		buf.note_collation_advertised(
			hashes[4],
			0,
			GroupIndex(3),
			std::slice::from_ref(&validators[0]),
		);

		// Validator 2 has no bits left, validator 3 is wanted for `hashes[3]`, validator 4
		// and the re-added validator 0 (now part of group 3) are wanted for `hashes[4]`.
		assert_eq!(
			buf.validators_to_connect(),
			vec![validators[3].clone(), validators[4].clone(), validators[0].clone()]
		);
	}
379}