referrerpolicy=no-referrer-when-downgrade

sc_network_sync/
block_announce_validator.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`BlockAnnounceValidator`] is responsible for async validation of block announcements.
20//! [`Stream`] implemented by [`BlockAnnounceValidator`] never terminates.
21
22use crate::{futures_stream::FuturesStream, LOG_TARGET};
23use futures::{stream::FusedStream, Future, FutureExt, Stream, StreamExt};
24use log::{debug, error, trace, warn};
25use sc_network_common::sync::message::BlockAnnounce;
26use sc_network_types::PeerId;
27use sp_consensus::block_validation::Validation;
28use sp_runtime::traits::{Block as BlockT, Header, Zero};
29use std::{
30	collections::{hash_map::Entry, HashMap},
31	default::Default,
32	pin::Pin,
33	task::{Context, Poll},
34};
35
36/// Maximum number of concurrent block announce validations.
37///
38/// If the queue reaches the maximum, we drop any new block
39/// announcements.
40const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS: usize = 256;
41
42/// Maximum number of concurrent block announce validations per peer.
43///
44/// See [`MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS`] for more information.
45const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4;
46
47/// Item that yields [`Stream`] implementation of [`BlockAnnounceValidator`].
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub(crate) enum BlockAnnounceValidationResult<H> {
50	/// The announcement failed at validation.
51	///
52	/// The peer reputation should be decreased.
53	Failure {
54		/// The id of the peer that send us the announcement.
55		peer_id: PeerId,
56		/// Should the peer be disconnected?
57		disconnect: bool,
58	},
59	/// The announcement was validated successfully and should be passed to [`crate::ChainSync`].
60	Process {
61		/// The id of the peer that send us the announcement.
62		peer_id: PeerId,
63		/// Was this their new best block?
64		is_new_best: bool,
65		/// The announcement.
66		announce: BlockAnnounce<H>,
67	},
68	/// The block announcement should be skipped.
69	Skip {
70		/// The id of the peer that send us the announcement.
71		peer_id: PeerId,
72	},
73}
74
75impl<H> BlockAnnounceValidationResult<H> {
76	fn peer_id(&self) -> &PeerId {
77		match self {
78			BlockAnnounceValidationResult::Failure { peer_id, .. } |
79			BlockAnnounceValidationResult::Process { peer_id, .. } |
80			BlockAnnounceValidationResult::Skip { peer_id } => peer_id,
81		}
82	}
83}
84
85/// Result of [`BlockAnnounceValidator::allocate_slot_for_block_announce_validation`].
86enum AllocateSlotForBlockAnnounceValidation {
87	/// Success, there is a slot for the block announce validation.
88	Allocated,
89	/// We reached the total maximum number of validation slots.
90	TotalMaximumSlotsReached,
91	/// We reached the maximum number of validation slots for the given peer.
92	MaximumPeerSlotsReached,
93}
94
95pub(crate) struct BlockAnnounceValidator<B: BlockT> {
96	/// A type to check incoming block announcements.
97	validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
98	/// All block announcements that are currently being validated.
99	validations: FuturesStream<
100		Pin<Box<dyn Future<Output = BlockAnnounceValidationResult<B::Header>> + Send>>,
101	>,
102	/// Number of concurrent block announce validations per peer.
103	validations_per_peer: HashMap<PeerId, usize>,
104}
105
106impl<B: BlockT> BlockAnnounceValidator<B> {
107	pub(crate) fn new(
108		validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
109	) -> Self {
110		Self {
111			validator,
112			validations: Default::default(),
113			validations_per_peer: Default::default(),
114		}
115	}
116
117	/// Push a block announce validation.
118	pub(crate) fn push_block_announce_validation(
119		&mut self,
120		peer_id: PeerId,
121		hash: B::Hash,
122		announce: BlockAnnounce<B::Header>,
123		is_best: bool,
124	) {
125		let header = &announce.header;
126		let number = *header.number();
127		debug!(
128			target: LOG_TARGET,
129			"Pre-validating received block announcement {:?} with number {:?} from {}",
130			hash,
131			number,
132			peer_id,
133		);
134
135		if number.is_zero() {
136			warn!(
137				target: LOG_TARGET,
138				"๐Ÿ’” Ignored genesis block (#0) announcement from {}: {}",
139				peer_id,
140				hash,
141			);
142			return
143		}
144
145		// Try to allocate a slot for this block announce validation.
146		match self.allocate_slot_for_block_announce_validation(&peer_id) {
147			AllocateSlotForBlockAnnounceValidation::Allocated => {},
148			AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => {
149				warn!(
150					target: LOG_TARGET,
151					"๐Ÿ’” Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.",
152					number,
153					hash,
154					peer_id,
155				);
156				return
157			},
158			AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => {
159				debug!(
160					target: LOG_TARGET,
161					"๐Ÿ’” Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.",
162					number,
163					hash,
164					peer_id,
165				);
166				return
167			},
168		}
169
170		// Let external validator check the block announcement.
171		let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice());
172		let future = self.validator.validate(header, assoc_data);
173
174		self.validations.push(
175			async move {
176				match future.await {
177					Ok(Validation::Success { is_new_best }) => {
178						let is_new_best = is_new_best || is_best;
179
180						trace!(
181							target: LOG_TARGET,
182							"Block announcement validated successfully: from {}: {:?}. Local best: {}.",
183							peer_id,
184							announce.summary(),
185							is_new_best,
186						);
187
188						BlockAnnounceValidationResult::Process { is_new_best, announce, peer_id }
189					},
190					Ok(Validation::Failure { disconnect }) => {
191						debug!(
192							target: LOG_TARGET,
193							"Block announcement validation failed: from {}, block {:?}. Disconnect: {}.",
194							peer_id,
195							hash,
196							disconnect,
197						);
198
199						BlockAnnounceValidationResult::Failure { peer_id, disconnect }
200					},
201					Err(e) => {
202						debug!(
203							target: LOG_TARGET,
204							"๐Ÿ’” Ignoring block announcement validation from {} of block {:?} due to internal error: {}.",
205							peer_id,
206							hash,
207							e,
208						);
209
210						BlockAnnounceValidationResult::Skip { peer_id }
211					},
212				}
213			}
214			.boxed(),
215		);
216	}
217
218	/// Checks if there is a slot for a block announce validation.
219	///
220	/// The total number and the number per peer of concurrent block announce validations
221	/// is capped.
222	///
223	/// Returns [`AllocateSlotForBlockAnnounceValidation`] to inform about the result.
224	///
225	/// # Note
226	///
227	/// It is *required* to call [`Self::deallocate_slot_for_block_announce_validation`] when the
228	/// validation is finished to clear the slot.
229	fn allocate_slot_for_block_announce_validation(
230		&mut self,
231		peer_id: &PeerId,
232	) -> AllocateSlotForBlockAnnounceValidation {
233		if self.validations.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
234			return AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached
235		}
236
237		match self.validations_per_peer.entry(*peer_id) {
238			Entry::Vacant(entry) => {
239				entry.insert(1);
240				AllocateSlotForBlockAnnounceValidation::Allocated
241			},
242			Entry::Occupied(mut entry) => {
243				if *entry.get() < MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
244					*entry.get_mut() += 1;
245					AllocateSlotForBlockAnnounceValidation::Allocated
246				} else {
247					AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached
248				}
249			},
250		}
251	}
252
253	/// Should be called when a block announce validation is finished, to update the slots
254	/// of the peer that send the block announce.
255	fn deallocate_slot_for_block_announce_validation(&mut self, peer_id: &PeerId) {
256		match self.validations_per_peer.entry(*peer_id) {
257			Entry::Vacant(_) => {
258				error!(
259					target: LOG_TARGET,
260					"๐Ÿ’” Block announcement validation from peer {} finished for a slot that was not allocated!",
261					peer_id,
262				);
263			},
264			Entry::Occupied(mut entry) => match entry.get().checked_sub(1) {
265				Some(value) =>
266					if value == 0 {
267						entry.remove();
268					} else {
269						*entry.get_mut() = value;
270					},
271				None => {
272					entry.remove();
273
274					error!(
275						target: LOG_TARGET,
276						"Invalid (zero) block announce validation slot counter for peer {peer_id}.",
277					);
278					debug_assert!(
279						false,
280						"Invalid (zero) block announce validation slot counter for peer {peer_id}.",
281					);
282				},
283			},
284		}
285	}
286}
287
288impl<B: BlockT> Stream for BlockAnnounceValidator<B> {
289	type Item = BlockAnnounceValidationResult<B::Header>;
290
291	/// Poll for finished block announce validations. The stream never terminates.
292	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
293		let validation = futures::ready!(self.validations.poll_next_unpin(cx))
294			.expect("`FuturesStream` never terminates; qed");
295		self.deallocate_slot_for_block_announce_validation(validation.peer_id());
296
297		Poll::Ready(Some(validation))
298	}
299}
300
301// As [`BlockAnnounceValidator`] never terminates, we can easily implement [`FusedStream`] for it.
302impl<B: BlockT> FusedStream for BlockAnnounceValidator<B> {
303	fn is_terminated(&self) -> bool {
304		false
305	}
306}
307
308#[cfg(test)]
309mod tests {
310	use super::*;
311	use crate::block_announce_validator::AllocateSlotForBlockAnnounceValidation;
312	use sc_network_types::PeerId;
313	use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
314	use substrate_test_runtime_client::runtime::Block;
315
316	#[test]
317	fn allocate_one_validation_slot() {
318		let mut validator =
319			BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
320		let peer_id = PeerId::random();
321
322		assert!(matches!(
323			validator.allocate_slot_for_block_announce_validation(&peer_id),
324			AllocateSlotForBlockAnnounceValidation::Allocated,
325		));
326	}
327
328	#[test]
329	fn allocate_validation_slots_for_two_peers() {
330		let mut validator =
331			BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
332		let peer_id_1 = PeerId::random();
333		let peer_id_2 = PeerId::random();
334
335		assert!(matches!(
336			validator.allocate_slot_for_block_announce_validation(&peer_id_1),
337			AllocateSlotForBlockAnnounceValidation::Allocated,
338		));
339		assert!(matches!(
340			validator.allocate_slot_for_block_announce_validation(&peer_id_2),
341			AllocateSlotForBlockAnnounceValidation::Allocated,
342		));
343	}
344
345	#[test]
346	fn maximum_validation_slots_per_peer() {
347		let mut validator =
348			BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
349		let peer_id = PeerId::random();
350
351		for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
352			assert!(matches!(
353				validator.allocate_slot_for_block_announce_validation(&peer_id),
354				AllocateSlotForBlockAnnounceValidation::Allocated,
355			));
356		}
357
358		assert!(matches!(
359			validator.allocate_slot_for_block_announce_validation(&peer_id),
360			AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached,
361		));
362	}
363
364	#[test]
365	fn validation_slots_per_peer_deallocated() {
366		let mut validator =
367			BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
368		let peer_id = PeerId::random();
369
370		for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
371			assert!(matches!(
372				validator.allocate_slot_for_block_announce_validation(&peer_id),
373				AllocateSlotForBlockAnnounceValidation::Allocated,
374			));
375		}
376
377		assert!(matches!(
378			validator.allocate_slot_for_block_announce_validation(&peer_id),
379			AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached,
380		));
381
382		validator.deallocate_slot_for_block_announce_validation(&peer_id);
383
384		assert!(matches!(
385			validator.allocate_slot_for_block_announce_validation(&peer_id),
386			AllocateSlotForBlockAnnounceValidation::Allocated,
387		));
388	}
389
390	#[test]
391	fn maximum_validation_slots_for_all_peers() {
392		let mut validator =
393			BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
394
395		for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
396			validator.validations.push(
397				futures::future::ready(BlockAnnounceValidationResult::Skip {
398					peer_id: PeerId::random(),
399				})
400				.boxed(),
401			);
402		}
403
404		let peer_id = PeerId::random();
405		assert!(matches!(
406			validator.allocate_slot_for_block_announce_validation(&peer_id),
407			AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached,
408		));
409	}
410}