use crate::{futures_stream::FuturesStream, LOG_TARGET};
use futures::{stream::FusedStream, Future, FutureExt, Stream, StreamExt};
use log::{debug, error, trace, warn};
use sc_network_common::sync::message::BlockAnnounce;
use sc_network_types::PeerId;
use sp_consensus::block_validation::Validation;
use sp_runtime::traits::{Block as BlockT, Header, Zero};
use std::{
collections::{hash_map::Entry, HashMap},
default::Default,
pin::Pin,
task::{Context, Poll},
};
/// Upper bound on the number of block announce validations that may be in flight at the same
/// time, counted across all peers.
const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS: usize = 256;
/// Upper bound on the number of block announce validations that may be in flight at the same
/// time for any single peer.
const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4;
/// Outcome of validating a single block announcement received from a peer.
///
/// Every variant carries the `peer_id` of the announcing peer so that the slot reserved for the
/// validation can be released once the result is produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum BlockAnnounceValidationResult<H> {
/// The validator reported the announcement as a failure.
Failure {
/// Peer that sent the announcement.
peer_id: PeerId,
/// The `disconnect` flag returned by the validator together with the failure.
disconnect: bool,
},
/// The announcement passed validation and should be processed further.
Process {
/// Peer that sent the announcement.
peer_id: PeerId,
/// `true` if the validator reported the block as the new best block, or if the caller
/// submitted the announcement with `is_best` set.
is_new_best: bool,
/// The announcement that was validated.
announce: BlockAnnounce<H>,
},
/// The validator returned an error, so the announcement is ignored.
Skip {
/// Peer that sent the announcement.
peer_id: PeerId,
},
}
impl<H> BlockAnnounceValidationResult<H> {
fn peer_id(&self) -> &PeerId {
match self {
BlockAnnounceValidationResult::Failure { peer_id, .. } |
BlockAnnounceValidationResult::Process { peer_id, .. } |
BlockAnnounceValidationResult::Skip { peer_id } => peer_id,
}
}
}
/// Result of trying to reserve a slot for one more block announce validation.
enum AllocateSlotForBlockAnnounceValidation {
/// A slot was reserved and the per-peer counter was incremented.
Allocated,
/// No slot was reserved: the number of in-flight validations has reached
/// `MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS`.
TotalMaximumSlotsReached,
/// No slot was reserved: the peer already holds
/// `MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER` slots.
MaximumPeerSlotsReached,
}
/// Runs block announcement validations through an external validator while limiting how many
/// of them may be in flight at once, both in total and per peer.
///
/// Finished validations are obtained by polling this type as a `Stream`.
pub(crate) struct BlockAnnounceValidator<B: BlockT> {
/// External validator that every announcement's header and associated data are handed to.
validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
/// Validation futures that have been started and have not yet yielded their result.
validations: FuturesStream<
Pin<Box<dyn Future<Output = BlockAnnounceValidationResult<B::Header>> + Send>>,
>,
/// Number of validation slots currently held by each peer. An entry is removed as soon as
/// its counter would drop to zero.
validations_per_peer: HashMap<PeerId, usize>,
}
impl<B: BlockT> BlockAnnounceValidator<B> {
pub(crate) fn new(
validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
) -> Self {
Self {
validator,
validations: Default::default(),
validations_per_peer: Default::default(),
}
}
pub(crate) fn push_block_announce_validation(
&mut self,
peer_id: PeerId,
hash: B::Hash,
announce: BlockAnnounce<B::Header>,
is_best: bool,
) {
let header = &announce.header;
let number = *header.number();
debug!(
target: LOG_TARGET,
"Pre-validating received block announcement {:?} with number {:?} from {}",
hash,
number,
peer_id,
);
if number.is_zero() {
warn!(
target: LOG_TARGET,
"๐ Ignored genesis block (#0) announcement from {}: {}",
peer_id,
hash,
);
return
}
match self.allocate_slot_for_block_announce_validation(&peer_id) {
AllocateSlotForBlockAnnounceValidation::Allocated => {},
AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => {
warn!(
target: LOG_TARGET,
"๐ Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.",
number,
hash,
peer_id,
);
return
},
AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => {
debug!(
target: LOG_TARGET,
"๐ Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.",
number,
hash,
peer_id,
);
return
},
}
let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice());
let future = self.validator.validate(header, assoc_data);
self.validations.push(
async move {
match future.await {
Ok(Validation::Success { is_new_best }) => {
let is_new_best = is_new_best || is_best;
trace!(
target: LOG_TARGET,
"Block announcement validated successfully: from {}: {:?}. Local best: {}.",
peer_id,
announce.summary(),
is_new_best,
);
BlockAnnounceValidationResult::Process { is_new_best, announce, peer_id }
},
Ok(Validation::Failure { disconnect }) => {
debug!(
target: LOG_TARGET,
"Block announcement validation failed: from {}, block {:?}. Disconnect: {}.",
peer_id,
hash,
disconnect,
);
BlockAnnounceValidationResult::Failure { peer_id, disconnect }
},
Err(e) => {
debug!(
target: LOG_TARGET,
"๐ Ignoring block announcement validation from {} of block {:?} due to internal error: {}.",
peer_id,
hash,
e,
);
BlockAnnounceValidationResult::Skip { peer_id }
},
}
}
.boxed(),
);
}
fn allocate_slot_for_block_announce_validation(
&mut self,
peer_id: &PeerId,
) -> AllocateSlotForBlockAnnounceValidation {
if self.validations.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
return AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached
}
match self.validations_per_peer.entry(*peer_id) {
Entry::Vacant(entry) => {
entry.insert(1);
AllocateSlotForBlockAnnounceValidation::Allocated
},
Entry::Occupied(mut entry) => {
if *entry.get() < MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
*entry.get_mut() += 1;
AllocateSlotForBlockAnnounceValidation::Allocated
} else {
AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached
}
},
}
}
fn deallocate_slot_for_block_announce_validation(&mut self, peer_id: &PeerId) {
match self.validations_per_peer.entry(*peer_id) {
Entry::Vacant(_) => {
error!(
target: LOG_TARGET,
"๐ Block announcement validation from peer {} finished for a slot that was not allocated!",
peer_id,
);
},
Entry::Occupied(mut entry) => match entry.get().checked_sub(1) {
Some(value) =>
if value == 0 {
entry.remove();
} else {
*entry.get_mut() = value;
},
None => {
entry.remove();
error!(
target: LOG_TARGET,
"Invalid (zero) block announce validation slot counter for peer {peer_id}.",
);
debug_assert!(
false,
"Invalid (zero) block announce validation slot counter for peer {peer_id}.",
);
},
},
}
}
}
impl<B: BlockT> Stream for BlockAnnounceValidator<B> {
	type Item = BlockAnnounceValidationResult<B::Header>;

	/// Yields the next finished validation, releasing the slot held by its peer before handing
	/// the result out. Stays pending while no validation has finished.
	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
		let polled = self.validations.poll_next_unpin(cx);

		match polled {
			Poll::Pending => Poll::Pending,
			Poll::Ready(finished) => {
				let finished = finished.expect("`FuturesStream` never terminates; qed");
				self.deallocate_slot_for_block_announce_validation(finished.peer_id());
				Poll::Ready(Some(finished))
			},
		}
	}
}
impl<B: BlockT> FusedStream for BlockAnnounceValidator<B> {
/// Always returns `false`: this stream is never reported as terminated.
fn is_terminated(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
	use super::*;
	use crate::block_announce_validator::AllocateSlotForBlockAnnounceValidation;
	use sc_network_types::PeerId;
	use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
	use substrate_test_runtime_client::runtime::Block;

	/// Builds a validator with no slots allocated, backed by the default announce validator.
	fn fresh_validator() -> BlockAnnounceValidator<Block> {
		BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}))
	}

	/// Asserts that reserving one more slot for `peer_id` succeeds.
	fn expect_allocated(validator: &mut BlockAnnounceValidator<Block>, peer_id: &PeerId) {
		let outcome = validator.allocate_slot_for_block_announce_validation(peer_id);
		assert!(matches!(outcome, AllocateSlotForBlockAnnounceValidation::Allocated));
	}

	/// Reserves every slot `peer_id` is entitled to and checks that the next request is
	/// refused with the per-peer limit.
	fn exhaust_peer_slots(validator: &mut BlockAnnounceValidator<Block>, peer_id: &PeerId) {
		for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
			expect_allocated(validator, peer_id);
		}

		let outcome = validator.allocate_slot_for_block_announce_validation(peer_id);
		assert!(matches!(
			outcome,
			AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached,
		));
	}

	#[test]
	fn allocate_one_validation_slot() {
		let mut validator = fresh_validator();
		let peer_id = PeerId::random();

		expect_allocated(&mut validator, &peer_id);
	}

	#[test]
	fn allocate_validation_slots_for_two_peers() {
		let mut validator = fresh_validator();
		let peer_id_1 = PeerId::random();
		let peer_id_2 = PeerId::random();

		expect_allocated(&mut validator, &peer_id_1);
		expect_allocated(&mut validator, &peer_id_2);
	}

	#[test]
	fn maximum_validation_slots_per_peer() {
		let mut validator = fresh_validator();
		let peer_id = PeerId::random();

		exhaust_peer_slots(&mut validator, &peer_id);
	}

	#[test]
	fn validation_slots_per_peer_deallocated() {
		let mut validator = fresh_validator();
		let peer_id = PeerId::random();

		exhaust_peer_slots(&mut validator, &peer_id);

		// Releasing a single slot must make room for exactly one more allocation.
		validator.deallocate_slot_for_block_announce_validation(&peer_id);
		expect_allocated(&mut validator, &peer_id);
	}

	#[test]
	fn maximum_validation_slots_for_all_peers() {
		let mut validator = fresh_validator();

		// Fill the global queue with already-finished validations from distinct peers.
		for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
			let peer_id = PeerId::random();
			let finished = futures::future::ready(BlockAnnounceValidationResult::Skip { peer_id });
			validator.validations.push(finished.boxed());
		}

		let peer_id = PeerId::random();
		let outcome = validator.allocate_slot_for_block_announce_validation(&peer_id);
		assert!(matches!(
			outcome,
			AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached,
		));
	}
}