use super::{
seconded_and_sufficient, CandidateDescriptorVersion, TransposedClaimQueue,
BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE,
COST_INVALID_CORE_INDEX, COST_INVALID_RESPONSE, COST_INVALID_SESSION_INDEX,
COST_INVALID_SIGNATURE, COST_UNREQUESTED_RESPONSE_STATEMENT,
COST_UNSUPPORTED_DESCRIPTOR_VERSION, REQUEST_RETRY_DELAY,
};
use crate::LOG_TARGET;
use bitvec::prelude::{BitVec, Lsb0};
use polkadot_node_network_protocol::{
request_response::{
outgoing::{Recipient as RequestRecipient, RequestError},
v2::{AttestedCandidateRequest, AttestedCandidateResponse},
OutgoingRequest, OutgoingResult, MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS,
},
v2::StatementFilter,
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_primitives::{
vstaging::CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CandidateHash,
CompactStatement, GroupIndex, Hash, Id as ParaId, PersistedValidationData, SessionIndex,
SignedStatement, SigningContext, ValidatorId, ValidatorIndex,
};
use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
use std::{
collections::{
hash_map::{Entry as HEntry, HashMap},
HashSet, VecDeque,
},
time::Instant,
};
/// Key under which a candidate request is tracked: the relay parent, the
/// candidate hash and the group index the request is made for.
///
/// The same candidate hash may appear under several identifiers (see
/// `RequestManager::unique_identifiers`). The derived ordering compares the
/// fields in declaration order and is used to break ties between entries of
/// equal `Priority` in the priority queue.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CandidateIdentifier {
/// The relay parent the candidate is requested under.
pub relay_parent: Hash,
/// The hash of the requested candidate.
pub candidate_hash: CandidateHash,
/// The group index the request is associated with.
pub group_index: GroupIndex,
}
/// The outcome of an outgoing request, bundled with the data that was known
/// when the request was issued in `RequestManager::next_request`.
struct TaggedResponse {
// The identifier the request was issued for.
identifier: CandidateIdentifier,
// The peer the request was sent to.
requested_peer: PeerId,
// The request properties that were in effect when the request was built.
props: RequestProperties,
// The raw network result: either a decoded response or a request error.
response: OutgoingResult<AttestedCandidateResponse>,
}
/// Bookkeeping for a single candidate that is being requested.
#[derive(Debug)]
pub struct RequestedCandidate {
// Position-determining priority; mirrored in `RequestManager::by_priority`.
priority: Priority,
// Peers that may be asked for the candidate, in the order they will be tried.
known_by: VecDeque<PeerId>,
// Set when a request has been issued and cleared when its response is validated.
in_flight: bool,
// Earliest instant at which another request may be issued; `None` until the
// first response has been processed.
next_retry_time: Option<Instant>,
}
impl RequestedCandidate {
	/// Whether a new request for this candidate may be issued right now.
	///
	/// That is the case when no request is currently in flight and the retry
	/// delay (if any was set) has elapsed.
	fn is_pending(&self) -> bool {
		match (self.in_flight, self.next_retry_time) {
			// A request is already out; wait for its response first.
			(true, _) => false,
			// Never attempted (or no delay recorded): ready immediately.
			(false, None) => true,
			// Only ready once the retry deadline has been reached.
			(false, Some(retry_at)) => retry_at <= Instant::now(),
		}
	}
}
/// Where the knowledge of a requested candidate came from.
///
/// The derived ordering follows the discriminants, so `Cluster` sorts before
/// `Unspecified` in the priority queue.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Origin {
// Set through `Entry::set_cluster_priority`.
Cluster = 0,
// Default origin assigned to freshly inserted requests.
Unspecified = 1,
}
/// Sort key of a request in the priority queue.
///
/// The derived ordering compares `origin` first and `attempts` second;
/// smaller values sort towards the front of the queue, which is where
/// `RequestManager::next_request` starts looking.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Priority {
// Origin of the request; see `Origin` for its ordering.
origin: Origin,
// Number of responses processed so far; incremented in `validate_response`.
attempts: usize,
}
/// A handle onto a single tracked request, as returned by
/// `RequestManager::get_or_insert`.
pub struct Entry<'a> {
// Index of this request within `by_priority` at the time the entry was created.
prev_index: usize,
// Identifier of the request this entry refers to.
identifier: CandidateIdentifier,
// The manager's priority queue, borrowed so priority updates can reposition the request.
by_priority: &'a mut Vec<(Priority, CandidateIdentifier)>,
// The request's bookkeeping data inside the manager.
requested: &'a mut RequestedCandidate,
}
impl<'a> Entry<'a> {
	/// Record that `peer` may be asked for this candidate.
	///
	/// Peers are kept in insertion order and are never duplicated.
	pub fn add_peer(&mut self, peer: PeerId) {
		if !self.requested.known_by.contains(&peer) {
			self.requested.known_by.push_back(peer);
		}
	}

	/// Mark the request as originating from the cluster and move it to the
	/// matching position in the priority queue.
	///
	/// The entry's cached queue index is refreshed afterwards, so calling this
	/// more than once on the same entry is harmless.
	pub fn set_cluster_priority(&mut self) {
		self.requested.priority.origin = Origin::Cluster;

		// Keep `prev_index` in sync with the queue. A stale index would make the
		// next update compare against (and possibly remove) an unrelated entry.
		self.prev_index = insert_or_update_priority(
			&mut *self.by_priority,
			Some(self.prev_index),
			self.identifier.clone(),
			self.requested.priority.clone(),
		);
	}
}
/// Tracks all candidates that still need to be requested and the order in
/// which requests should be issued.
pub struct RequestManager {
// Per-identifier bookkeeping of every tracked request.
requests: HashMap<CandidateIdentifier, RequestedCandidate>,
// Every tracked identifier together with its priority, kept sorted so that
// `binary_search` can be used for lookups and insertions.
by_priority: Vec<(Priority, CandidateIdentifier)>,
// All identifiers under which a given candidate hash is currently tracked.
unique_identifiers: HashMap<CandidateHash, HashSet<CandidateIdentifier>>,
}
impl RequestManager {
pub fn new() -> Self {
RequestManager {
requests: HashMap::new(),
by_priority: Vec::new(),
unique_identifiers: HashMap::new(),
}
}
pub fn get_or_insert(
&mut self,
relay_parent: Hash,
candidate_hash: CandidateHash,
group_index: GroupIndex,
) -> Entry {
let identifier = CandidateIdentifier { relay_parent, candidate_hash, group_index };
let (candidate, fresh) = match self.requests.entry(identifier.clone()) {
HEntry::Occupied(e) => (e.into_mut(), false),
HEntry::Vacant(e) => (
e.insert(RequestedCandidate {
priority: Priority { attempts: 0, origin: Origin::Unspecified },
known_by: VecDeque::new(),
in_flight: false,
next_retry_time: None,
}),
true,
),
};
let priority_index = if fresh {
self.unique_identifiers
.entry(candidate_hash)
.or_default()
.insert(identifier.clone());
insert_or_update_priority(
&mut self.by_priority,
None,
identifier.clone(),
candidate.priority.clone(),
)
} else {
match self
.by_priority
.binary_search(&(candidate.priority.clone(), identifier.clone()))
{
Ok(i) => i,
Err(_) => unreachable!("requested candidates always have a priority entry; qed"),
}
};
Entry {
prev_index: priority_index,
identifier,
by_priority: &mut self.by_priority,
requested: candidate,
}
}
pub fn remove_for(&mut self, candidate: CandidateHash) {
if let Some(identifiers) = self.unique_identifiers.remove(&candidate) {
self.by_priority.retain(|(_priority, id)| !identifiers.contains(&id));
for id in identifiers {
self.requests.remove(&id);
}
}
}
pub fn remove_by_relay_parent(&mut self, relay_parent: Hash) {
let mut candidate_hashes = HashSet::new();
self.by_priority.retain(|(_priority, id)| {
let retain = relay_parent != id.relay_parent;
if !retain {
self.requests.remove(id);
candidate_hashes.insert(id.candidate_hash);
}
retain
});
for candidate_hash in candidate_hashes {
match self.unique_identifiers.entry(candidate_hash) {
HEntry::Occupied(mut entry) => {
entry.get_mut().retain(|id| relay_parent != id.relay_parent);
if entry.get().is_empty() {
entry.remove();
}
},
HEntry::Vacant(_) => (),
}
}
gum::debug!(
target: LOG_TARGET,
"Requests remaining after cleanup: {}",
self.by_priority.len(),
);
}
pub fn has_pending_requests(&self) -> bool {
for (_id, entry) in &self.requests {
if entry.is_pending() {
return true
}
}
false
}
pub fn next_retry_time(&mut self) -> Option<Instant> {
let mut next = None;
for (_id, request) in self.requests.iter().filter(|(_id, request)| !request.in_flight) {
if let Some(next_retry_time) = request.next_retry_time {
if next.map_or(true, |next| next_retry_time < next) {
next = Some(next_retry_time);
}
}
}
next
}
pub fn next_request(
&mut self,
response_manager: &mut ResponseManager,
request_props: impl Fn(&CandidateIdentifier) -> Option<RequestProperties>,
peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
) -> Option<OutgoingRequest<AttestedCandidateRequest>> {
if response_manager.len() >= 2 * MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
return None
}
let mut res = None;
let mut cleanup_outdated = Vec::new();
for (i, (_priority, id)) in self.by_priority.iter().enumerate() {
let entry = match self.requests.get_mut(&id) {
None => {
gum::error!(
target: LOG_TARGET,
identifier = ?id,
"Missing entry for priority queue member",
);
continue
},
Some(e) => e,
};
if !entry.is_pending() {
continue
}
let props = match request_props(&id) {
None => {
cleanup_outdated.push((i, id.clone()));
continue
},
Some(s) => s,
};
let target = match find_request_target_with_update(
&mut entry.known_by,
id,
&props,
&peer_advertised,
&response_manager,
) {
None => continue,
Some(t) => t,
};
gum::debug!(
target: crate::LOG_TARGET,
candidate_hash = ?id.candidate_hash,
peer = ?target,
"Issuing candidate request"
);
let (request, response_fut) = OutgoingRequest::new(
RequestRecipient::Peer(target),
AttestedCandidateRequest {
candidate_hash: id.candidate_hash,
mask: props.unwanted_mask.clone(),
},
);
let stored_id = id.clone();
response_manager.push(
Box::pin(async move {
TaggedResponse {
identifier: stored_id,
requested_peer: target,
props,
response: response_fut.await,
}
}),
target,
);
entry.in_flight = true;
res = Some(request);
break
}
for (priority_index, identifier) in cleanup_outdated.into_iter().rev() {
self.by_priority.remove(priority_index);
self.requests.remove(&identifier);
if let HEntry::Occupied(mut e) =
self.unique_identifiers.entry(identifier.candidate_hash)
{
e.get_mut().remove(&identifier);
if e.get().is_empty() {
e.remove();
}
}
}
res
}
}
/// Holds the futures of all requests that have been issued and not yet
/// answered, together with the peers those requests were sent to.
pub struct ResponseManager {
// One future per outstanding request; each resolves to its tagged response.
pending_responses: FuturesUnordered<BoxFuture<'static, TaggedResponse>>,
// Peers with an outstanding request; used to avoid asking the same peer twice at once.
active_peers: HashSet<PeerId>,
}
impl ResponseManager {
pub fn new() -> Self {
Self { pending_responses: FuturesUnordered::new(), active_peers: HashSet::new() }
}
pub async fn incoming(&mut self) -> Option<UnhandledResponse> {
self.pending_responses.next().await.map(|response| {
self.active_peers.remove(&response.requested_peer);
UnhandledResponse { response }
})
}
fn len(&self) -> usize {
self.pending_responses.len()
}
fn push(&mut self, response: BoxFuture<'static, TaggedResponse>, target: PeerId) {
self.pending_responses.push(response);
self.active_peers.insert(target);
}
fn is_sending_to(&self, peer: &PeerId) -> bool {
self.active_peers.contains(peer)
}
}
/// Parameters that shape a single request and the validation of its response.
#[derive(Clone)]
pub struct RequestProperties {
/// Statements that are not wanted. Sent to the peer as the request's `mask`;
/// statements in a response that fall inside it are penalized.
pub unwanted_mask: StatementFilter,
/// Threshold handed to `seconded_and_sufficient` when judging whether a
/// peer's advertisement or a response carries enough statements.
/// NOTE(review): presumably `None` means no backing threshold is enforced —
/// confirm against `seconded_and_sufficient`.
pub backing_threshold: Option<usize>,
}
/// Pick the peer to request a candidate from and update `known_by` accordingly.
///
/// Peers are examined in order. A peer with a request already outstanding is
/// skipped. A peer for which `peer_advertised` returns `None` is removed from
/// `known_by`. The first remaining peer whose advertisement, after masking out
/// the unwanted statements, satisfies `seconded_and_sufficient` is chosen and
/// rotated to the back of `known_by`, so other peers are tried first next time.
///
/// Returns the chosen peer, or `None` when no peer qualifies. Pruning happens
/// in either case, but only for peers examined before the chosen one.
fn find_request_target_with_update(
	known_by: &mut VecDeque<PeerId>,
	candidate_identifier: &CandidateIdentifier,
	props: &RequestProperties,
	peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
	response_manager: &ResponseManager,
) -> Option<PeerId> {
	// Indices (ascending) of peers that no longer advertise the candidate.
	let mut prune = Vec::new();
	let mut target = None;

	for (i, p) in known_by.iter().enumerate() {
		// Never have two requests outstanding to the same peer.
		if response_manager.is_sending_to(p) {
			continue
		}

		let mut filter = match peer_advertised(candidate_identifier, p) {
			None => {
				prune.push(i);
				continue
			},
			Some(f) => f,
		};

		// Only statements we still want count towards sufficiency.
		filter.mask_seconded(&props.unwanted_mask.seconded_in_group);
		filter.mask_valid(&props.unwanted_mask.validated_in_group);
		if seconded_and_sufficient(&filter, props.backing_threshold) {
			target = Some((i, *p));
			break
		}
	}

	let prune_count = prune.len();
	// Remove from the back: removing an earlier index first would shift every
	// later index by one and drop the wrong peers.
	for i in prune.into_iter().rev() {
		known_by.remove(i);
	}

	let (i, p) = target?;
	// Every pruned index precedes the target, so it moved forward by exactly
	// `prune_count` positions.
	known_by.remove(i - prune_count);
	known_by.push_back(p);
	Some(p)
}
/// A response that has arrived but has not been validated yet.
///
/// Produced by `ResponseManager::incoming` and consumed by `validate_response`.
pub struct UnhandledResponse {
// The raw result together with the request data it belongs to.
response: TaggedResponse,
}
impl UnhandledResponse {
	/// The identifier of the candidate the request was made for.
	pub fn candidate_identifier(&self) -> &CandidateIdentifier {
		let TaggedResponse { identifier, .. } = &self.response;
		identifier
	}

	/// The peer the request was sent to.
	pub fn requested_peer(&self) -> &PeerId {
		let TaggedResponse { requested_peer, .. } = &self.response;
		requested_peer
	}

	/// Validate the response and update the request manager's bookkeeping.
	///
	/// If the request is no longer tracked by `manager`, the result is
	/// `Outdated` and nothing is changed. Otherwise the request is marked as
	/// not in flight, gets a retry deadline and one more recorded attempt, and
	/// is repositioned in the priority queue. On a `Complete` outcome all
	/// requests for the candidate hash are removed from the manager.
	///
	/// Request errors yield `Incomplete`; only an undecodable response carries
	/// a reputation cost.
	///
	/// # Panics
	///
	/// Panics if the tracked request has no matching priority-queue entry.
	pub fn validate_response(
		self,
		manager: &mut RequestManager,
		group: &[ValidatorIndex],
		session: SessionIndex,
		validator_key_lookup: impl Fn(ValidatorIndex) -> Option<ValidatorId>,
		allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool,
		disabled_mask: BitVec<u8, Lsb0>,
		transposed_cq: &TransposedClaimQueue,
		allow_v2_descriptors: bool,
	) -> ResponseValidationOutput {
		let TaggedResponse { identifier, requested_peer, props, response } = self.response;

		// Shorthand for the early exits, which never carry a candidate.
		let finish = |request_status, reputation_changes| ResponseValidationOutput {
			requested_peer,
			request_status,
			reputation_changes,
		};

		let entry = match manager.requests.get_mut(&identifier) {
			Some(entry) => entry,
			None => return finish(CandidateRequestStatus::Outdated, Vec::new()),
		};

		// Find the queue slot before the priority is modified below.
		let queue_key = (entry.priority.clone(), identifier.clone());
		let priority_index = match manager.by_priority.binary_search(&queue_key) {
			Ok(index) => index,
			Err(_) => unreachable!("requested candidates always have a priority entry; qed"),
		};

		// The retry deadline is set before the in-flight flag is cleared.
		entry.next_retry_time = Some(Instant::now() + REQUEST_RETRY_DELAY);
		entry.in_flight = false;
		entry.priority.attempts += 1;

		// Reposition the request according to its new attempt count.
		insert_or_update_priority(
			&mut manager.by_priority,
			Some(priority_index),
			identifier.clone(),
			entry.priority.clone(),
		);

		let complete_response = match response {
			Ok(complete) => complete,
			Err(RequestError::InvalidResponse(e)) => {
				gum::trace!(
					target: LOG_TARGET,
					err = ?e,
					peer = ?requested_peer,
					"Improperly encoded response"
				);
				return finish(
					CandidateRequestStatus::Incomplete,
					vec![(requested_peer, COST_IMPROPERLY_DECODED_RESPONSE)],
				)
			},
			Err(e @ RequestError::NetworkError(_) | e @ RequestError::Canceled(_)) => {
				gum::trace!(
					target: LOG_TARGET,
					err = ?e,
					peer = ?requested_peer,
					"Request error"
				);
				return finish(CandidateRequestStatus::Incomplete, vec![])
			},
		};

		let output = validate_complete_response(
			&identifier,
			props,
			complete_response,
			requested_peer,
			group,
			session,
			validator_key_lookup,
			allowed_para_lookup,
			disabled_mask,
			transposed_cq,
			allow_v2_descriptors,
		);

		// The candidate has been obtained; no identifier needs it any more.
		if matches!(output.request_status, CandidateRequestStatus::Complete { .. }) {
			manager.remove_for(identifier.candidate_hash);
		}

		output
	}
}
/// Validate a successfully decoded response against the request it answers.
///
/// Checks the candidate receipt first and the attached statements second.
/// Returns `Complete` with the candidate, its persisted validation data and
/// the signature-checked statements when everything required is present.
/// Otherwise returns `Incomplete` with a single reputation cost for
/// `requested_peer`.
///
/// Per-statement problems do not abort validation. Each adds a cost to the
/// reputation changes and the statement is skipped. Those costs are only
/// reported on the `Complete` path; the `Incomplete` outputs carry just the
/// one cost passed to `invalid_candidate_output`.
fn validate_complete_response(
identifier: &CandidateIdentifier,
props: RequestProperties,
response: AttestedCandidateResponse,
requested_peer: PeerId,
group: &[ValidatorIndex],
session: SessionIndex,
validator_key_lookup: impl Fn(ValidatorIndex) -> Option<ValidatorId>,
allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool,
disabled_mask: BitVec<u8, Lsb0>,
transposed_cq: &TransposedClaimQueue,
allow_v2_descriptors: bool,
) -> ResponseValidationOutput {
let RequestProperties { backing_threshold, mut unwanted_mask } = props;
// The mask is indexed by position in `group` further down, so its length
// must match. On mismatch, log and resize; added bits are set, i.e. treated
// as unwanted.
if !unwanted_mask.has_len(group.len()) {
gum::error!(
target: LOG_TARGET,
group_len = group.len(),
"Logic bug: group size != sent bitmask len"
);
unwanted_mask.seconded_in_group.resize(group.len(), true);
unwanted_mask.validated_in_group.resize(group.len(), true);
}
// Early-exit helper: an `Incomplete` output with exactly one cost.
let invalid_candidate_output = |cost: Rep| ResponseValidationOutput {
request_status: CandidateRequestStatus::Incomplete,
reputation_changes: vec![(requested_peer, cost)],
requested_peer,
};
let mut rep_changes = Vec::new();
// Candidate receipt checks.
{
// The receipt must be for the relay parent that was requested.
if response.candidate_receipt.descriptor.relay_parent() != identifier.relay_parent {
return invalid_candidate_output(COST_INVALID_RESPONSE)
}
// The supplied persisted validation data must hash to what the descriptor commits to.
if response.candidate_receipt.descriptor.persisted_validation_data_hash() !=
response.persisted_validation_data.hash()
{
return invalid_candidate_output(COST_INVALID_RESPONSE)
}
// The para must be allowed for the group the request was made for.
if !allowed_para_lookup(
response.candidate_receipt.descriptor.para_id(),
identifier.group_index,
) {
return invalid_candidate_output(COST_INVALID_RESPONSE)
}
// The receipt must hash to the requested candidate hash.
if response.candidate_receipt.hash() != identifier.candidate_hash {
return invalid_candidate_output(COST_INVALID_RESPONSE)
}
let candidate_hash = response.candidate_receipt.hash();
// V2 descriptors are rejected unless the caller allows them.
if !allow_v2_descriptors &&
response.candidate_receipt.descriptor.version() == CandidateDescriptorVersion::V2
{
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
peer = ?requested_peer,
"Version 2 candidate receipts are not enabled by the runtime"
);
return invalid_candidate_output(COST_UNSUPPORTED_DESCRIPTOR_VERSION)
}
// The core index must check out against the transposed claim queue.
if let Err(err) = response.candidate_receipt.check_core_index(transposed_cq) {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?err,
peer = ?requested_peer,
"Received candidate has invalid core index"
);
return invalid_candidate_output(COST_INVALID_CORE_INDEX)
}
// A session index, when the descriptor provides one, must equal `session`.
if let Some(candidate_session_index) = response.candidate_receipt.descriptor.session_index()
{
if candidate_session_index != session {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
peer = ?requested_peer,
session_index = session,
candidate_session_index,
"Received candidate has invalid session index"
);
return invalid_candidate_output(COST_INVALID_SESSION_INDEX)
}
}
}
// Statement checks.
let statements = {
let mut statements =
Vec::with_capacity(std::cmp::min(response.statements.len(), group.len() * 2));
// Tracks which statements were accepted so far, to detect duplicates and
// to judge sufficiency at the end.
let mut received_filter = StatementFilter::blank(group.len());
let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x);
let signing_context =
SigningContext { parent_hash: identifier.relay_parent, session_index: session };
// At most `2 * group.len()` statements are looked at; anything beyond is ignored.
for unchecked_statement in response.statements.into_iter().take(group.len() * 2) {
// The statement must come from a validator of the group.
let i = match index_in_group(unchecked_statement.unchecked_validator_index()) {
Some(i) => i,
None => {
rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
continue
},
};
// The statement must be about the requested candidate.
if unchecked_statement.unchecked_payload().candidate_hash() !=
&identifier.candidate_hash
{
rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
continue
}
// Reject statements inside the unwanted mask and duplicates of already
// accepted statements. Both bitfields have `group.len()` bits, so
// indexing with `i` is in bounds.
match unchecked_statement.unchecked_payload() {
CompactStatement::Seconded(_) => {
if unwanted_mask.seconded_in_group[i] {
rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
continue
}
if received_filter.seconded_in_group[i] {
rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
continue
}
},
CompactStatement::Valid(_) => {
if unwanted_mask.validated_in_group[i] {
rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
continue
}
if received_filter.validated_in_group[i] {
rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
continue
}
},
}
// Statements at positions set in `disabled_mask` are dropped without any
// reputation change; positions outside the mask count as not disabled.
if disabled_mask.get(i).map_or(false, |x| *x) {
continue
}
// A missing validator key is penalized like a bad signature.
let validator_public =
match validator_key_lookup(unchecked_statement.unchecked_validator_index()) {
None => {
rep_changes.push((requested_peer, COST_INVALID_SIGNATURE));
continue
},
Some(p) => p,
};
let checked_statement =
match unchecked_statement.try_into_checked(&signing_context, &validator_public) {
Err(_) => {
rep_changes.push((requested_peer, COST_INVALID_SIGNATURE));
continue
},
Ok(checked) => checked,
};
// Record the accepted statement so duplicates are caught later.
match checked_statement.payload() {
CompactStatement::Seconded(_) => {
received_filter.seconded_in_group.set(i, true);
},
CompactStatement::Valid(_) => {
received_filter.validated_in_group.set(i, true);
},
}
statements.push(checked_statement);
rep_changes.push((requested_peer, BENEFIT_VALID_STATEMENT));
}
// The accepted statements must satisfy `seconded_and_sufficient` for the threshold.
if !seconded_and_sufficient(&received_filter, backing_threshold) {
return invalid_candidate_output(COST_INVALID_RESPONSE)
}
statements
};
rep_changes.push((requested_peer, BENEFIT_VALID_RESPONSE));
ResponseValidationOutput {
requested_peer,
request_status: CandidateRequestStatus::Complete {
candidate: response.candidate_receipt,
persisted_validation_data: response.persisted_validation_data,
statements,
},
reputation_changes: rep_changes,
}
}
/// The state of a candidate request after a response has been processed.
#[derive(Debug, PartialEq)]
pub enum CandidateRequestStatus {
/// The request was no longer tracked by the request manager when the
/// response arrived.
Outdated,
/// The response did not yield the candidate: the request failed or the
/// response was rejected. The request stays tracked for a later retry.
Incomplete,
/// The response was accepted.
Complete {
/// The received candidate receipt.
candidate: CommittedCandidateReceipt,
/// The persisted validation data received with the candidate.
persisted_validation_data: PersistedValidationData,
/// The statements from the response that passed all checks.
statements: Vec<SignedStatement>,
},
}
/// The result of validating a response.
#[derive(Debug, PartialEq)]
pub struct ResponseValidationOutput {
/// The peer the request was sent to.
pub requested_peer: PeerId,
/// What the response means for the request.
pub request_status: CandidateRequestStatus,
/// Reputation changes to apply, in the order they were determined.
pub reputation_changes: Vec<(PeerId, Rep)>,
}
/// Place `candidate_identifier` into the sorted priority list under
/// `new_priority` and return the index it ends up at.
///
/// When `prev_index` is given, it must be the identifier's current index. If
/// the priority stored there equals `new_priority`, nothing changes and
/// `prev_index` is returned. Otherwise the element at `prev_index` is removed
/// before re-inserting. An identical `(priority, identifier)` pair that is
/// already present is not inserted a second time.
///
/// # Panics
///
/// Panics if `prev_index` is out of bounds.
fn insert_or_update_priority(
	priority_sorted: &mut Vec<(Priority, CandidateIdentifier)>,
	prev_index: Option<usize>,
	candidate_identifier: CandidateIdentifier,
	new_priority: Priority,
) -> usize {
	if let Some(current) = prev_index {
		// Garbage in, garbage out: `current` is trusted to belong to the identifier.
		if priority_sorted[current].0 == new_priority {
			return current
		}
		priority_sorted.remove(current);
	}

	let entry = (new_priority, candidate_identifier);
	priority_sorted.binary_search(&entry).unwrap_or_else(|slot| {
		// Not present yet: insert at the position that keeps the list sorted.
		priority_sorted.insert(slot, entry);
		slot
	})
}
#[cfg(test)]
mod tests {
use super::*;
use polkadot_primitives::HeadData;
use polkadot_primitives_test_helpers as test_helpers;
/// Fixed persisted validation data shared by the tests below.
fn dummy_pvd() -> PersistedValidationData {
	let parent_head = HeadData(vec![7, 8, 9]);
	PersistedValidationData {
		parent_head,
		relay_parent_number: 5,
		relay_parent_storage_root: Default::default(),
		max_pov_size: 1024,
	}
}
// Removing by relay parent must clean `requests`, `by_priority` and
// `unique_identifiers` consistently, including when one candidate hash is
// tracked under more than one relay parent.
#[test]
fn test_remove_by_relay_parent() {
let parent_a = Hash::from_low_u64_le(1);
let parent_b = Hash::from_low_u64_le(2);
let parent_c = Hash::from_low_u64_le(3);
let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11));
let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12));
let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21));
let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22));
let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31));
// Same hash value as `candidate_c1`; inserted under a different relay parent below.
let duplicate_hash = CandidateHash(Hash::from_low_u64_le(31));
let mut request_manager = RequestManager::new();
request_manager.get_or_insert(parent_a, candidate_a1, 1.into());
request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
request_manager.get_or_insert(parent_b, candidate_b1, 1.into());
request_manager.get_or_insert(parent_b, candidate_b2, 2.into());
request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
request_manager.get_or_insert(parent_a, duplicate_hash, 1.into());
// Six identifiers, but only five distinct candidate hashes.
assert_eq!(request_manager.requests.len(), 6);
assert_eq!(request_manager.by_priority.len(), 6);
assert_eq!(request_manager.unique_identifiers.len(), 5);
request_manager.remove_by_relay_parent(parent_a);
assert_eq!(request_manager.requests.len(), 3);
assert_eq!(request_manager.by_priority.len(), 3);
assert_eq!(request_manager.unique_identifiers.len(), 3);
assert!(!request_manager.unique_identifiers.contains_key(&candidate_a1));
assert!(!request_manager.unique_identifiers.contains_key(&candidate_a2));
// The shared hash is still tracked through its `parent_c` identifier.
assert!(request_manager.unique_identifiers.contains_key(&duplicate_hash));
request_manager.remove_by_relay_parent(parent_b);
assert_eq!(request_manager.requests.len(), 1);
assert_eq!(request_manager.by_priority.len(), 1);
assert_eq!(request_manager.unique_identifiers.len(), 1);
assert!(!request_manager.unique_identifiers.contains_key(&candidate_b1));
assert!(!request_manager.unique_identifiers.contains_key(&candidate_b2));
request_manager.remove_by_relay_parent(parent_c);
assert!(request_manager.requests.is_empty());
assert!(request_manager.by_priority.is_empty());
assert!(request_manager.unique_identifiers.is_empty());
}
// Requests with cluster priority must sort before unspecified ones; within the
// same priority the identifier ordering decides.
#[test]
fn test_priority_ordering() {
let parent_a = Hash::from_low_u64_le(1);
let parent_b = Hash::from_low_u64_le(2);
let parent_c = Hash::from_low_u64_le(3);
let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11));
let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12));
let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21));
let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22));
let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31));
let mut request_manager = RequestManager::new();
let identifier_a1 = request_manager
.get_or_insert(parent_a, candidate_a1, 1.into())
.identifier
.clone();
// Promoted to cluster priority.
let identifier_a2 = {
let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
entry.set_cluster_priority();
entry.identifier.clone()
};
let identifier_b1 = request_manager
.get_or_insert(parent_b, candidate_b1, 1.into())
.identifier
.clone();
let identifier_b2 = request_manager
.get_or_insert(parent_b, candidate_b2, 2.into())
.identifier
.clone();
// Promoted to cluster priority.
let identifier_c1 = {
let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
entry.set_cluster_priority();
entry.identifier.clone()
};
let attempts = 0;
assert_eq!(
request_manager.by_priority,
vec![
(Priority { origin: Origin::Cluster, attempts }, identifier_a2),
(Priority { origin: Origin::Cluster, attempts }, identifier_c1),
(Priority { origin: Origin::Unspecified, attempts }, identifier_a1),
(Priority { origin: Origin::Unspecified, attempts }, identifier_b1),
(Priority { origin: Origin::Unspecified, attempts }, identifier_b2),
]
);
}
// The same candidate is requested under two identifiers (different groups).
// Once the first response completes, every request for that candidate hash is
// removed, so the second response must be reported as outdated.
#[test]
fn handle_outdated_response_due_to_requests_for_different_identifiers() {
let mut request_manager = RequestManager::new();
let mut response_manager = ResponseManager::new();
let relay_parent = Hash::from_low_u64_le(1);
let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
let persisted_validation_data = dummy_pvd();
candidate_receipt.descriptor.persisted_validation_data_hash =
persisted_validation_data.hash();
let candidate = candidate_receipt.hash();
let candidate_receipt: CommittedCandidateReceipt = candidate_receipt.into();
let requested_peer_1 = PeerId::random();
let requested_peer_2 = PeerId::random();
// Same relay parent and candidate, group 1.
let identifier1 = request_manager
.get_or_insert(relay_parent, candidate, 1.into())
.identifier
.clone();
request_manager
.get_or_insert(relay_parent, candidate, 1.into())
.add_peer(requested_peer_1);
// Same relay parent and candidate, group 2.
let identifier2 = request_manager
.get_or_insert(relay_parent, candidate, 2.into())
.identifier
.clone();
request_manager
.get_or_insert(relay_parent, candidate, 2.into())
.add_peer(requested_peer_2);
assert_ne!(identifier1, identifier2);
assert_eq!(request_manager.requests.len(), 2);
let group_size = 3;
let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
let unwanted_mask = StatementFilter::blank(group_size);
let disabled_mask: BitVec<u8, Lsb0> = Default::default();
let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
// Issue both requests.
{
let request_props =
|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
let peer_advertised = |_identifier: &CandidateIdentifier, _peer: &_| {
Some(StatementFilter::full(group_size))
};
let outgoing = request_manager
.next_request(&mut response_manager, request_props, peer_advertised)
.unwrap();
assert_eq!(outgoing.payload.candidate_hash, candidate);
let outgoing = request_manager
.next_request(&mut response_manager, request_props, peer_advertised)
.unwrap();
assert_eq!(outgoing.payload.candidate_hash, candidate);
}
// First response: accepted, which removes all requests for the candidate.
{
let statements = vec![];
let response = UnhandledResponse {
response: TaggedResponse {
identifier: identifier1,
requested_peer: requested_peer_1,
props: request_properties.clone(),
response: Ok(AttestedCandidateResponse {
candidate_receipt: candidate_receipt.clone().into(),
persisted_validation_data: persisted_validation_data.clone(),
statements,
}),
},
};
let validator_key_lookup = |_v| None;
let allowed_para_lookup = |_para, _g_index| true;
let statements = vec![];
let output = response.validate_response(
&mut request_manager,
group,
0,
validator_key_lookup,
allowed_para_lookup,
disabled_mask.clone(),
&Default::default(),
false,
);
assert_eq!(
output,
ResponseValidationOutput {
requested_peer: requested_peer_1,
request_status: CandidateRequestStatus::Complete {
candidate: candidate_receipt.clone(),
persisted_validation_data: persisted_validation_data.clone(),
statements,
},
reputation_changes: vec![(requested_peer_1, BENEFIT_VALID_RESPONSE)],
}
);
}
// Second response: its request is gone, so it is outdated and costs nothing.
{
let statements = vec![];
let response = UnhandledResponse {
response: TaggedResponse {
identifier: identifier2,
requested_peer: requested_peer_2,
props: request_properties,
response: Ok(AttestedCandidateResponse {
candidate_receipt: candidate_receipt.clone().into(),
persisted_validation_data: persisted_validation_data.clone(),
statements,
}),
},
};
let validator_key_lookup = |_v| None;
let allowed_para_lookup = |_para, _g_index| true;
let output = response.validate_response(
&mut request_manager,
group,
0,
validator_key_lookup,
allowed_para_lookup,
disabled_mask,
&Default::default(),
false,
);
assert_eq!(
output,
ResponseValidationOutput {
requested_peer: requested_peer_2,
request_status: CandidateRequestStatus::Outdated,
reputation_changes: vec![],
}
);
}
assert_eq!(request_manager.requests.len(), 0);
}
// A response that arrives after its request was removed through
// `remove_by_relay_parent` must be reported as outdated.
#[test]
fn handle_outdated_response_due_to_garbage_collection() {
let mut request_manager = RequestManager::new();
let mut response_manager = ResponseManager::new();
let relay_parent = Hash::from_low_u64_le(1);
let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
let persisted_validation_data = dummy_pvd();
candidate_receipt.descriptor.persisted_validation_data_hash =
persisted_validation_data.hash();
let candidate = candidate_receipt.hash();
let requested_peer = PeerId::random();
let identifier = request_manager
.get_or_insert(relay_parent, candidate, 1.into())
.identifier
.clone();
request_manager
.get_or_insert(relay_parent, candidate, 1.into())
.add_peer(requested_peer);
let group_size = 3;
let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
let unwanted_mask = StatementFilter::blank(group_size);
let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
let peer_advertised =
|_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));
// Issue the request.
{
let request_props =
|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
let outgoing = request_manager
.next_request(&mut response_manager, request_props, peer_advertised)
.unwrap();
assert_eq!(outgoing.payload.candidate_hash, candidate);
}
// Drop the request while its response is still outstanding.
request_manager.remove_by_relay_parent(relay_parent);
// The late response is outdated and carries no reputation change.
{
let statements = vec![];
let response = UnhandledResponse {
response: TaggedResponse {
identifier,
requested_peer,
props: request_properties,
response: Ok(AttestedCandidateResponse {
candidate_receipt: candidate_receipt.clone().into(),
persisted_validation_data: persisted_validation_data.clone(),
statements,
}),
},
};
let validator_key_lookup = |_v| None;
let allowed_para_lookup = |_para, _g_index| true;
let disabled_mask: BitVec<u8, Lsb0> = Default::default();
let output = response.validate_response(
&mut request_manager,
group,
0,
validator_key_lookup,
allowed_para_lookup,
disabled_mask,
&Default::default(),
false,
);
assert_eq!(
output,
ResponseValidationOutput {
requested_peer,
request_status: CandidateRequestStatus::Outdated,
reputation_changes: vec![],
}
);
}
}
// After a response is accepted, neither `requests` nor `by_priority` may keep
// an entry for the candidate.
#[test]
fn should_clean_up_after_successful_requests() {
let mut request_manager = RequestManager::new();
let mut response_manager = ResponseManager::new();
let relay_parent = Hash::from_low_u64_le(1);
let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
let persisted_validation_data = dummy_pvd();
candidate_receipt.descriptor.persisted_validation_data_hash =
persisted_validation_data.hash();
let candidate = candidate_receipt.hash();
let candidate_receipt: CommittedCandidateReceipt = candidate_receipt.into();
let requested_peer = PeerId::random();
let identifier = request_manager
.get_or_insert(relay_parent, candidate, 1.into())
.identifier
.clone();
request_manager
.get_or_insert(relay_parent, candidate, 1.into())
.add_peer(requested_peer);
assert_eq!(request_manager.requests.len(), 1);
assert_eq!(request_manager.by_priority.len(), 1);
let group_size = 3;
let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
let unwanted_mask = StatementFilter::blank(group_size);
let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
let peer_advertised =
|_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));
// Issue the request.
{
let request_props =
|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
let outgoing = request_manager
.next_request(&mut response_manager, request_props, peer_advertised)
.unwrap();
assert_eq!(outgoing.payload.candidate_hash, candidate);
}
// Validate a well-formed response without statements.
{
let statements = vec![];
let response = UnhandledResponse {
response: TaggedResponse {
identifier,
requested_peer,
props: request_properties.clone(),
response: Ok(AttestedCandidateResponse {
candidate_receipt: candidate_receipt.clone(),
persisted_validation_data: persisted_validation_data.clone(),
statements,
}),
},
};
let validator_key_lookup = |_v| None;
let allowed_para_lookup = |_para, _g_index| true;
let statements = vec![];
let disabled_mask: BitVec<u8, Lsb0> = Default::default();
let output = response.validate_response(
&mut request_manager,
group,
0,
validator_key_lookup,
allowed_para_lookup,
disabled_mask,
&Default::default(),
false,
);
assert_eq!(
output,
ResponseValidationOutput {
requested_peer,
request_status: CandidateRequestStatus::Complete {
candidate: candidate_receipt.clone(),
persisted_validation_data: persisted_validation_data.clone(),
statements,
},
reputation_changes: vec![(requested_peer, BENEFIT_VALID_RESPONSE)],
}
);
}
// Everything belonging to the candidate is gone.
assert_eq!(request_manager.requests.len(), 0);
assert_eq!(request_manager.by_priority.len(), 0);
}
// No second request may be sent to a peer that still has one outstanding.
// Once that peer is free again, the withheld request must go out.
#[test]
fn rate_limit_requests_to_same_peer() {
let mut request_manager = RequestManager::new();
let mut response_manager = ResponseManager::new();
let relay_parent = Hash::from_low_u64_le(1);
let mut candidate_receipt_1 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
let persisted_validation_data_1 = dummy_pvd();
candidate_receipt_1.descriptor.persisted_validation_data_hash =
persisted_validation_data_1.hash();
let candidate_1 = candidate_receipt_1.hash();
let mut candidate_receipt_2 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
let persisted_validation_data_2 = dummy_pvd();
candidate_receipt_2.descriptor.persisted_validation_data_hash =
persisted_validation_data_2.hash();
let candidate_2 = candidate_receipt_2.hash();
let mut candidate_receipt_3 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
let persisted_validation_data_3 = dummy_pvd();
candidate_receipt_3.descriptor.persisted_validation_data_hash =
persisted_validation_data_3.hash();
let candidate_3 = candidate_receipt_3.hash();
let requested_peer_1 = PeerId::random();
let requested_peer_2 = PeerId::random();
let group_size = 3;
let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
let unwanted_mask = StatementFilter::blank(group_size);
let disabled_mask: BitVec<u8, Lsb0> = Default::default();
let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
let request_props = |_identifier: &CandidateIdentifier| Some((&request_properties).clone());
let peer_advertised =
|_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));
// Candidate 1 is known by peer 1.
let identifier1 = request_manager
.get_or_insert(relay_parent, candidate_1, 1.into())
.identifier
.clone();
request_manager
.get_or_insert(relay_parent, candidate_1, 1.into())
.add_peer(requested_peer_1);
// Candidate 3 is known by peer 2.
let _identifier3 = request_manager
.get_or_insert(relay_parent, candidate_3, 1.into())
.identifier
.clone();
request_manager
.get_or_insert(relay_parent, candidate_3, 1.into())
.add_peer(requested_peer_2);
// Both requests go out, one to each peer.
for _ in 0..2 {
let outgoing =
request_manager.next_request(&mut response_manager, request_props, peer_advertised);
assert!(outgoing.is_some());
}
assert_eq!(response_manager.active_peers.len(), 2);
assert!(response_manager.is_sending_to(&requested_peer_1));
assert!(response_manager.is_sending_to(&requested_peer_2));
assert_eq!(request_manager.requests.len(), 2);
// Candidate 2 is only known by peer 1, which is still busy.
let _identifier2 = request_manager
.get_or_insert(relay_parent, candidate_2, 1.into())
.identifier
.clone();
request_manager
.get_or_insert(relay_parent, candidate_2, 1.into())
.add_peer(requested_peer_1);
// So no request can be issued for it yet.
let outgoing =
request_manager.next_request(&mut response_manager, request_props, peer_advertised);
assert!(outgoing.is_none());
assert_eq!(response_manager.active_peers.len(), 2);
assert!(response_manager.is_sending_to(&requested_peer_1));
assert!(response_manager.is_sending_to(&requested_peer_2));
assert_eq!(request_manager.requests.len(), 3);
// Manually mark peer 1 as free again and drop the pending response futures.
response_manager.active_peers.remove(&requested_peer_1);
response_manager.pending_responses = FuturesUnordered::new();
// Process peer 1's response for candidate 1.
{
let statements = vec![];
let response = UnhandledResponse {
response: TaggedResponse {
identifier: identifier1,
requested_peer: requested_peer_1,
props: request_properties.clone(),
response: Ok(AttestedCandidateResponse {
candidate_receipt: candidate_receipt_1.clone().into(),
persisted_validation_data: persisted_validation_data_1.clone(),
statements,
}),
},
};
let validator_key_lookup = |_v| None;
let allowed_para_lookup = |_para, _g_index| true;
let _output = response.validate_response(
&mut request_manager,
group,
0,
validator_key_lookup,
allowed_para_lookup,
disabled_mask.clone(),
&Default::default(),
false,
);
assert_eq!(request_manager.requests.len(), 2);
assert_eq!(response_manager.active_peers.len(), 1);
assert!(response_manager.is_sending_to(&requested_peer_2));
}
// Peer 1 is free now, so the request for candidate 2 can be issued.
let outgoing =
request_manager.next_request(&mut response_manager, request_props, peer_advertised);
assert!(outgoing.is_some());
assert_eq!(response_manager.active_peers.len(), 2);
assert!(response_manager.is_sending_to(&requested_peer_1));
assert!(response_manager.is_sending_to(&requested_peer_2));
assert_eq!(request_manager.requests.len(), 2);
}
}