// polkadot_collator_protocol/collator_side/validators_buffer.rs
use std::{
33 collections::{HashMap, VecDeque},
34 future::Future,
35 num::NonZeroUsize,
36 ops::Range,
37 pin::Pin,
38 task::{Context, Poll},
39 time::Duration,
40};
41
42use bitvec::{bitvec, vec::BitVec};
43use futures::FutureExt;
44
45use polkadot_node_network_protocol::PeerId;
46use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, GroupIndex, SessionIndex};
47
/// Upper bound on chained candidates per "RCB" (presumably "relay chain block" — confirm
/// against callers); fixed at 3.
///
/// Built in a const context so that a zero value is rejected at compile time rather than at
/// runtime.
pub const MAX_CHAINED_CANDIDATES_PER_RCB: NonZeroUsize =
    if let Some(limit) = NonZeroUsize::new(3) {
        limit
    } else {
        panic!("max candidates per rcb cannot be zero")
    };
53
54pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize =
60 match NonZeroUsize::new(3 * MAX_CHAINED_CANDIDATES_PER_RCB.get()) {
61 Some(cap) => cap,
62 None => panic!("buffer capacity must be non-zero"),
63 };
64
65#[derive(Debug)]
67struct ValidatorsGroupInfo {
68 len: usize,
70 session_index: SessionIndex,
71 group_index: GroupIndex,
72}
73
74#[derive(Debug)]
78pub struct ValidatorGroupsBuffer {
79 group_infos: VecDeque<ValidatorsGroupInfo>,
81 validators: VecDeque<AuthorityDiscoveryId>,
83 should_be_connected: HashMap<CandidateHash, BitVec>,
86 cap: NonZeroUsize,
88}
89
90impl ValidatorGroupsBuffer {
91 pub fn with_capacity(cap: NonZeroUsize) -> Self {
93 Self {
94 group_infos: VecDeque::new(),
95 validators: VecDeque::new(),
96 should_be_connected: HashMap::new(),
97 cap,
98 }
99 }
100
101 pub fn validators_to_connect(&self) -> Vec<AuthorityDiscoveryId> {
103 let validators_num = self.validators.len();
104 let bits = self
105 .should_be_connected
106 .values()
107 .fold(bitvec![0; validators_num], |acc, next| acc | next);
108
109 let mut should_be_connected: Vec<AuthorityDiscoveryId> = self
110 .validators
111 .iter()
112 .enumerate()
113 .filter_map(|(idx, authority_id)| bits[idx].then(|| authority_id.clone()))
114 .collect();
115
116 if let Some(last_group) = self.group_infos.iter().last() {
117 for validator in self.validators.iter().rev().take(last_group.len) {
118 if !should_be_connected.contains(validator) {
119 should_be_connected.push(validator.clone());
120 }
121 }
122 }
123
124 should_be_connected
125 }
126
127 pub fn note_collation_advertised(
133 &mut self,
134 candidate_hash: CandidateHash,
135 session_index: SessionIndex,
136 group_index: GroupIndex,
137 validators: &[AuthorityDiscoveryId],
138 ) {
139 if validators.is_empty() {
140 return
141 }
142
143 match self.group_infos.iter().enumerate().find(|(_, group)| {
144 group.session_index == session_index && group.group_index == group_index
145 }) {
146 Some((idx, group)) => {
147 let group_start_idx = self.group_lengths_iter().take(idx).sum();
148 self.set_bits(candidate_hash, group_start_idx..(group_start_idx + group.len));
149 },
150 None => self.push(candidate_hash, session_index, group_index, validators),
151 }
152 }
153
154 pub fn reset_validator_interest(
156 &mut self,
157 candidate_hash: CandidateHash,
158 authority_id: &AuthorityDiscoveryId,
159 ) {
160 let bits = match self.should_be_connected.get_mut(&candidate_hash) {
161 Some(bits) => bits,
162 None => return,
163 };
164
165 for (idx, auth_id) in self.validators.iter().enumerate() {
166 if auth_id == authority_id {
167 bits.set(idx, false);
168 }
169 }
170 }
171
172 pub fn remove_candidate(&mut self, candidate_hash: &CandidateHash) {
177 self.should_be_connected.remove(candidate_hash);
178 }
179
180 fn push(
185 &mut self,
186 candidate_hash: CandidateHash,
187 session_index: SessionIndex,
188 group_index: GroupIndex,
189 validators: &[AuthorityDiscoveryId],
190 ) {
191 let new_group_info =
192 ValidatorsGroupInfo { len: validators.len(), session_index, group_index };
193
194 let buf = &mut self.group_infos;
195 let cap = self.cap.get();
196
197 if buf.len() >= cap {
198 let pruned_group = buf.pop_front().expect("buf is not empty; qed");
199 self.validators.drain(..pruned_group.len);
200
201 self.should_be_connected.values_mut().for_each(|bits| {
202 bits.as_mut_bitslice().shift_left(pruned_group.len);
203 });
204 }
205
206 self.validators.extend(validators.iter().cloned());
207 buf.push_back(new_group_info);
208 let buf_len = buf.len();
209 let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum();
210
211 let new_len = self.validators.len();
212 self.should_be_connected
213 .values_mut()
214 .for_each(|bits| bits.resize(new_len, false));
215 self.set_bits(candidate_hash, group_start_idx..(group_start_idx + validators.len()));
216 }
217
218 fn set_bits(&mut self, candidate_hash: CandidateHash, range: Range<usize>) {
223 let bits = self
224 .should_be_connected
225 .entry(candidate_hash)
226 .or_insert_with(|| bitvec![0; self.validators.len()]);
227
228 bits[range].fill(true);
229 }
230
231 fn group_lengths_iter(&self) -> impl Iterator<Item = usize> + '_ {
235 self.group_infos.iter().map(|group| group.len)
236 }
237}
238
/// Six-second timeout.
///
/// NOTE(review): not referenced elsewhere in this file; presumably the delay callers hand to
/// `ResetInterestTimeout::new` — confirm at the call sites.
pub const RESET_INTEREST_TIMEOUT: Duration = Duration::from_secs(6);
241
242pub struct ResetInterestTimeout {
249 fut: futures_timer::Delay,
250 candidate_hash: CandidateHash,
251 peer_id: PeerId,
252}
253
254impl ResetInterestTimeout {
255 pub fn new(candidate_hash: CandidateHash, peer_id: PeerId, delay: Duration) -> Self {
257 Self { fut: futures_timer::Delay::new(delay), candidate_hash, peer_id }
258 }
259}
260
261impl Future for ResetInterestTimeout {
262 type Output = (CandidateHash, PeerId);
263
264 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
265 self.fut.poll_unpin(cx).map(|_| (self.candidate_hash, self.peer_id))
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use polkadot_primitives::Hash;
    use sp_keyring::Sr25519Keyring;

    /// Discovery ids of Alice, Bob, Charlie, Dave and Ferdie, in that order.
    fn test_validators() -> Vec<AuthorityDiscoveryId> {
        [
            Sr25519Keyring::Alice,
            Sr25519Keyring::Bob,
            Sr25519Keyring::Charlie,
            Sr25519Keyring::Dave,
            Sr25519Keyring::Ferdie,
        ]
        .into_iter()
        .map(|key| AuthorityDiscoveryId::from(key.public()))
        .collect()
    }

    /// Returns `ids` sorted, for order-insensitive comparisons.
    fn sorted(mut ids: Vec<AuthorityDiscoveryId>) -> Vec<AuthorityDiscoveryId> {
        ids.sort();
        ids
    }

    #[test]
    fn one_capacity_buffer() {
        let cap = NonZeroUsize::new(1).unwrap();
        let mut buf = ValidatorGroupsBuffer::with_capacity(cap);

        let hash_a = CandidateHash(Hash::repeat_byte(0x1));
        let hash_b = CandidateHash(Hash::repeat_byte(0x2));

        let validators = test_validators();

        assert!(buf.validators_to_connect().is_empty());

        buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]);
        assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());

        // Resetting a member of the newest group does not drop it from the result.
        buf.reset_validator_interest(hash_a, &validators[1]);
        assert_eq!(buf.validators_to_connect(), validators[0..2].to_vec());

        // With capacity 1 the second group replaces the first one.
        buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]);
        assert_eq!(buf.validators_to_connect(), validators[2..].to_vec());

        for validator in &validators[2..] {
            buf.reset_validator_interest(hash_b, validator);
        }
        // All bits are cleared, yet the newest group is still reported (in some order).
        assert_eq!(sorted(buf.validators_to_connect()), sorted(validators[2..].to_vec()));
    }

    #[test]
    fn buffer_works() {
        let cap = NonZeroUsize::new(3).unwrap();
        let mut buf = ValidatorGroupsBuffer::with_capacity(cap);

        let hashes: Vec<_> = (0..5).map(|i| CandidateHash(Hash::repeat_byte(i))).collect();

        let validators = test_validators();

        buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]);
        buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]);
        buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
        // Advertising the same candidate to the same group again changes nothing.
        buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);

        assert_eq!(buf.validators_to_connect(), validators[..4].to_vec());

        for validator in &validators[2..4] {
            buf.reset_validator_interest(hashes[2], validator);
        }

        // Alice is still wanted by `hashes[0]`, and the newest group is always kept.
        buf.reset_validator_interest(hashes[1], &validators[0]);
        assert_eq!(sorted(buf.validators_to_connect()), sorted(validators[..4].to_vec()));

        // No candidate is interested in Alice any more.
        buf.reset_validator_interest(hashes[0], &validators[0]);
        assert_eq!(sorted(buf.validators_to_connect()), sorted(validators[1..4].to_vec()));

        buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]);
        buf.note_collation_advertised(
            hashes[4],
            0,
            GroupIndex(2),
            std::slice::from_ref(&validators[4]),
        );

        buf.reset_validator_interest(hashes[3], &validators[2]);
        // Fourth distinct group: the buffer is full, so the oldest group is evicted.
        buf.note_collation_advertised(
            hashes[4],
            0,
            GroupIndex(3),
            std::slice::from_ref(&validators[0]),
        );

        assert_eq!(
            buf.validators_to_connect(),
            vec![validators[3].clone(), validators[4].clone(), validators[0].clone()]
        );
    }
}