1use super::{
33 seconded_and_sufficient, CandidateDescriptorVersion, TransposedClaimQueue,
34 BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE,
35 COST_INVALID_RESPONSE, COST_INVALID_SESSION_INDEX, COST_INVALID_SIGNATURE,
36 COST_INVALID_UMP_SIGNALS, COST_UNREQUESTED_RESPONSE_STATEMENT,
37 COST_UNSUPPORTED_DESCRIPTOR_VERSION, REQUEST_RETRY_DELAY,
38};
39use crate::LOG_TARGET;
40
41use bitvec::prelude::{BitVec, Lsb0};
42use polkadot_node_network_protocol::{
43 request_response::{
44 outgoing::{Recipient as RequestRecipient, RequestError},
45 v2::{AttestedCandidateRequest, AttestedCandidateResponse},
46 OutgoingRequest, OutgoingResult, MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS,
47 },
48 v3::StatementFilter,
49 PeerId, UnifiedReputationChange as Rep,
50};
51use polkadot_primitives::{
52 CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CompactStatement,
53 GroupIndex, Hash, Id as ParaId, PersistedValidationData, SessionIndex, SignedStatement,
54 SigningContext, ValidatorId, ValidatorIndex,
55};
56
57use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
58
59use std::{
60 collections::{
61 hash_map::{Entry as HEntry, HashMap},
62 HashSet, VecDeque,
63 },
64 time::Instant,
65};
66
/// Identifies a single candidate request: a candidate hash together with the relay parent
/// and the group index it is requested under.
///
/// Values of this type are stored next to a `Priority` in the sorted priority list and are
/// located there with binary search, so the derived ordering (which compares fields in
/// declaration order) is significant.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CandidateIdentifier {
	/// The relay parent component of the identifier.
	pub relay_parent: Hash,
	/// The hash of the candidate.
	pub candidate_hash: CandidateHash,
	/// The group index component of the identifier.
	pub group_index: GroupIndex,
}
84
/// The result of an attested-candidate request, bundled with the context the request was
/// issued in.
struct TaggedResponse {
	/// Identifier of the request this response belongs to.
	identifier: CandidateIdentifier,
	/// The peer the request was sent to.
	requested_peer: PeerId,
	/// The request properties that were in effect when the request was issued.
	props: RequestProperties,
	/// The outcome of awaiting the outgoing request.
	response: OutgoingResult<AttestedCandidateResponse>,
}
91
/// Bookkeeping for one candidate request tracked by the `RequestManager`.
#[derive(Debug)]
pub struct RequestedCandidate {
	/// Priority of the request; the same value is kept in the manager's sorted priority list.
	priority: Priority,
	/// Peers recorded through `Entry::add_peer`; scanned front to back when a request target
	/// is chosen.
	known_by: VecDeque<PeerId>,
	/// Set when a request is issued and cleared again when its response is validated.
	in_flight: bool,
	/// Earliest instant at which the request may be issued again. `None` for a freshly
	/// inserted request; set whenever a response has been handled.
	next_retry_time: Option<Instant>,
}
102
103impl RequestedCandidate {
104 fn is_pending(&self) -> bool {
105 if self.in_flight {
106 return false
107 }
108
109 if let Some(next_retry_time) = self.next_retry_time {
110 let can_retry = Instant::now() >= next_retry_time;
111 if !can_retry {
112 return false
113 }
114 }
115
116 true
117 }
118}
119
/// Where the interest in a candidate comes from.
///
/// The derived ordering follows the discriminants, so `Cluster` sorts before `Unspecified`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Origin {
	/// Assigned by `Entry::set_cluster_priority`.
	Cluster = 0,
	/// The origin given to every freshly inserted request.
	Unspecified = 1,
}
125
/// Sort key of a request in the priority list.
///
/// The derived ordering compares `origin` first and `attempts` second; items that compare
/// smaller sit earlier in the list and are considered first when picking the next request.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Priority {
	/// The origin of the request.
	origin: Origin,
	/// Number of responses handled for this request so far; starts at 0 and is incremented
	/// each time a response for a still-tracked request is validated.
	attempts: usize,
}
131
/// A handle to a tracked request, handed out by `RequestManager::get_or_insert`.
pub struct Entry<'a> {
	/// Index of this request's item in `by_priority`.
	prev_index: usize,
	/// Identifier of the request this entry refers to.
	identifier: CandidateIdentifier,
	/// The manager's priority-sorted list of requests.
	by_priority: &'a mut Vec<(Priority, CandidateIdentifier)>,
	/// The bookkeeping record of the request.
	requested: &'a mut RequestedCandidate,
}
139
140impl<'a> Entry<'a> {
141 pub fn add_peer(&mut self, peer: PeerId) {
143 if !self.requested.known_by.contains(&peer) {
144 self.requested.known_by.push_back(peer);
145 }
146 }
147
148 pub fn set_cluster_priority(&mut self) {
150 self.requested.priority.origin = Origin::Cluster;
151
152 insert_or_update_priority(
153 &mut *self.by_priority,
154 Some(self.prev_index),
155 self.identifier.clone(),
156 self.requested.priority.clone(),
157 );
158 }
159}
160
/// Tracks candidate requests and decides which one to issue next.
pub struct RequestManager {
	/// All tracked requests, keyed by their identifier.
	requests: HashMap<CandidateIdentifier, RequestedCandidate>,
	/// `(priority, identifier)` items kept in sorted order, one per tracked request; items
	/// are located with binary search.
	by_priority: Vec<(Priority, CandidateIdentifier)>,
	/// For every candidate hash, the identifiers of the tracked requests for that candidate.
	unique_identifiers: HashMap<CandidateHash, HashSet<CandidateIdentifier>>,
}
169
170impl RequestManager {
171 pub fn new() -> Self {
173 RequestManager {
174 requests: HashMap::new(),
175 by_priority: Vec::new(),
176 unique_identifiers: HashMap::new(),
177 }
178 }
179
180 pub fn get_or_insert(
183 &mut self,
184 relay_parent: Hash,
185 candidate_hash: CandidateHash,
186 group_index: GroupIndex,
187 ) -> Entry {
188 let identifier = CandidateIdentifier { relay_parent, candidate_hash, group_index };
189
190 let (candidate, fresh) = match self.requests.entry(identifier.clone()) {
191 HEntry::Occupied(e) => (e.into_mut(), false),
192 HEntry::Vacant(e) => (
193 e.insert(RequestedCandidate {
194 priority: Priority { attempts: 0, origin: Origin::Unspecified },
195 known_by: VecDeque::new(),
196 in_flight: false,
197 next_retry_time: None,
198 }),
199 true,
200 ),
201 };
202
203 let priority_index = if fresh {
204 self.unique_identifiers
205 .entry(candidate_hash)
206 .or_default()
207 .insert(identifier.clone());
208
209 insert_or_update_priority(
210 &mut self.by_priority,
211 None,
212 identifier.clone(),
213 candidate.priority.clone(),
214 )
215 } else {
216 match self
217 .by_priority
218 .binary_search(&(candidate.priority.clone(), identifier.clone()))
219 {
220 Ok(i) => i,
221 Err(_) => unreachable!("requested candidates always have a priority entry; qed"),
222 }
223 };
224
225 Entry {
226 prev_index: priority_index,
227 identifier,
228 by_priority: &mut self.by_priority,
229 requested: candidate,
230 }
231 }
232
233 pub fn remove_for(&mut self, candidate: CandidateHash) {
235 if let Some(identifiers) = self.unique_identifiers.remove(&candidate) {
236 self.by_priority.retain(|(_priority, id)| !identifiers.contains(&id));
237 for id in identifiers {
238 self.requests.remove(&id);
239 }
240 }
241 }
242
243 pub fn remove_by_relay_parent(&mut self, relay_parent: Hash) {
245 let mut candidate_hashes = HashSet::new();
246
247 self.by_priority.retain(|(_priority, id)| {
249 let retain = relay_parent != id.relay_parent;
250 if !retain {
251 self.requests.remove(id);
252 candidate_hashes.insert(id.candidate_hash);
253 }
254 retain
255 });
256
257 for candidate_hash in candidate_hashes {
259 match self.unique_identifiers.entry(candidate_hash) {
260 HEntry::Occupied(mut entry) => {
261 entry.get_mut().retain(|id| relay_parent != id.relay_parent);
262 if entry.get().is_empty() {
263 entry.remove();
264 }
265 },
266 HEntry::Vacant(_) => (),
269 }
270 }
271
272 gum::debug!(
273 target: LOG_TARGET,
274 "Requests remaining after cleanup: {}",
275 self.by_priority.len(),
276 );
277 }
278
279 pub fn has_pending_requests(&self) -> bool {
281 for (_id, entry) in &self.requests {
282 if entry.is_pending() {
283 return true
284 }
285 }
286
287 false
288 }
289
290 pub fn next_retry_time(&mut self) -> Option<Instant> {
292 let mut next = None;
293 for (_id, request) in self.requests.iter().filter(|(_id, request)| !request.in_flight) {
294 if let Some(next_retry_time) = request.next_retry_time {
295 if next.map_or(true, |next| next_retry_time < next) {
296 next = Some(next_retry_time);
297 }
298 }
299 }
300 next
301 }
302
	/// Picks the next request to send, if any.
	///
	/// Requests are considered in priority order. For each pending request, `request_props`
	/// supplies the current request properties; when it returns `None` the request is
	/// treated as outdated and removed from the manager. `peer_advertised` reports what a
	/// peer advertised for a candidate and is used to choose the peer to ask.
	///
	/// On success the response future is registered with `response_manager`, the request is
	/// marked as in flight, and the outgoing request is returned for the caller to send.
	/// Returns `None` when too many responses are already pending or when no request
	/// currently has a suitable target.
	pub fn next_request(
		&mut self,
		response_manager: &mut ResponseManager,
		request_props: impl Fn(&CandidateIdentifier) -> Option<RequestProperties>,
		peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
	) -> Option<OutgoingRequest<AttestedCandidateRequest>> {
		// Cap the number of outstanding responses at twice the parallel request limit.
		if response_manager.len() >= 2 * MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
			return None
		}

		let mut res = None;

		// Outdated requests found during the scan; `by_priority` cannot be modified while it
		// is being iterated, so they are removed after the loop.
		let mut cleanup_outdated = Vec::new();
		for (i, (_priority, id)) in self.by_priority.iter().enumerate() {
			let entry = match self.requests.get_mut(&id) {
				None => {
					gum::error!(
						target: LOG_TARGET,
						identifier = ?id,
						"Missing entry for priority queue member",
					);

					continue
				},
				Some(e) => e,
			};

			// Skip requests that are in flight or still waiting for their retry delay.
			if !entry.is_pending() {
				continue
			}

			let props = match request_props(&id) {
				None => {
					cleanup_outdated.push((i, id.clone()));
					continue
				},
				Some(s) => s,
			};

			// Choose a peer to ask; this may also prune and reorder `known_by`.
			let target = match find_request_target_with_update(
				&mut entry.known_by,
				id,
				&props,
				&peer_advertised,
				&response_manager,
			) {
				None => continue,
				Some(t) => t,
			};

			gum::debug!(
				target: crate::LOG_TARGET,
				candidate_hash = ?id.candidate_hash,
				peer = ?target,
				"Issuing candidate request"
			);

			let (request, response_fut) = OutgoingRequest::new(
				RequestRecipient::Peer(target),
				AttestedCandidateRequest {
					candidate_hash: id.candidate_hash,
					mask: props.unwanted_mask.clone(),
				},
			);

			// Tag the eventual response with the identifier, peer and properties so it can
			// be validated in context later.
			let stored_id = id.clone();
			response_manager.push(
				Box::pin(async move {
					TaggedResponse {
						identifier: stored_id,
						requested_peer: target,
						props,
						response: response_fut.await,
					}
				}),
				target,
			);

			entry.in_flight = true;

			res = Some(request);
			break
		}

		// Indices were collected in ascending order; removing back to front keeps the
		// remaining indices valid.
		for (priority_index, identifier) in cleanup_outdated.into_iter().rev() {
			self.by_priority.remove(priority_index);
			self.requests.remove(&identifier);
			if let HEntry::Occupied(mut e) =
				self.unique_identifiers.entry(identifier.candidate_hash)
			{
				e.get_mut().remove(&identifier);
				if e.get().is_empty() {
					e.remove();
				}
			}
		}

		res
	}
426}
427
/// Holds the responses that are still being awaited and the peers they were requested from.
pub struct ResponseManager {
	/// Futures resolving to the tagged responses of issued requests.
	pending_responses: FuturesUnordered<BoxFuture<'static, TaggedResponse>>,
	/// Peers recorded as having a request outstanding: inserted on `push`, removed when the
	/// corresponding response is returned from `incoming`.
	active_peers: HashSet<PeerId>,
}
433
434impl ResponseManager {
435 pub fn new() -> Self {
436 Self { pending_responses: FuturesUnordered::new(), active_peers: HashSet::new() }
437 }
438
439 pub async fn incoming(&mut self) -> Option<UnhandledResponse> {
442 self.pending_responses.next().await.map(|response| {
443 self.active_peers.remove(&response.requested_peer);
444 UnhandledResponse { response }
445 })
446 }
447
448 fn len(&self) -> usize {
449 self.pending_responses.len()
450 }
451
452 fn push(&mut self, response: BoxFuture<'static, TaggedResponse>, target: PeerId) {
453 self.pending_responses.push(response);
454 self.active_peers.insert(target);
455 }
456
457 fn is_sending_to(&self, peer: &PeerId) -> bool {
459 self.active_peers.contains(peer)
460 }
461}
462
/// Properties of a candidate request, supplied by the caller when a request is selected.
#[derive(Clone)]
pub struct RequestProperties {
	/// Statements that are not wanted. Sent to the peer as the request's `mask`; a returned
	/// statement whose bit is set here is penalised as unrequested.
	pub unwanted_mask: StatementFilter,
	/// Threshold handed to `seconded_and_sufficient`, both when judging whether a peer's
	/// advertisement makes it a useful target and when judging the statements of a response.
	pub backing_threshold: Option<usize>,
}
479
480fn find_request_target_with_update(
483 known_by: &mut VecDeque<PeerId>,
484 candidate_identifier: &CandidateIdentifier,
485 props: &RequestProperties,
486 peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
487 response_manager: &ResponseManager,
488) -> Option<PeerId> {
489 let mut prune = Vec::new();
490 let mut target = None;
491 for (i, p) in known_by.iter().enumerate() {
492 if response_manager.is_sending_to(p) {
494 continue
495 }
496
497 let mut filter = match peer_advertised(candidate_identifier, p) {
498 None => {
499 prune.push(i);
500 continue
501 },
502 Some(f) => f,
503 };
504
505 filter.mask_seconded(&props.unwanted_mask.seconded_in_group);
506 filter.mask_valid(&props.unwanted_mask.validated_in_group);
507 if seconded_and_sufficient(&filter, props.backing_threshold) {
508 target = Some((i, *p));
509 break
510 }
511 }
512
513 let prune_count = prune.len();
514 for i in prune {
515 known_by.remove(i);
516 }
517
518 if let Some((i, p)) = target {
519 known_by.remove(i - prune_count);
520 known_by.push_back(p);
521 Some(p)
522 } else {
523 None
524 }
525}
526
/// A response that has arrived but has not been validated yet.
pub struct UnhandledResponse {
	/// The response together with the context of the request that produced it.
	response: TaggedResponse,
}
531
532impl UnhandledResponse {
533 pub fn candidate_identifier(&self) -> &CandidateIdentifier {
536 &self.response.identifier
537 }
538
539 pub fn requested_peer(&self) -> &PeerId {
541 &self.response.requested_peer
542 }
543
544 pub fn validate_response(
564 self,
565 manager: &mut RequestManager,
566 group: &[ValidatorIndex],
567 session: SessionIndex,
568 validator_key_lookup: impl Fn(ValidatorIndex) -> Option<ValidatorId>,
569 allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool,
570 disabled_mask: BitVec<u8, Lsb0>,
571 transposed_cq: &TransposedClaimQueue,
572 allow_v2_descriptors: bool,
573 ) -> ResponseValidationOutput {
574 let UnhandledResponse {
575 response: TaggedResponse { identifier, requested_peer, props, response },
576 } = self;
577
578 let entry = match manager.requests.get_mut(&identifier) {
586 None =>
587 return ResponseValidationOutput {
588 requested_peer,
589 reputation_changes: Vec::new(),
590 request_status: CandidateRequestStatus::Outdated,
591 },
592 Some(e) => e,
593 };
594
595 let priority_index = match manager
596 .by_priority
597 .binary_search(&(entry.priority.clone(), identifier.clone()))
598 {
599 Ok(i) => i,
600 Err(_) => unreachable!("requested candidates always have a priority entry; qed"),
601 };
602
603 entry.next_retry_time = Some(Instant::now() + REQUEST_RETRY_DELAY);
605 entry.in_flight = false;
606 entry.priority.attempts += 1;
607
608 insert_or_update_priority(
610 &mut manager.by_priority,
611 Some(priority_index),
612 identifier.clone(),
613 entry.priority.clone(),
614 );
615
616 let complete_response = match response {
617 Err(RequestError::InvalidResponse(e)) => {
618 gum::trace!(
619 target: LOG_TARGET,
620 err = ?e,
621 peer = ?requested_peer,
622 "Improperly encoded response"
623 );
624
625 return ResponseValidationOutput {
626 requested_peer,
627 reputation_changes: vec![(requested_peer, COST_IMPROPERLY_DECODED_RESPONSE)],
628 request_status: CandidateRequestStatus::Incomplete,
629 }
630 },
631 Err(e @ RequestError::NetworkError(_) | e @ RequestError::Canceled(_)) => {
632 gum::trace!(
633 target: LOG_TARGET,
634 err = ?e,
635 peer = ?requested_peer,
636 "Request error"
637 );
638 return ResponseValidationOutput {
639 requested_peer,
640 reputation_changes: vec![],
641 request_status: CandidateRequestStatus::Incomplete,
642 }
643 },
644 Ok(response) => response,
645 };
646
647 let output = validate_complete_response(
648 &identifier,
649 props,
650 complete_response,
651 requested_peer,
652 group,
653 session,
654 validator_key_lookup,
655 allowed_para_lookup,
656 disabled_mask,
657 transposed_cq,
658 allow_v2_descriptors,
659 );
660
661 if let CandidateRequestStatus::Complete { .. } = output.request_status {
662 manager.remove_for(identifier.candidate_hash);
663 }
664
665 output
666 }
667}
668
669fn validate_complete_response(
670 identifier: &CandidateIdentifier,
671 props: RequestProperties,
672 response: AttestedCandidateResponse,
673 requested_peer: PeerId,
674 group: &[ValidatorIndex],
675 session: SessionIndex,
676 validator_key_lookup: impl Fn(ValidatorIndex) -> Option<ValidatorId>,
677 allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool,
678 disabled_mask: BitVec<u8, Lsb0>,
679 transposed_cq: &TransposedClaimQueue,
680 allow_v2_descriptors: bool,
681) -> ResponseValidationOutput {
682 let RequestProperties { backing_threshold, mut unwanted_mask } = props;
683
684 if !unwanted_mask.has_len(group.len()) {
687 gum::error!(
688 target: LOG_TARGET,
689 group_len = group.len(),
690 "Logic bug: group size != sent bitmask len"
691 );
692
693 unwanted_mask.seconded_in_group.resize(group.len(), true);
695 unwanted_mask.validated_in_group.resize(group.len(), true);
696 }
697
698 let invalid_candidate_output = |cost: Rep| ResponseValidationOutput {
699 request_status: CandidateRequestStatus::Incomplete,
700 reputation_changes: vec![(requested_peer, cost)],
701 requested_peer,
702 };
703
704 let mut rep_changes = Vec::new();
705
706 {
709 if response.candidate_receipt.descriptor.relay_parent() != identifier.relay_parent {
710 return invalid_candidate_output(COST_INVALID_RESPONSE)
711 }
712
713 if response.candidate_receipt.descriptor.persisted_validation_data_hash() !=
714 response.persisted_validation_data.hash()
715 {
716 return invalid_candidate_output(COST_INVALID_RESPONSE)
717 }
718
719 if !allowed_para_lookup(
720 response.candidate_receipt.descriptor.para_id(),
721 identifier.group_index,
722 ) {
723 return invalid_candidate_output(COST_INVALID_RESPONSE)
724 }
725
726 if response.candidate_receipt.hash() != identifier.candidate_hash {
727 return invalid_candidate_output(COST_INVALID_RESPONSE)
728 }
729
730 let candidate_hash = response.candidate_receipt.hash();
731
732 if !allow_v2_descriptors &&
734 response.candidate_receipt.descriptor.version() == CandidateDescriptorVersion::V2
735 {
736 gum::debug!(
737 target: LOG_TARGET,
738 ?candidate_hash,
739 peer = ?requested_peer,
740 "Version 2 candidate receipts are not enabled by the runtime"
741 );
742 return invalid_candidate_output(COST_UNSUPPORTED_DESCRIPTOR_VERSION)
743 }
744 if let Err(err) = response.candidate_receipt.parse_ump_signals(transposed_cq) {
746 gum::debug!(
747 target: LOG_TARGET,
748 ?candidate_hash,
749 ?err,
750 peer = ?requested_peer,
751 "Received candidate has invalid UMP signals"
752 );
753 return invalid_candidate_output(COST_INVALID_UMP_SIGNALS)
754 }
755
756 if let Some(candidate_session_index) = response.candidate_receipt.descriptor.session_index()
759 {
760 if candidate_session_index != session {
761 gum::debug!(
762 target: LOG_TARGET,
763 ?candidate_hash,
764 peer = ?requested_peer,
765 session_index = session,
766 candidate_session_index,
767 "Received candidate has invalid session index"
768 );
769 return invalid_candidate_output(COST_INVALID_SESSION_INDEX)
770 }
771 }
772 }
773
774 let statements = {
776 let mut statements =
777 Vec::with_capacity(std::cmp::min(response.statements.len(), group.len() * 2));
778
779 let mut received_filter = StatementFilter::blank(group.len());
780
781 let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x);
782
783 let signing_context =
784 SigningContext { parent_hash: identifier.relay_parent, session_index: session };
785
786 for unchecked_statement in response.statements.into_iter().take(group.len() * 2) {
787 let i = match index_in_group(unchecked_statement.unchecked_validator_index()) {
789 Some(i) => i,
790 None => {
791 rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
792 continue
793 },
794 };
795
796 if unchecked_statement.unchecked_payload().candidate_hash() !=
798 &identifier.candidate_hash
799 {
800 rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
801 continue
802 }
803
804 match unchecked_statement.unchecked_payload() {
808 CompactStatement::Seconded(_) => {
809 if unwanted_mask.seconded_in_group[i] {
810 rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
811 continue
812 }
813
814 if received_filter.seconded_in_group[i] {
815 rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
816 continue
817 }
818 },
819 CompactStatement::Valid(_) => {
820 if unwanted_mask.validated_in_group[i] {
821 rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
822 continue
823 }
824
825 if received_filter.validated_in_group[i] {
826 rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
827 continue
828 }
829 },
830 }
831
832 if disabled_mask.get(i).map_or(false, |x| *x) {
833 continue
834 }
835
836 let validator_public =
837 match validator_key_lookup(unchecked_statement.unchecked_validator_index()) {
838 None => {
839 rep_changes.push((requested_peer, COST_INVALID_SIGNATURE));
840 continue
841 },
842 Some(p) => p,
843 };
844
845 let checked_statement =
846 match unchecked_statement.try_into_checked(&signing_context, &validator_public) {
847 Err(_) => {
848 rep_changes.push((requested_peer, COST_INVALID_SIGNATURE));
849 continue
850 },
851 Ok(checked) => checked,
852 };
853
854 match checked_statement.payload() {
855 CompactStatement::Seconded(_) => {
856 received_filter.seconded_in_group.set(i, true);
857 },
858 CompactStatement::Valid(_) => {
859 received_filter.validated_in_group.set(i, true);
860 },
861 }
862
863 statements.push(checked_statement);
864 rep_changes.push((requested_peer, BENEFIT_VALID_STATEMENT));
865 }
866
867 if !seconded_and_sufficient(&received_filter, backing_threshold) {
870 return invalid_candidate_output(COST_INVALID_RESPONSE)
871 }
872
873 statements
874 };
875
876 rep_changes.push((requested_peer, BENEFIT_VALID_RESPONSE));
877
878 ResponseValidationOutput {
879 requested_peer,
880 request_status: CandidateRequestStatus::Complete {
881 candidate: response.candidate_receipt,
882 persisted_validation_data: response.persisted_validation_data,
883 statements,
884 },
885 reputation_changes: rep_changes,
886 }
887}
888
/// The status of a candidate request after a response has been handled.
#[derive(Debug, PartialEq)]
pub enum CandidateRequestStatus {
	/// The request was no longer tracked by the request manager when the response arrived.
	Outdated,
	/// The request failed or the response did not pass validation; the request remains
	/// tracked by the manager.
	Incomplete,
	/// The response passed validation.
	Complete {
		/// The candidate receipt taken from the response.
		candidate: CommittedCandidateReceipt,
		/// The persisted validation data taken from the response.
		persisted_validation_data: PersistedValidationData,
		/// The statements from the response that were accepted and signature-checked.
		statements: Vec<SignedStatement>,
	},
}
904
/// The result of validating a response.
#[derive(Debug, PartialEq)]
pub struct ResponseValidationOutput {
	/// The peer the request was sent to.
	pub requested_peer: PeerId,
	/// The resulting status of the request.
	pub request_status: CandidateRequestStatus,
	/// Reputation changes gathered while handling the response.
	pub reputation_changes: Vec<(PeerId, Rep)>,
}
915
916fn insert_or_update_priority(
917 priority_sorted: &mut Vec<(Priority, CandidateIdentifier)>,
918 prev_index: Option<usize>,
919 candidate_identifier: CandidateIdentifier,
920 new_priority: Priority,
921) -> usize {
922 if let Some(prev_index) = prev_index {
923 if priority_sorted[prev_index].0 == new_priority {
926 return prev_index
928 } else {
929 priority_sorted.remove(prev_index);
930 }
931 }
932
933 let item = (new_priority, candidate_identifier);
934 match priority_sorted.binary_search(&item) {
935 Ok(i) => i, Err(i) => {
937 priority_sorted.insert(i, item);
938 i
939 },
940 }
941}
942
943#[cfg(test)]
944mod tests {
945 use super::*;
946 use polkadot_primitives::HeadData;
947 use polkadot_primitives_test_helpers as test_helpers;
948
949 fn dummy_pvd() -> PersistedValidationData {
950 PersistedValidationData {
951 parent_head: HeadData(vec![7, 8, 9]),
952 relay_parent_number: 5,
953 max_pov_size: 1024,
954 relay_parent_storage_root: Default::default(),
955 }
956 }
957
958 #[test]
959 fn test_remove_by_relay_parent() {
960 let parent_a = Hash::from_low_u64_le(1);
961 let parent_b = Hash::from_low_u64_le(2);
962 let parent_c = Hash::from_low_u64_le(3);
963
964 let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11));
965 let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12));
966 let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21));
967 let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22));
968 let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31));
969 let duplicate_hash = CandidateHash(Hash::from_low_u64_le(31));
970
971 let mut request_manager = RequestManager::new();
972 request_manager.get_or_insert(parent_a, candidate_a1, 1.into());
973 request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
974 request_manager.get_or_insert(parent_b, candidate_b1, 1.into());
975 request_manager.get_or_insert(parent_b, candidate_b2, 2.into());
976 request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
977 request_manager.get_or_insert(parent_a, duplicate_hash, 1.into());
978
979 assert_eq!(request_manager.requests.len(), 6);
980 assert_eq!(request_manager.by_priority.len(), 6);
981 assert_eq!(request_manager.unique_identifiers.len(), 5);
982
983 request_manager.remove_by_relay_parent(parent_a);
984
985 assert_eq!(request_manager.requests.len(), 3);
986 assert_eq!(request_manager.by_priority.len(), 3);
987 assert_eq!(request_manager.unique_identifiers.len(), 3);
988
989 assert!(!request_manager.unique_identifiers.contains_key(&candidate_a1));
990 assert!(!request_manager.unique_identifiers.contains_key(&candidate_a2));
991 assert!(request_manager.unique_identifiers.contains_key(&duplicate_hash));
993
994 request_manager.remove_by_relay_parent(parent_b);
995
996 assert_eq!(request_manager.requests.len(), 1);
997 assert_eq!(request_manager.by_priority.len(), 1);
998 assert_eq!(request_manager.unique_identifiers.len(), 1);
999
1000 assert!(!request_manager.unique_identifiers.contains_key(&candidate_b1));
1001 assert!(!request_manager.unique_identifiers.contains_key(&candidate_b2));
1002
1003 request_manager.remove_by_relay_parent(parent_c);
1004
1005 assert!(request_manager.requests.is_empty());
1006 assert!(request_manager.by_priority.is_empty());
1007 assert!(request_manager.unique_identifiers.is_empty());
1008 }
1009
1010 #[test]
1011 fn test_priority_ordering() {
1012 let parent_a = Hash::from_low_u64_le(1);
1013 let parent_b = Hash::from_low_u64_le(2);
1014 let parent_c = Hash::from_low_u64_le(3);
1015
1016 let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11));
1017 let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12));
1018 let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21));
1019 let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22));
1020 let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31));
1021
1022 let mut request_manager = RequestManager::new();
1023
1024 let identifier_a1 = request_manager
1026 .get_or_insert(parent_a, candidate_a1, 1.into())
1027 .identifier
1028 .clone();
1029 let identifier_a2 = {
1030 let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
1031 entry.set_cluster_priority();
1032 entry.identifier.clone()
1033 };
1034 let identifier_b1 = request_manager
1035 .get_or_insert(parent_b, candidate_b1, 1.into())
1036 .identifier
1037 .clone();
1038 let identifier_b2 = request_manager
1039 .get_or_insert(parent_b, candidate_b2, 2.into())
1040 .identifier
1041 .clone();
1042 let identifier_c1 = {
1043 let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
1044 entry.set_cluster_priority();
1045 entry.identifier.clone()
1046 };
1047
1048 let attempts = 0;
1049 assert_eq!(
1050 request_manager.by_priority,
1051 vec![
1052 (Priority { origin: Origin::Cluster, attempts }, identifier_a2),
1053 (Priority { origin: Origin::Cluster, attempts }, identifier_c1),
1054 (Priority { origin: Origin::Unspecified, attempts }, identifier_a1),
1055 (Priority { origin: Origin::Unspecified, attempts }, identifier_b1),
1056 (Priority { origin: Origin::Unspecified, attempts }, identifier_b2),
1057 ]
1058 );
1059 }
1060
	/// The same candidate hash is requested under two different group indices. Once the
	/// first response completes, every request for that candidate hash is removed, so the
	/// second response must be reported as outdated.
	#[test]
	fn handle_outdated_response_due_to_requests_for_different_identifiers() {
		let mut request_manager = RequestManager::new();
		let mut response_manager = ResponseManager::new();

		let relay_parent = Hash::from_low_u64_le(1);
		let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
		let persisted_validation_data = dummy_pvd();
		// Make the receipt consistent with the validation data so that check passes.
		candidate_receipt.descriptor.persisted_validation_data_hash =
			persisted_validation_data.hash();
		let candidate = candidate_receipt.hash();
		let candidate_receipt: CommittedCandidateReceipt = candidate_receipt.into();
		let requested_peer_1 = PeerId::random();
		let requested_peer_2 = PeerId::random();

		// Two identifiers for the same candidate hash, differing only in the group index.
		let identifier1 = request_manager
			.get_or_insert(relay_parent, candidate, 1.into())
			.identifier
			.clone();
		request_manager
			.get_or_insert(relay_parent, candidate, 1.into())
			.add_peer(requested_peer_1);
		let identifier2 = request_manager
			.get_or_insert(relay_parent, candidate, 2.into())
			.identifier
			.clone();
		request_manager
			.get_or_insert(relay_parent, candidate, 2.into())
			.add_peer(requested_peer_2);

		assert_ne!(identifier1, identifier2);
		assert_eq!(request_manager.requests.len(), 2);

		let group_size = 3;
		let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];

		let unwanted_mask = StatementFilter::blank(group_size);
		let disabled_mask: BitVec<u8, Lsb0> = Default::default();
		let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };

		// Issue both requests.
		{
			let request_props =
				|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
			let peer_advertised = |_identifier: &CandidateIdentifier, _peer: &_| {
				Some(StatementFilter::full(group_size))
			};

			let outgoing = request_manager
				.next_request(&mut response_manager, request_props, peer_advertised)
				.unwrap();
			assert_eq!(outgoing.payload.candidate_hash, candidate);
			let outgoing = request_manager
				.next_request(&mut response_manager, request_props, peer_advertised)
				.unwrap();
			assert_eq!(outgoing.payload.candidate_hash, candidate);
		}

		// The response to the first request validates and completes the request.
		{
			let statements = vec![];
			let response = UnhandledResponse {
				response: TaggedResponse {
					identifier: identifier1,
					requested_peer: requested_peer_1,
					props: request_properties.clone(),
					response: Ok(AttestedCandidateResponse {
						candidate_receipt: candidate_receipt.clone().into(),
						persisted_validation_data: persisted_validation_data.clone(),
						statements,
					}),
				},
			};
			let validator_key_lookup = |_v| None;
			let allowed_para_lookup = |_para, _g_index| true;
			// The first `statements` was moved into the response; this one is the expected
			// (empty) statement list of the output.
			let statements = vec![];
			let output = response.validate_response(
				&mut request_manager,
				group,
				0,
				validator_key_lookup,
				allowed_para_lookup,
				disabled_mask.clone(),
				&Default::default(),
				false,
			);
			assert_eq!(
				output,
				ResponseValidationOutput {
					requested_peer: requested_peer_1,
					request_status: CandidateRequestStatus::Complete {
						candidate: candidate_receipt.clone(),
						persisted_validation_data: persisted_validation_data.clone(),
						statements,
					},
					reputation_changes: vec![(requested_peer_1, BENEFIT_VALID_RESPONSE)],
				}
			);
		}

		// The response to the second request arrives after its request was removed and is
		// therefore outdated.
		{
			let statements = vec![];
			let response = UnhandledResponse {
				response: TaggedResponse {
					identifier: identifier2,
					requested_peer: requested_peer_2,
					props: request_properties,
					response: Ok(AttestedCandidateResponse {
						candidate_receipt: candidate_receipt.clone().into(),
						persisted_validation_data: persisted_validation_data.clone(),
						statements,
					}),
				},
			};
			let validator_key_lookup = |_v| None;
			let allowed_para_lookup = |_para, _g_index| true;
			let output = response.validate_response(
				&mut request_manager,
				group,
				0,
				validator_key_lookup,
				allowed_para_lookup,
				disabled_mask,
				&Default::default(),
				false,
			);
			assert_eq!(
				output,
				ResponseValidationOutput {
					requested_peer: requested_peer_2,
					request_status: CandidateRequestStatus::Outdated,
					reputation_changes: vec![],
				}
			);
		}

		assert_eq!(request_manager.requests.len(), 0);
	}
1202
	/// A request is issued and then removed from the manager by relay parent before its
	/// response arrives; the late response must be reported as outdated.
	#[test]
	fn handle_outdated_response_due_to_garbage_collection() {
		let mut request_manager = RequestManager::new();
		let mut response_manager = ResponseManager::new();

		let relay_parent = Hash::from_low_u64_le(1);
		let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
		let persisted_validation_data = dummy_pvd();
		// Make the receipt consistent with the validation data.
		candidate_receipt.descriptor.persisted_validation_data_hash =
			persisted_validation_data.hash();
		let candidate = candidate_receipt.hash();
		let requested_peer = PeerId::random();

		let identifier = request_manager
			.get_or_insert(relay_parent, candidate, 1.into())
			.identifier
			.clone();
		request_manager
			.get_or_insert(relay_parent, candidate, 1.into())
			.add_peer(requested_peer);

		let group_size = 3;
		let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];

		let unwanted_mask = StatementFilter::blank(group_size);
		let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
		let peer_advertised =
			|_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));

		// Issue the request.
		{
			let request_props =
				|_identifier: &CandidateIdentifier| Some((&request_properties).clone());

			let outgoing = request_manager
				.next_request(&mut response_manager, request_props, peer_advertised)
				.unwrap();
			assert_eq!(outgoing.payload.candidate_hash, candidate);
		}

		// Remove everything requested under the relay parent while the request is in flight.
		request_manager.remove_by_relay_parent(relay_parent);

		// The response now refers to a request that is no longer tracked.
		{
			let statements = vec![];
			let response = UnhandledResponse {
				response: TaggedResponse {
					identifier,
					requested_peer,
					props: request_properties,
					response: Ok(AttestedCandidateResponse {
						candidate_receipt: candidate_receipt.clone().into(),
						persisted_validation_data: persisted_validation_data.clone(),
						statements,
					}),
				},
			};
			let validator_key_lookup = |_v| None;
			let allowed_para_lookup = |_para, _g_index| true;
			let disabled_mask: BitVec<u8, Lsb0> = Default::default();
			let output = response.validate_response(
				&mut request_manager,
				group,
				0,
				validator_key_lookup,
				allowed_para_lookup,
				disabled_mask,
				&Default::default(),
				false,
			);
			assert_eq!(
				output,
				ResponseValidationOutput {
					requested_peer,
					request_status: CandidateRequestStatus::Outdated,
					reputation_changes: vec![],
				}
			);
		}
	}
1286
#[test]
/// Once a response has been validated as `Complete`, the request manager must
/// no longer track the candidate in either `requests` or `by_priority`.
fn should_clean_up_after_successful_requests() {
    let mut manager = RequestManager::new();
    let mut responses = ResponseManager::new();

    // Build a candidate whose descriptor commits to the dummy validation data.
    let relay_parent = Hash::from_low_u64_le(1);
    let mut receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
    let pvd = dummy_pvd();
    receipt.descriptor.persisted_validation_data_hash = pvd.hash();
    let candidate_hash = receipt.hash();
    let peer = PeerId::random();

    // Register the request, then record `peer` as knowing the candidate.
    let identifier =
        manager.get_or_insert(relay_parent, candidate_hash, 1.into()).identifier.clone();
    manager.get_or_insert(relay_parent, candidate_hash, 1.into()).add_peer(peer);

    // Exactly one request is tracked before anything is sent.
    assert_eq!(manager.requests.len(), 1);
    assert_eq!(manager.by_priority.len(), 1);

    let group_size = 3;
    let group = [ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
    let props = RequestProperties {
        unwanted_mask: StatementFilter::blank(group_size),
        backing_threshold: None,
    };

    {
        // Dispatch the request; it must target the candidate registered above.
        let outgoing = manager
            .next_request(
                &mut responses,
                |_: &CandidateIdentifier| Some(props.clone()),
                |_: &CandidateIdentifier, _: &_| Some(StatementFilter::full(group_size)),
            )
            .unwrap();
        assert_eq!(outgoing.payload.candidate_hash, candidate_hash);
    }

    {
        // Feed back a response carrying the receipt, the validation data and
        // no statements, and validate it against the manager.
        let response = UnhandledResponse {
            response: TaggedResponse {
                identifier,
                requested_peer: peer,
                props: props.clone(),
                response: Ok(AttestedCandidateResponse {
                    candidate_receipt: receipt.clone().into(),
                    persisted_validation_data: pvd.clone(),
                    statements: Vec::new(),
                }),
            },
        };
        let output = response.validate_response(
            &mut manager,
            &group,
            0,
            |_validator| None,
            |_para, _group_index| true,
            BitVec::<u8, Lsb0>::default(),
            &Default::default(),
            false,
        );

        // The request completes and the responding peer is rewarded.
        let expected = ResponseValidationOutput {
            requested_peer: peer,
            request_status: CandidateRequestStatus::Complete {
                candidate: receipt.clone().into(),
                persisted_validation_data: pvd.clone(),
                statements: Vec::new(),
            },
            reputation_changes: vec![(peer, BENEFIT_VALID_RESPONSE)],
        };
        assert_eq!(output, expected);
    }

    // Nothing may be left behind once the request has completed.
    assert_eq!(manager.requests.len(), 0);
    assert_eq!(manager.by_priority.len(), 0);
}
1377
#[test]
/// A peer that already has a request in flight must not be sent another one
/// until it is no longer marked active in the response manager.
fn rate_limit_requests_to_same_peer() {
    let mut manager = RequestManager::new();
    let mut responses = ResponseManager::new();

    let relay_parent = Hash::from_low_u64_le(1);

    // Builds a dummy receipt committed to dummy validation data and returns
    // the receipt, the validation data and the candidate hash.
    let new_candidate = || {
        let mut receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
        let pvd = dummy_pvd();
        receipt.descriptor.persisted_validation_data_hash = pvd.hash();
        let hash = receipt.hash();
        (receipt, pvd, hash)
    };
    let (receipt_1, pvd_1, candidate_1) = new_candidate();
    let (_, _, candidate_2) = new_candidate();
    let (_, _, candidate_3) = new_candidate();

    let peer_1 = PeerId::random();
    let peer_2 = PeerId::random();

    let group_size = 3;
    let group = [ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
    let disabled_mask = BitVec::<u8, Lsb0>::default();
    let props = RequestProperties {
        unwanted_mask: StatementFilter::blank(group_size),
        backing_threshold: None,
    };
    let props_for = |_: &CandidateIdentifier| Some(props.clone());
    let advertised_by =
        |_: &CandidateIdentifier, _: &_| Some(StatementFilter::full(group_size));

    // Registers a request for `candidate`, records `peer` as knowing it, and
    // hands back the request's identifier.
    let register = |manager: &mut RequestManager, candidate, peer: PeerId| {
        let identifier =
            manager.get_or_insert(relay_parent, candidate, 1.into()).identifier.clone();
        manager.get_or_insert(relay_parent, candidate, 1.into()).add_peer(peer);
        identifier
    };

    // Candidate 1 is known by peer 1, candidate 3 by peer 2.
    let identifier_1 = register(&mut manager, candidate_1, peer_1);
    let _identifier_3 = register(&mut manager, candidate_3, peer_2);

    // Both requests can be dispatched, one per peer.
    for _ in 0..2 {
        let outgoing = manager.next_request(&mut responses, props_for, advertised_by);
        assert!(outgoing.is_some());
    }
    assert_eq!(responses.active_peers.len(), 2);
    assert!(responses.is_sending_to(&peer_1));
    assert!(responses.is_sending_to(&peer_2));
    assert_eq!(manager.requests.len(), 2);

    // Candidate 2 is also known only by peer 1, which is still busy.
    let _identifier_2 = register(&mut manager, candidate_2, peer_1);

    // No new request goes out: the request is tracked but stays unsent.
    let blocked = manager.next_request(&mut responses, props_for, advertised_by);
    assert!(blocked.is_none());
    assert_eq!(responses.active_peers.len(), 2);
    assert!(responses.is_sending_to(&peer_1));
    assert!(responses.is_sending_to(&peer_2));
    assert_eq!(manager.requests.len(), 3);

    // Manually mark peer 1 as no longer busy and discard the pending futures.
    responses.active_peers.remove(&peer_1);
    responses.pending_responses = FuturesUnordered::new();

    {
        // Validate a response from peer 1 for candidate 1.
        let response = UnhandledResponse {
            response: TaggedResponse {
                identifier: identifier_1,
                requested_peer: peer_1,
                props: props.clone(),
                response: Ok(AttestedCandidateResponse {
                    candidate_receipt: receipt_1.clone().into(),
                    persisted_validation_data: pvd_1.clone(),
                    statements: Vec::new(),
                }),
            },
        };
        let _validated = response.validate_response(
            &mut manager,
            &group,
            0,
            |_validator| None,
            |_para, _group_index| true,
            disabled_mask.clone(),
            &Default::default(),
            false,
        );

        // One request is gone; only peer 2 is still marked active.
        assert_eq!(manager.requests.len(), 2);
        assert_eq!(responses.active_peers.len(), 1);
        assert!(responses.is_sending_to(&peer_2));
    }

    // With peer 1 free again, a further request can be dispatched to it.
    let resumed = manager.next_request(&mut responses, props_for, advertised_by);
    assert!(resumed.is_some());
    assert_eq!(responses.active_peers.len(), 2);
    assert!(responses.is_sending_to(&peer_1));
    assert!(responses.is_sending_to(&peer_2));
    assert_eq!(manager.requests.len(), 2);
}
1515}