// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Validator groups buffer for connection management.
//!
//! Solves 2 problems:
//! 1. A collator may want to stay connected to multiple groups on rotation boundaries.
//! 2. It's important to disconnect from a validator when there are no collations to be fetched.
//!
//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement,
//! 1 indicating we want to be connected to the i-th validator in the buffer, 0 otherwise.
//!
//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a
//! candidate hash, one can reset a bit back to 0 for a particular **validator**. For example, if
//! a collation was fetched or some timeout has been hit.
//!
//! The bitwise OR over known advertisements gives us the indices of validators for the
//! connection request.
3132use std::{
33 collections::{HashMap, VecDeque},
34 future::Future,
35 num::NonZeroUsize,
36 ops::Range,
37 pin::Pin,
38 task::{Context, Poll},
39 time::Duration,
40};
4142use bitvec::{bitvec, vec::BitVec};
43use futures::FutureExt;
4445use polkadot_node_network_protocol::PeerId;
46use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, GroupIndex, SessionIndex};
/// Elastic scaling: how many candidates per relay chain block the collator supports building.
///
/// The value is checked at compile time: a zero literal would fail const evaluation instead
/// of producing an invalid `NonZeroUsize`.
pub const MAX_CHAINED_CANDIDATES_PER_RCB: NonZeroUsize = match NonZeroUsize::new(3) {
    Some(cap) => cap,
    None => panic!("max candidates per rcb cannot be zero"),
};
5354/// The ring buffer stores at most this many unique validator groups.
55///
56/// This value should be chosen in way that all groups assigned to our para
57/// in the view can fit into the buffer multiplied by amount of candidates we support per relay
58/// chain block in the case of elastic scaling.
59pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize =
60match NonZeroUsize::new(3 * MAX_CHAINED_CANDIDATES_PER_RCB.get()) {
61Some(cap) => cap,
62None => panic!("buffer capacity must be non-zero"),
63 };
6465/// Unique identifier of a validators group.
66#[derive(Debug)]
67struct ValidatorsGroupInfo {
68/// Number of validators in the group.
69len: usize,
70 session_index: SessionIndex,
71 group_index: GroupIndex,
72}
7374/// Ring buffer of validator groups.
75///
76/// Tracks which peers we want to be connected to with respect to advertised collations.
77#[derive(Debug)]
78pub struct ValidatorGroupsBuffer {
79/// Validator groups identifiers we **had** advertisements for.
80group_infos: VecDeque<ValidatorsGroupInfo>,
81/// Continuous buffer of validators discovery keys.
82validators: VecDeque<AuthorityDiscoveryId>,
83/// Mapping from candidate hashes to bit-vectors with bits for all `validators`.
84 /// Invariants kept: All bit-vectors are guaranteed to have the same size.
85should_be_connected: HashMap<CandidateHash, BitVec>,
86/// Buffer capacity, limits the number of **groups** tracked.
87cap: NonZeroUsize,
88}
8990impl ValidatorGroupsBuffer {
91/// Creates a new buffer with a non-zero capacity.
92pub fn with_capacity(cap: NonZeroUsize) -> Self {
93Self {
94 group_infos: VecDeque::new(),
95 validators: VecDeque::new(),
96 should_be_connected: HashMap::new(),
97 cap,
98 }
99 }
100101/// Returns discovery ids of validators we are assigned to in this backing group window.
102pub fn validators_to_connect(&self) -> Vec<AuthorityDiscoveryId> {
103let validators_num = self.validators.len();
104let bits = self
105.should_be_connected
106 .values()
107 .fold(bitvec![0; validators_num], |acc, next| acc | next);
108109let mut should_be_connected: Vec<AuthorityDiscoveryId> = self
110.validators
111 .iter()
112 .enumerate()
113 .filter_map(|(idx, authority_id)| bits[idx].then(|| authority_id.clone()))
114 .collect();
115116if let Some(last_group) = self.group_infos.iter().last() {
117for validator in self.validators.iter().rev().take(last_group.len) {
118if !should_be_connected.contains(validator) {
119 should_be_connected.push(validator.clone());
120 }
121 }
122 }
123124 should_be_connected
125 }
126127/// Note a new advertisement, marking that we want to be connected to validators
128 /// from this group.
129 ///
130 /// If max capacity is reached and the group is new, drops validators from the back
131 /// of the buffer.
132pub fn note_collation_advertised(
133&mut self,
134 candidate_hash: CandidateHash,
135 session_index: SessionIndex,
136 group_index: GroupIndex,
137 validators: &[AuthorityDiscoveryId],
138 ) {
139if validators.is_empty() {
140return
141}
142143match self.group_infos.iter().enumerate().find(|(_, group)| {
144 group.session_index == session_index && group.group_index == group_index
145 }) {
146Some((idx, group)) => {
147let group_start_idx = self.group_lengths_iter().take(idx).sum();
148self.set_bits(candidate_hash, group_start_idx..(group_start_idx + group.len));
149 },
150None => self.push(candidate_hash, session_index, group_index, validators),
151 }
152 }
153154/// Note that a validator is no longer interested in a given candidate.
155pub fn reset_validator_interest(
156&mut self,
157 candidate_hash: CandidateHash,
158 authority_id: &AuthorityDiscoveryId,
159 ) {
160let bits = match self.should_be_connected.get_mut(&candidate_hash) {
161Some(bits) => bits,
162None => return,
163 };
164165for (idx, auth_id) in self.validators.iter().enumerate() {
166if auth_id == authority_id {
167 bits.set(idx, false);
168 }
169 }
170 }
171172/// Remove advertised candidate from the buffer.
173 ///
174 /// The buffer will no longer track which validators are interested in a corresponding
175 /// advertisement.
176pub fn remove_candidate(&mut self, candidate_hash: &CandidateHash) {
177self.should_be_connected.remove(candidate_hash);
178 }
179180/// Pushes a new group to the buffer along with advertisement, setting all validators
181 /// bits to 1.
182 ///
183 /// If the buffer is full, drops group from the tail.
184fn push(
185&mut self,
186 candidate_hash: CandidateHash,
187 session_index: SessionIndex,
188 group_index: GroupIndex,
189 validators: &[AuthorityDiscoveryId],
190 ) {
191let new_group_info =
192 ValidatorsGroupInfo { len: validators.len(), session_index, group_index };
193194let buf = &mut self.group_infos;
195let cap = self.cap.get();
196197if buf.len() >= cap {
198let pruned_group = buf.pop_front().expect("buf is not empty; qed");
199self.validators.drain(..pruned_group.len);
200201self.should_be_connected.values_mut().for_each(|bits| {
202 bits.as_mut_bitslice().shift_left(pruned_group.len);
203 });
204 }
205206self.validators.extend(validators.iter().cloned());
207 buf.push_back(new_group_info);
208let buf_len = buf.len();
209let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum();
210211let new_len = self.validators.len();
212self.should_be_connected
213 .values_mut()
214 .for_each(|bits| bits.resize(new_len, false));
215self.set_bits(candidate_hash, group_start_idx..(group_start_idx + validators.len()));
216 }
217218/// Sets advertisement bits to 1 in a given range (usually corresponding to some group).
219 /// If the relay parent is unknown, inserts 0-initialized bitvec first.
220 ///
221 /// The range must be ensured to be within bounds.
222fn set_bits(&mut self, candidate_hash: CandidateHash, range: Range<usize>) {
223let bits = self
224.should_be_connected
225 .entry(candidate_hash)
226 .or_insert_with(|| bitvec![0; self.validators.len()]);
227228 bits[range].fill(true);
229 }
230231/// Returns iterator over numbers of validators in groups.
232 ///
233 /// Useful for getting an index of the first validator in i-th group.
234fn group_lengths_iter(&self) -> impl Iterator<Item = usize> + '_ {
235self.group_infos.iter().map(|group| group.len)
236 }
237}
/// A timeout for resetting validators' interests in collations (6 seconds).
pub const RESET_INTEREST_TIMEOUT: Duration = Duration::from_secs(6);
241242/// A future that returns a candidate hash along with validator discovery
243/// keys once a timeout hit.
244///
245/// If a validator doesn't manage to fetch a collation within this timeout
246/// we should reset its interest in this advertisement in a buffer. For example,
247/// when the PoV was already requested from another peer.
248pub struct ResetInterestTimeout {
249 fut: futures_timer::Delay,
250 candidate_hash: CandidateHash,
251 peer_id: PeerId,
252}
253254impl ResetInterestTimeout {
255/// Returns new `ResetInterestTimeout` that resolves after given timeout.
256pub fn new(candidate_hash: CandidateHash, peer_id: PeerId, delay: Duration) -> Self {
257Self { fut: futures_timer::Delay::new(delay), candidate_hash, peer_id }
258 }
259}
260261impl Future for ResetInterestTimeout {
262type Output = (CandidateHash, PeerId);
263264fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
265self.fut.poll_unpin(cx).map(|_| (self.candidate_hash, self.peer_id))
266 }
267}
#[cfg(test)]
mod tests {
    use super::*;
    use polkadot_primitives::Hash;
    use sp_keyring::Sr25519Keyring;

    /// Discovery ids of five keyring accounts, shared by the tests below.
    fn test_validators() -> Vec<AuthorityDiscoveryId> {
        [
            Sr25519Keyring::Alice,
            Sr25519Keyring::Bob,
            Sr25519Keyring::Charlie,
            Sr25519Keyring::Dave,
            Sr25519Keyring::Ferdie,
        ]
        .into_iter()
        .map(|key| AuthorityDiscoveryId::from(key.public()))
        .collect()
    }

    /// Returns `ids` sorted, for order-insensitive comparisons.
    fn sorted(mut ids: Vec<AuthorityDiscoveryId>) -> Vec<AuthorityDiscoveryId> {
        ids.sort();
        ids
    }

    #[test]
    fn one_capacity_buffer() {
        let cap = NonZeroUsize::new(1).unwrap();
        let mut buf = ValidatorGroupsBuffer::with_capacity(cap);

        let hash_a = CandidateHash(Hash::repeat_byte(0x1));
        let hash_b = CandidateHash(Hash::repeat_byte(0x2));

        let validators = test_validators();

        assert!(buf.validators_to_connect().is_empty());

        buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]);
        assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());

        // The last group is always kept connected, even after an interest reset.
        buf.reset_validator_interest(hash_a, &validators[1]);
        assert_eq!(buf.validators_to_connect(), validators[0..2].to_vec());

        // With capacity 1 the new group replaces the previous one.
        buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]);
        assert_eq!(buf.validators_to_connect(), validators[2..].to_vec());

        for validator in &validators[2..] {
            buf.reset_validator_interest(hash_b, validator);
        }
        assert_eq!(sorted(buf.validators_to_connect()), sorted(validators[2..].to_vec()));
    }

    #[test]
    fn buffer_works() {
        let cap = NonZeroUsize::new(3).unwrap();
        let mut buf = ValidatorGroupsBuffer::with_capacity(cap);

        let hashes: Vec<_> = (0..5).map(|i| CandidateHash(Hash::repeat_byte(i))).collect();

        let validators = test_validators();

        buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]);
        buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]);
        buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
        buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);

        assert_eq!(buf.validators_to_connect(), validators[..4].to_vec());

        for validator in &validators[2..4] {
            buf.reset_validator_interest(hashes[2], validator);
        }

        buf.reset_validator_interest(hashes[1], &validators[0]);
        assert_eq!(sorted(buf.validators_to_connect()), sorted(validators[..4].to_vec()));

        buf.reset_validator_interest(hashes[0], &validators[0]);
        assert_eq!(sorted(buf.validators_to_connect()), sorted(validators[1..4].to_vec()));

        buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]);
        buf.note_collation_advertised(
            hashes[4],
            0,
            GroupIndex(2),
            std::slice::from_ref(&validators[4]),
        );

        buf.reset_validator_interest(hashes[3], &validators[2]);
        // A fourth group exceeds the capacity of 3 and evicts the oldest group.
        buf.note_collation_advertised(
            hashes[4],
            0,
            GroupIndex(3),
            std::slice::from_ref(&validators[0]),
        );

        assert_eq!(
            buf.validators_to_connect(),
            vec![validators[3].clone(), validators[4].clone(), validators[0].clone()]
        );
    }
}