1use crate::{futures_stream::FuturesStream, LOG_TARGET};
23use futures::{stream::FusedStream, Future, FutureExt, Stream, StreamExt};
24use log::{debug, error, trace, warn};
25use sc_network_common::sync::message::BlockAnnounce;
26use sc_network_types::PeerId;
27use sp_consensus::block_validation::Validation;
28use sp_runtime::traits::{Block as BlockT, Header, Zero};
29use std::{
30 collections::{hash_map::Entry, HashMap},
31 default::Default,
32 pin::Pin,
33 task::{Context, Poll},
34};
35
36const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS: usize = 256;
41
42const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4;
46
47#[derive(Debug, Clone, PartialEq, Eq)]
49pub(crate) enum BlockAnnounceValidationResult<H> {
50 Failure {
54 peer_id: PeerId,
56 disconnect: bool,
58 },
59 Process {
61 peer_id: PeerId,
63 is_new_best: bool,
65 announce: BlockAnnounce<H>,
67 },
68 Skip {
70 peer_id: PeerId,
72 },
73}
74
75impl<H> BlockAnnounceValidationResult<H> {
76 fn peer_id(&self) -> &PeerId {
77 match self {
78 BlockAnnounceValidationResult::Failure { peer_id, .. } |
79 BlockAnnounceValidationResult::Process { peer_id, .. } |
80 BlockAnnounceValidationResult::Skip { peer_id } => peer_id,
81 }
82 }
83}
84
85enum AllocateSlotForBlockAnnounceValidation {
87 Allocated,
89 TotalMaximumSlotsReached,
91 MaximumPeerSlotsReached,
93}
94
95pub(crate) struct BlockAnnounceValidator<B: BlockT> {
96 validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
98 validations: FuturesStream<
100 Pin<Box<dyn Future<Output = BlockAnnounceValidationResult<B::Header>> + Send>>,
101 >,
102 validations_per_peer: HashMap<PeerId, usize>,
104}
105
106impl<B: BlockT> BlockAnnounceValidator<B> {
107 pub(crate) fn new(
108 validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
109 ) -> Self {
110 Self {
111 validator,
112 validations: Default::default(),
113 validations_per_peer: Default::default(),
114 }
115 }
116
117 pub(crate) fn push_block_announce_validation(
119 &mut self,
120 peer_id: PeerId,
121 hash: B::Hash,
122 announce: BlockAnnounce<B::Header>,
123 is_best: bool,
124 ) {
125 let header = &announce.header;
126 let number = *header.number();
127 debug!(
128 target: LOG_TARGET,
129 "Pre-validating received block announcement {:?} with number {:?} from {}",
130 hash,
131 number,
132 peer_id,
133 );
134
135 if number.is_zero() {
136 warn!(
137 target: LOG_TARGET,
138 "๐ Ignored genesis block (#0) announcement from {}: {}",
139 peer_id,
140 hash,
141 );
142 return
143 }
144
145 match self.allocate_slot_for_block_announce_validation(&peer_id) {
147 AllocateSlotForBlockAnnounceValidation::Allocated => {},
148 AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => {
149 warn!(
150 target: LOG_TARGET,
151 "๐ Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.",
152 number,
153 hash,
154 peer_id,
155 );
156 return
157 },
158 AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => {
159 debug!(
160 target: LOG_TARGET,
161 "๐ Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.",
162 number,
163 hash,
164 peer_id,
165 );
166 return
167 },
168 }
169
170 let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice());
172 let future = self.validator.validate(header, assoc_data);
173
174 self.validations.push(
175 async move {
176 match future.await {
177 Ok(Validation::Success { is_new_best }) => {
178 let is_new_best = is_new_best || is_best;
179
180 trace!(
181 target: LOG_TARGET,
182 "Block announcement validated successfully: from {}: {:?}. Local best: {}.",
183 peer_id,
184 announce.summary(),
185 is_new_best,
186 );
187
188 BlockAnnounceValidationResult::Process { is_new_best, announce, peer_id }
189 },
190 Ok(Validation::Failure { disconnect }) => {
191 debug!(
192 target: LOG_TARGET,
193 "Block announcement validation failed: from {}, block {:?}. Disconnect: {}.",
194 peer_id,
195 hash,
196 disconnect,
197 );
198
199 BlockAnnounceValidationResult::Failure { peer_id, disconnect }
200 },
201 Err(e) => {
202 debug!(
203 target: LOG_TARGET,
204 "๐ Ignoring block announcement validation from {} of block {:?} due to internal error: {}.",
205 peer_id,
206 hash,
207 e,
208 );
209
210 BlockAnnounceValidationResult::Skip { peer_id }
211 },
212 }
213 }
214 .boxed(),
215 );
216 }
217
218 fn allocate_slot_for_block_announce_validation(
230 &mut self,
231 peer_id: &PeerId,
232 ) -> AllocateSlotForBlockAnnounceValidation {
233 if self.validations.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
234 return AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached
235 }
236
237 match self.validations_per_peer.entry(*peer_id) {
238 Entry::Vacant(entry) => {
239 entry.insert(1);
240 AllocateSlotForBlockAnnounceValidation::Allocated
241 },
242 Entry::Occupied(mut entry) => {
243 if *entry.get() < MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
244 *entry.get_mut() += 1;
245 AllocateSlotForBlockAnnounceValidation::Allocated
246 } else {
247 AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached
248 }
249 },
250 }
251 }
252
253 fn deallocate_slot_for_block_announce_validation(&mut self, peer_id: &PeerId) {
256 match self.validations_per_peer.entry(*peer_id) {
257 Entry::Vacant(_) => {
258 error!(
259 target: LOG_TARGET,
260 "๐ Block announcement validation from peer {} finished for a slot that was not allocated!",
261 peer_id,
262 );
263 },
264 Entry::Occupied(mut entry) => match entry.get().checked_sub(1) {
265 Some(value) =>
266 if value == 0 {
267 entry.remove();
268 } else {
269 *entry.get_mut() = value;
270 },
271 None => {
272 entry.remove();
273
274 error!(
275 target: LOG_TARGET,
276 "Invalid (zero) block announce validation slot counter for peer {peer_id}.",
277 );
278 debug_assert!(
279 false,
280 "Invalid (zero) block announce validation slot counter for peer {peer_id}.",
281 );
282 },
283 },
284 }
285 }
286}
287
288impl<B: BlockT> Stream for BlockAnnounceValidator<B> {
289 type Item = BlockAnnounceValidationResult<B::Header>;
290
291 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
293 let validation = futures::ready!(self.validations.poll_next_unpin(cx))
294 .expect("`FuturesStream` never terminates; qed");
295 self.deallocate_slot_for_block_announce_validation(validation.peer_id());
296
297 Poll::Ready(Some(validation))
298 }
299}
300
301impl<B: BlockT> FusedStream for BlockAnnounceValidator<B> {
303 fn is_terminated(&self) -> bool {
304 false
305 }
306}
307
308#[cfg(test)]
309mod tests {
310 use super::*;
311 use crate::block_announce_validator::AllocateSlotForBlockAnnounceValidation;
312 use sc_network_types::PeerId;
313 use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
314 use substrate_test_runtime_client::runtime::Block;
315
316 #[test]
317 fn allocate_one_validation_slot() {
318 let mut validator =
319 BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
320 let peer_id = PeerId::random();
321
322 assert!(matches!(
323 validator.allocate_slot_for_block_announce_validation(&peer_id),
324 AllocateSlotForBlockAnnounceValidation::Allocated,
325 ));
326 }
327
328 #[test]
329 fn allocate_validation_slots_for_two_peers() {
330 let mut validator =
331 BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
332 let peer_id_1 = PeerId::random();
333 let peer_id_2 = PeerId::random();
334
335 assert!(matches!(
336 validator.allocate_slot_for_block_announce_validation(&peer_id_1),
337 AllocateSlotForBlockAnnounceValidation::Allocated,
338 ));
339 assert!(matches!(
340 validator.allocate_slot_for_block_announce_validation(&peer_id_2),
341 AllocateSlotForBlockAnnounceValidation::Allocated,
342 ));
343 }
344
345 #[test]
346 fn maximum_validation_slots_per_peer() {
347 let mut validator =
348 BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
349 let peer_id = PeerId::random();
350
351 for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
352 assert!(matches!(
353 validator.allocate_slot_for_block_announce_validation(&peer_id),
354 AllocateSlotForBlockAnnounceValidation::Allocated,
355 ));
356 }
357
358 assert!(matches!(
359 validator.allocate_slot_for_block_announce_validation(&peer_id),
360 AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached,
361 ));
362 }
363
364 #[test]
365 fn validation_slots_per_peer_deallocated() {
366 let mut validator =
367 BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
368 let peer_id = PeerId::random();
369
370 for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
371 assert!(matches!(
372 validator.allocate_slot_for_block_announce_validation(&peer_id),
373 AllocateSlotForBlockAnnounceValidation::Allocated,
374 ));
375 }
376
377 assert!(matches!(
378 validator.allocate_slot_for_block_announce_validation(&peer_id),
379 AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached,
380 ));
381
382 validator.deallocate_slot_for_block_announce_validation(&peer_id);
383
384 assert!(matches!(
385 validator.allocate_slot_for_block_announce_validation(&peer_id),
386 AllocateSlotForBlockAnnounceValidation::Allocated,
387 ));
388 }
389
390 #[test]
391 fn maximum_validation_slots_for_all_peers() {
392 let mut validator =
393 BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
394
395 for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
396 validator.validations.push(
397 futures::future::ready(BlockAnnounceValidationResult::Skip {
398 peer_id: PeerId::random(),
399 })
400 .boxed(),
401 );
402 }
403
404 let peer_id = PeerId::random();
405 assert!(matches!(
406 validator.allocate_slot_for_block_announce_validation(&peer_id),
407 AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached,
408 ));
409 }
410}