referrerpolicy=no-referrer-when-downgrade

polkadot_statement_distribution/v2/
requests.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14//! A requester for full information on candidates.
15//!
16//! 1. We use `RequestManager::get_or_insert().get_mut()` to add and mutate [`RequestedCandidate`]s,
17//!    either setting the
18//! priority or adding a peer we know has the candidate. We currently prioritize "cluster"
19//! candidates (those from our own group, although the cluster mechanism could be made to include
20//! multiple groups in the future) over "grid" candidates (those from other groups).
21//!
22//! 2. The main loop of the module will invoke [`RequestManager::next_request`] in a loop until it
23//!    returns `None`,
24//! dispatching all requests with the `NetworkBridgeTxMessage`. The receiving half of the channel is
25//! owned by the [`RequestManager`].
26//!
27//! 3. The main loop of the module will also select over [`RequestManager::await_incoming`] to
28//!    receive
29//! [`UnhandledResponse`]s, which it then validates using [`UnhandledResponse::validate_response`]
30//! (which requires state not owned by the request manager).
31
32use super::{
33	seconded_and_sufficient, CandidateDescriptorVersion, TransposedClaimQueue,
34	BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE,
35	COST_INVALID_RESPONSE, COST_INVALID_SESSION_INDEX, COST_INVALID_SIGNATURE,
36	COST_INVALID_UMP_SIGNALS, COST_UNREQUESTED_RESPONSE_STATEMENT,
37	COST_UNSUPPORTED_DESCRIPTOR_VERSION, REQUEST_RETRY_DELAY,
38};
39use crate::LOG_TARGET;
40
41use bitvec::prelude::{BitVec, Lsb0};
42use polkadot_node_network_protocol::{
43	request_response::{
44		outgoing::{Recipient as RequestRecipient, RequestError},
45		v2::{AttestedCandidateRequest, AttestedCandidateResponse},
46		OutgoingRequest, OutgoingResult, MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS,
47	},
48	v3::StatementFilter,
49	PeerId, UnifiedReputationChange as Rep,
50};
51use polkadot_primitives::{
52	CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CompactStatement,
53	GroupIndex, Hash, Id as ParaId, PersistedValidationData, SessionIndex, SignedStatement,
54	SigningContext, ValidatorId, ValidatorIndex,
55};
56
57use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
58
59use std::{
60	collections::{
61		hash_map::{Entry as HEntry, HashMap},
62		HashSet, VecDeque,
63	},
64	time::Instant,
65};
66
67/// An identifier for a candidate.
68///
69/// In this module, we are requesting candidates
70/// for which we have no information other than the candidate hash and statements signed
71/// by validators. It is possible for validators for multiple groups to abuse this lack of
72/// information: until we actually get the preimage of this candidate we cannot confirm
73/// anything other than the candidate hash.
74#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
75pub struct CandidateIdentifier {
76	/// The relay-parent this candidate is ostensibly under.
77	pub relay_parent: Hash,
78	/// The hash of the candidate.
79	pub candidate_hash: CandidateHash,
80	/// The index of the group claiming to be assigned to the candidate's
81	/// para.
82	pub group_index: GroupIndex,
83}
84
85struct TaggedResponse {
86	identifier: CandidateIdentifier,
87	requested_peer: PeerId,
88	props: RequestProperties,
89	response: OutgoingResult<AttestedCandidateResponse>,
90}
91
92/// A pending request.
93#[derive(Debug)]
94pub struct RequestedCandidate {
95	priority: Priority,
96	known_by: VecDeque<PeerId>,
97	/// Has the request been sent out and a response not yet received?
98	in_flight: bool,
99	/// The timestamp for the next time we should retry, if the response failed.
100	next_retry_time: Option<Instant>,
101}
102
103impl RequestedCandidate {
104	fn is_pending(&self) -> bool {
105		if self.in_flight {
106			return false
107		}
108
109		if let Some(next_retry_time) = self.next_retry_time {
110			let can_retry = Instant::now() >= next_retry_time;
111			if !can_retry {
112				return false
113			}
114		}
115
116		true
117	}
118}
119
120#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
121enum Origin {
122	Cluster = 0,
123	Unspecified = 1,
124}
125
126#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
127struct Priority {
128	origin: Origin,
129	attempts: usize,
130}
131
132/// An entry for manipulating a requested candidate.
133pub struct Entry<'a> {
134	prev_index: usize,
135	identifier: CandidateIdentifier,
136	by_priority: &'a mut Vec<(Priority, CandidateIdentifier)>,
137	requested: &'a mut RequestedCandidate,
138}
139
140impl<'a> Entry<'a> {
141	/// Add a peer to the set of known peers.
142	pub fn add_peer(&mut self, peer: PeerId) {
143		if !self.requested.known_by.contains(&peer) {
144			self.requested.known_by.push_back(peer);
145		}
146	}
147
148	/// Note that the candidate is required for the cluster.
149	pub fn set_cluster_priority(&mut self) {
150		self.requested.priority.origin = Origin::Cluster;
151
152		insert_or_update_priority(
153			&mut *self.by_priority,
154			Some(self.prev_index),
155			self.identifier.clone(),
156			self.requested.priority.clone(),
157		);
158	}
159}
160
161/// A manager for outgoing requests.
162pub struct RequestManager {
163	requests: HashMap<CandidateIdentifier, RequestedCandidate>,
164	// sorted by priority.
165	by_priority: Vec<(Priority, CandidateIdentifier)>,
166	// all unique identifiers for the candidate.
167	unique_identifiers: HashMap<CandidateHash, HashSet<CandidateIdentifier>>,
168}
169
170impl RequestManager {
171	/// Create a new [`RequestManager`].
172	pub fn new() -> Self {
173		RequestManager {
174			requests: HashMap::new(),
175			by_priority: Vec::new(),
176			unique_identifiers: HashMap::new(),
177		}
178	}
179
180	/// Gets an [`Entry`] for mutating a request and inserts it if the
181	/// manager doesn't store this request already.
182	pub fn get_or_insert(
183		&mut self,
184		relay_parent: Hash,
185		candidate_hash: CandidateHash,
186		group_index: GroupIndex,
187	) -> Entry {
188		let identifier = CandidateIdentifier { relay_parent, candidate_hash, group_index };
189
190		let (candidate, fresh) = match self.requests.entry(identifier.clone()) {
191			HEntry::Occupied(e) => (e.into_mut(), false),
192			HEntry::Vacant(e) => (
193				e.insert(RequestedCandidate {
194					priority: Priority { attempts: 0, origin: Origin::Unspecified },
195					known_by: VecDeque::new(),
196					in_flight: false,
197					next_retry_time: None,
198				}),
199				true,
200			),
201		};
202
203		let priority_index = if fresh {
204			self.unique_identifiers
205				.entry(candidate_hash)
206				.or_default()
207				.insert(identifier.clone());
208
209			insert_or_update_priority(
210				&mut self.by_priority,
211				None,
212				identifier.clone(),
213				candidate.priority.clone(),
214			)
215		} else {
216			match self
217				.by_priority
218				.binary_search(&(candidate.priority.clone(), identifier.clone()))
219			{
220				Ok(i) => i,
221				Err(_) => unreachable!("requested candidates always have a priority entry; qed"),
222			}
223		};
224
225		Entry {
226			prev_index: priority_index,
227			identifier,
228			by_priority: &mut self.by_priority,
229			requested: candidate,
230		}
231	}
232
233	/// Remove all pending requests for the given candidate.
234	pub fn remove_for(&mut self, candidate: CandidateHash) {
235		if let Some(identifiers) = self.unique_identifiers.remove(&candidate) {
236			self.by_priority.retain(|(_priority, id)| !identifiers.contains(&id));
237			for id in identifiers {
238				self.requests.remove(&id);
239			}
240		}
241	}
242
243	/// Remove based on relay-parent.
244	pub fn remove_by_relay_parent(&mut self, relay_parent: Hash) {
245		let mut candidate_hashes = HashSet::new();
246
247		// Remove from `by_priority` and `requests`.
248		self.by_priority.retain(|(_priority, id)| {
249			let retain = relay_parent != id.relay_parent;
250			if !retain {
251				self.requests.remove(id);
252				candidate_hashes.insert(id.candidate_hash);
253			}
254			retain
255		});
256
257		// Remove from `unique_identifiers`.
258		for candidate_hash in candidate_hashes {
259			match self.unique_identifiers.entry(candidate_hash) {
260				HEntry::Occupied(mut entry) => {
261					entry.get_mut().retain(|id| relay_parent != id.relay_parent);
262					if entry.get().is_empty() {
263						entry.remove();
264					}
265				},
266				// We can expect to encounter vacant entries, but only if nodes are misbehaving and
267				// we don't use a deduplicating collection; there are no issues from ignoring it.
268				HEntry::Vacant(_) => (),
269			}
270		}
271
272		gum::debug!(
273			target: LOG_TARGET,
274			"Requests remaining after cleanup: {}",
275			self.by_priority.len(),
276		);
277	}
278
279	/// Returns true if there are pending requests that are dispatchable.
280	pub fn has_pending_requests(&self) -> bool {
281		for (_id, entry) in &self.requests {
282			if entry.is_pending() {
283				return true
284			}
285		}
286
287		false
288	}
289
290	/// Returns an instant at which the next request to be retried will be ready.
291	pub fn next_retry_time(&mut self) -> Option<Instant> {
292		let mut next = None;
293		for (_id, request) in self.requests.iter().filter(|(_id, request)| !request.in_flight) {
294			if let Some(next_retry_time) = request.next_retry_time {
295				if next.map_or(true, |next| next_retry_time < next) {
296					next = Some(next_retry_time);
297				}
298			}
299		}
300		next
301	}
302
303	/// Yields the next request to dispatch, if there is any.
304	///
305	/// This function accepts two closures as an argument.
306	///
307	/// The first closure is used to gather information about the desired
308	/// properties of a response, which is used to select targets and validate
309	/// the response later on.
310	///
311	/// The second closure is used to determine the specific advertised
312	/// statements by a peer, to be compared against the mask and backing
313	/// threshold and returns `None` if the peer is no longer connected.
314	pub fn next_request(
315		&mut self,
316		response_manager: &mut ResponseManager,
317		request_props: impl Fn(&CandidateIdentifier) -> Option<RequestProperties>,
318		peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
319	) -> Option<OutgoingRequest<AttestedCandidateRequest>> {
320		// The number of parallel requests a node can answer is limited by
321		// `MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS`, however there is no
322		// need for the current node to limit itself to the same amount the
323		// requests, because the requests are going to different nodes anyways.
324		// While looking at https://github.com/paritytech/polkadot-sdk/issues/3314,
325		// found out that this requests take around 100ms to fulfill, so it
326		// would make sense to try to request things as early as we can, given
327		// we would need to request it for each candidate, around 25 right now
328		// on kusama.
329		if response_manager.len() >= 2 * MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
330			return None
331		}
332
333		let mut res = None;
334
335		// loop over all requests, in order of priority.
336		// do some active maintenance of the connected peers.
337		// dispatch the first request which is not in-flight already.
338
339		let mut cleanup_outdated = Vec::new();
340		for (i, (_priority, id)) in self.by_priority.iter().enumerate() {
341			let entry = match self.requests.get_mut(&id) {
342				None => {
343					gum::error!(
344						target: LOG_TARGET,
345						identifier = ?id,
346						"Missing entry for priority queue member",
347					);
348
349					continue
350				},
351				Some(e) => e,
352			};
353
354			if !entry.is_pending() {
355				continue
356			}
357
358			let props = match request_props(&id) {
359				None => {
360					cleanup_outdated.push((i, id.clone()));
361					continue
362				},
363				Some(s) => s,
364			};
365
366			let target = match find_request_target_with_update(
367				&mut entry.known_by,
368				id,
369				&props,
370				&peer_advertised,
371				&response_manager,
372			) {
373				None => continue,
374				Some(t) => t,
375			};
376
377			gum::debug!(
378				target: crate::LOG_TARGET,
379				candidate_hash = ?id.candidate_hash,
380				peer = ?target,
381				"Issuing candidate request"
382			);
383
384			let (request, response_fut) = OutgoingRequest::new(
385				RequestRecipient::Peer(target),
386				AttestedCandidateRequest {
387					candidate_hash: id.candidate_hash,
388					mask: props.unwanted_mask.clone(),
389				},
390			);
391
392			let stored_id = id.clone();
393			response_manager.push(
394				Box::pin(async move {
395					TaggedResponse {
396						identifier: stored_id,
397						requested_peer: target,
398						props,
399						response: response_fut.await,
400					}
401				}),
402				target,
403			);
404
405			entry.in_flight = true;
406
407			res = Some(request);
408			break
409		}
410
411		for (priority_index, identifier) in cleanup_outdated.into_iter().rev() {
412			self.by_priority.remove(priority_index);
413			self.requests.remove(&identifier);
414			if let HEntry::Occupied(mut e) =
415				self.unique_identifiers.entry(identifier.candidate_hash)
416			{
417				e.get_mut().remove(&identifier);
418				if e.get().is_empty() {
419					e.remove();
420				}
421			}
422		}
423
424		res
425	}
426}
427
428/// A manager for pending responses.
429pub struct ResponseManager {
430	pending_responses: FuturesUnordered<BoxFuture<'static, TaggedResponse>>,
431	active_peers: HashSet<PeerId>,
432}
433
434impl ResponseManager {
435	pub fn new() -> Self {
436		Self { pending_responses: FuturesUnordered::new(), active_peers: HashSet::new() }
437	}
438
439	/// Await the next incoming response to a sent request, or immediately
440	/// return `None` if there are no pending responses.
441	pub async fn incoming(&mut self) -> Option<UnhandledResponse> {
442		self.pending_responses.next().await.map(|response| {
443			self.active_peers.remove(&response.requested_peer);
444			UnhandledResponse { response }
445		})
446	}
447
448	fn len(&self) -> usize {
449		self.pending_responses.len()
450	}
451
452	fn push(&mut self, response: BoxFuture<'static, TaggedResponse>, target: PeerId) {
453		self.pending_responses.push(response);
454		self.active_peers.insert(target);
455	}
456
457	/// Returns true if we are currently sending a request to the peer.
458	fn is_sending_to(&self, peer: &PeerId) -> bool {
459		self.active_peers.contains(peer)
460	}
461}
462
463/// Properties used in target selection and validation of a request.
464#[derive(Clone)]
465pub struct RequestProperties {
466	/// A mask for limiting the statements the response is allowed to contain.
467	/// The mask has `OR` semantics: statements by validators corresponding to bits
468	/// in the mask are not desired. It also returns the required backing threshold
469	/// for the candidate.
470	pub unwanted_mask: StatementFilter,
471	/// The required backing threshold, if any. If this is `Some`, then requests will only
472	/// be made to peers which can provide enough statements to back the candidate, when
473	/// taking into account the `unwanted_mask`, and a response will only be validated
474	/// in the case of those statements.
475	///
476	/// If this is `None`, it is assumed that only the candidate itself is needed.
477	pub backing_threshold: Option<usize>,
478}
479
480/// Finds a valid request target, returning `None` if none exists.
481/// Cleans up disconnected peers and places the returned peer at the back of the queue.
482fn find_request_target_with_update(
483	known_by: &mut VecDeque<PeerId>,
484	candidate_identifier: &CandidateIdentifier,
485	props: &RequestProperties,
486	peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
487	response_manager: &ResponseManager,
488) -> Option<PeerId> {
489	let mut prune = Vec::new();
490	let mut target = None;
491	for (i, p) in known_by.iter().enumerate() {
492		// If we are already sending to that peer, skip for now
493		if response_manager.is_sending_to(p) {
494			continue
495		}
496
497		let mut filter = match peer_advertised(candidate_identifier, p) {
498			None => {
499				prune.push(i);
500				continue
501			},
502			Some(f) => f,
503		};
504
505		filter.mask_seconded(&props.unwanted_mask.seconded_in_group);
506		filter.mask_valid(&props.unwanted_mask.validated_in_group);
507		if seconded_and_sufficient(&filter, props.backing_threshold) {
508			target = Some((i, *p));
509			break
510		}
511	}
512
513	let prune_count = prune.len();
514	for i in prune {
515		known_by.remove(i);
516	}
517
518	if let Some((i, p)) = target {
519		known_by.remove(i - prune_count);
520		known_by.push_back(p);
521		Some(p)
522	} else {
523		None
524	}
525}
526
527/// A response to a request, which has not yet been handled.
528pub struct UnhandledResponse {
529	response: TaggedResponse,
530}
531
532impl UnhandledResponse {
533	/// Get the candidate identifier which the corresponding request
534	/// was classified under.
535	pub fn candidate_identifier(&self) -> &CandidateIdentifier {
536		&self.response.identifier
537	}
538
539	/// Get the peer we made the request to.
540	pub fn requested_peer(&self) -> &PeerId {
541		&self.response.requested_peer
542	}
543
544	/// Validate the response. If the response is valid, this will yield the
545	/// candidate, the [`PersistedValidationData`] of the candidate, and requested
546	/// checked statements.
547	///
548	/// Valid responses are defined as those which provide a valid candidate
549	/// and signatures which match the identifier, and provide enough statements to back the
550	/// candidate.
551	///
552	/// This will also produce a record of misbehaviors by peers:
553	///   * If the response is partially valid, misbehavior by the responding peer.
554	///   * If there are other peers which have advertised the same candidate for different
555	///     relay-parents or para-ids, misbehavior reports for those peers will also be generated.
556	///
557	/// Finally, in the case that the response is either valid or partially valid,
558	/// this will clean up all remaining requests for the candidate in the manager.
559	///
560	/// As parameters, the user should supply the canonical group array as well
561	/// as a mapping from validator index to validator ID. The validator pubkey mapping
562	/// will not be queried except for validator indices in the group.
563	pub fn validate_response(
564		self,
565		manager: &mut RequestManager,
566		group: &[ValidatorIndex],
567		session: SessionIndex,
568		validator_key_lookup: impl Fn(ValidatorIndex) -> Option<ValidatorId>,
569		allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool,
570		disabled_mask: BitVec<u8, Lsb0>,
571		transposed_cq: &TransposedClaimQueue,
572		allow_v2_descriptors: bool,
573	) -> ResponseValidationOutput {
574		let UnhandledResponse {
575			response: TaggedResponse { identifier, requested_peer, props, response },
576		} = self;
577
578		// handle races if the candidate is no longer known.
579		// this could happen if we requested the candidate under two
580		// different identifiers at the same time, and received a valid
581		// response on the other.
582		//
583		// it could also happen in the case that we had a request in-flight
584		// and the request entry was garbage-collected on outdated relay parent.
585		let entry = match manager.requests.get_mut(&identifier) {
586			None =>
587				return ResponseValidationOutput {
588					requested_peer,
589					reputation_changes: Vec::new(),
590					request_status: CandidateRequestStatus::Outdated,
591				},
592			Some(e) => e,
593		};
594
595		let priority_index = match manager
596			.by_priority
597			.binary_search(&(entry.priority.clone(), identifier.clone()))
598		{
599			Ok(i) => i,
600			Err(_) => unreachable!("requested candidates always have a priority entry; qed"),
601		};
602
603		// Set the next retry time before clearing the `in_flight` flag.
604		entry.next_retry_time = Some(Instant::now() + REQUEST_RETRY_DELAY);
605		entry.in_flight = false;
606		entry.priority.attempts += 1;
607
608		// update the location in the priority queue.
609		insert_or_update_priority(
610			&mut manager.by_priority,
611			Some(priority_index),
612			identifier.clone(),
613			entry.priority.clone(),
614		);
615
616		let complete_response = match response {
617			Err(RequestError::InvalidResponse(e)) => {
618				gum::trace!(
619					target: LOG_TARGET,
620					err = ?e,
621					peer = ?requested_peer,
622					"Improperly encoded response"
623				);
624
625				return ResponseValidationOutput {
626					requested_peer,
627					reputation_changes: vec![(requested_peer, COST_IMPROPERLY_DECODED_RESPONSE)],
628					request_status: CandidateRequestStatus::Incomplete,
629				}
630			},
631			Err(e @ RequestError::NetworkError(_) | e @ RequestError::Canceled(_)) => {
632				gum::trace!(
633					target: LOG_TARGET,
634					err = ?e,
635					peer = ?requested_peer,
636					"Request error"
637				);
638				return ResponseValidationOutput {
639					requested_peer,
640					reputation_changes: vec![],
641					request_status: CandidateRequestStatus::Incomplete,
642				}
643			},
644			Ok(response) => response,
645		};
646
647		let output = validate_complete_response(
648			&identifier,
649			props,
650			complete_response,
651			requested_peer,
652			group,
653			session,
654			validator_key_lookup,
655			allowed_para_lookup,
656			disabled_mask,
657			transposed_cq,
658			allow_v2_descriptors,
659		);
660
661		if let CandidateRequestStatus::Complete { .. } = output.request_status {
662			manager.remove_for(identifier.candidate_hash);
663		}
664
665		output
666	}
667}
668
669fn validate_complete_response(
670	identifier: &CandidateIdentifier,
671	props: RequestProperties,
672	response: AttestedCandidateResponse,
673	requested_peer: PeerId,
674	group: &[ValidatorIndex],
675	session: SessionIndex,
676	validator_key_lookup: impl Fn(ValidatorIndex) -> Option<ValidatorId>,
677	allowed_para_lookup: impl Fn(ParaId, GroupIndex) -> bool,
678	disabled_mask: BitVec<u8, Lsb0>,
679	transposed_cq: &TransposedClaimQueue,
680	allow_v2_descriptors: bool,
681) -> ResponseValidationOutput {
682	let RequestProperties { backing_threshold, mut unwanted_mask } = props;
683
684	// sanity check bitmask size. this is based entirely on
685	// local logic here.
686	if !unwanted_mask.has_len(group.len()) {
687		gum::error!(
688			target: LOG_TARGET,
689			group_len = group.len(),
690			"Logic bug: group size != sent bitmask len"
691		);
692
693		// resize and attempt to continue.
694		unwanted_mask.seconded_in_group.resize(group.len(), true);
695		unwanted_mask.validated_in_group.resize(group.len(), true);
696	}
697
698	let invalid_candidate_output = |cost: Rep| ResponseValidationOutput {
699		request_status: CandidateRequestStatus::Incomplete,
700		reputation_changes: vec![(requested_peer, cost)],
701		requested_peer,
702	};
703
704	let mut rep_changes = Vec::new();
705
706	// sanity-check candidate response.
707	// note: roughly ascending cost of operations
708	{
709		if response.candidate_receipt.descriptor.relay_parent() != identifier.relay_parent {
710			return invalid_candidate_output(COST_INVALID_RESPONSE)
711		}
712
713		if response.candidate_receipt.descriptor.persisted_validation_data_hash() !=
714			response.persisted_validation_data.hash()
715		{
716			return invalid_candidate_output(COST_INVALID_RESPONSE)
717		}
718
719		if !allowed_para_lookup(
720			response.candidate_receipt.descriptor.para_id(),
721			identifier.group_index,
722		) {
723			return invalid_candidate_output(COST_INVALID_RESPONSE)
724		}
725
726		if response.candidate_receipt.hash() != identifier.candidate_hash {
727			return invalid_candidate_output(COST_INVALID_RESPONSE)
728		}
729
730		let candidate_hash = response.candidate_receipt.hash();
731
732		// V2 descriptors are invalid if not enabled by runtime.
733		if !allow_v2_descriptors &&
734			response.candidate_receipt.descriptor.version() == CandidateDescriptorVersion::V2
735		{
736			gum::debug!(
737				target: LOG_TARGET,
738				?candidate_hash,
739				peer = ?requested_peer,
740				"Version 2 candidate receipts are not enabled by the runtime"
741			);
742			return invalid_candidate_output(COST_UNSUPPORTED_DESCRIPTOR_VERSION)
743		}
744		// Validate the ump signals.
745		if let Err(err) = response.candidate_receipt.parse_ump_signals(transposed_cq) {
746			gum::debug!(
747				target: LOG_TARGET,
748				?candidate_hash,
749				?err,
750				peer = ?requested_peer,
751				"Received candidate has invalid UMP signals"
752			);
753			return invalid_candidate_output(COST_INVALID_UMP_SIGNALS)
754		}
755
756		// Check if `session_index` of relay parent matches candidate descriptor
757		// `session_index`.
758		if let Some(candidate_session_index) = response.candidate_receipt.descriptor.session_index()
759		{
760			if candidate_session_index != session {
761				gum::debug!(
762					target: LOG_TARGET,
763					?candidate_hash,
764					peer = ?requested_peer,
765					session_index = session,
766					candidate_session_index,
767					"Received candidate has invalid session index"
768				);
769				return invalid_candidate_output(COST_INVALID_SESSION_INDEX)
770			}
771		}
772	}
773
774	// statement checks.
775	let statements = {
776		let mut statements =
777			Vec::with_capacity(std::cmp::min(response.statements.len(), group.len() * 2));
778
779		let mut received_filter = StatementFilter::blank(group.len());
780
781		let index_in_group = |v: ValidatorIndex| group.iter().position(|x| &v == x);
782
783		let signing_context =
784			SigningContext { parent_hash: identifier.relay_parent, session_index: session };
785
786		for unchecked_statement in response.statements.into_iter().take(group.len() * 2) {
787			// ensure statement is from a validator in the group.
788			let i = match index_in_group(unchecked_statement.unchecked_validator_index()) {
789				Some(i) => i,
790				None => {
791					rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
792					continue
793				},
794			};
795
796			// ensure statement is on the correct candidate hash.
797			if unchecked_statement.unchecked_payload().candidate_hash() !=
798				&identifier.candidate_hash
799			{
800				rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
801				continue
802			}
803
804			// filter out duplicates or statements outside the mask.
805			// note on indexing: we have ensured that the bitmask and the
806			// duplicate trackers have the correct size for the group.
807			match unchecked_statement.unchecked_payload() {
808				CompactStatement::Seconded(_) => {
809					if unwanted_mask.seconded_in_group[i] {
810						rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
811						continue
812					}
813
814					if received_filter.seconded_in_group[i] {
815						rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
816						continue
817					}
818				},
819				CompactStatement::Valid(_) => {
820					if unwanted_mask.validated_in_group[i] {
821						rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
822						continue
823					}
824
825					if received_filter.validated_in_group[i] {
826						rep_changes.push((requested_peer, COST_UNREQUESTED_RESPONSE_STATEMENT));
827						continue
828					}
829				},
830			}
831
832			if disabled_mask.get(i).map_or(false, |x| *x) {
833				continue
834			}
835
836			let validator_public =
837				match validator_key_lookup(unchecked_statement.unchecked_validator_index()) {
838					None => {
839						rep_changes.push((requested_peer, COST_INVALID_SIGNATURE));
840						continue
841					},
842					Some(p) => p,
843				};
844
845			let checked_statement =
846				match unchecked_statement.try_into_checked(&signing_context, &validator_public) {
847					Err(_) => {
848						rep_changes.push((requested_peer, COST_INVALID_SIGNATURE));
849						continue
850					},
851					Ok(checked) => checked,
852				};
853
854			match checked_statement.payload() {
855				CompactStatement::Seconded(_) => {
856					received_filter.seconded_in_group.set(i, true);
857				},
858				CompactStatement::Valid(_) => {
859					received_filter.validated_in_group.set(i, true);
860				},
861			}
862
863			statements.push(checked_statement);
864			rep_changes.push((requested_peer, BENEFIT_VALID_STATEMENT));
865		}
866
867		// Only accept responses which are sufficient, according to our
868		// required backing threshold.
869		if !seconded_and_sufficient(&received_filter, backing_threshold) {
870			return invalid_candidate_output(COST_INVALID_RESPONSE)
871		}
872
873		statements
874	};
875
876	rep_changes.push((requested_peer, BENEFIT_VALID_RESPONSE));
877
878	ResponseValidationOutput {
879		requested_peer,
880		request_status: CandidateRequestStatus::Complete {
881			candidate: response.candidate_receipt,
882			persisted_validation_data: response.persisted_validation_data,
883			statements,
884		},
885		reputation_changes: rep_changes,
886	}
887}
888
889/// The status of the candidate request after the handling of a response.
890#[derive(Debug, PartialEq)]
891pub enum CandidateRequestStatus {
892	/// The request was outdated at the point of receiving the response.
893	Outdated,
894	/// The response either did not arrive or was invalid.
895	Incomplete,
896	/// The response completed the request. Statements sent beyond the
897	/// mask have been ignored.
898	Complete {
899		candidate: CommittedCandidateReceipt,
900		persisted_validation_data: PersistedValidationData,
901		statements: Vec<SignedStatement>,
902	},
903}
904
905/// Output of the response validation.
906#[derive(Debug, PartialEq)]
907pub struct ResponseValidationOutput {
908	/// The peer we requested from.
909	pub requested_peer: PeerId,
910	/// The status of the request.
911	pub request_status: CandidateRequestStatus,
912	/// Any reputation changes as a result of validating the response.
913	pub reputation_changes: Vec<(PeerId, Rep)>,
914}
915
916fn insert_or_update_priority(
917	priority_sorted: &mut Vec<(Priority, CandidateIdentifier)>,
918	prev_index: Option<usize>,
919	candidate_identifier: CandidateIdentifier,
920	new_priority: Priority,
921) -> usize {
922	if let Some(prev_index) = prev_index {
923		// GIGO: this behaves strangely if prev-index is not for the
924		// expected identifier.
925		if priority_sorted[prev_index].0 == new_priority {
926			// unchanged.
927			return prev_index
928		} else {
929			priority_sorted.remove(prev_index);
930		}
931	}
932
933	let item = (new_priority, candidate_identifier);
934	match priority_sorted.binary_search(&item) {
935		Ok(i) => i, // ignore if already present.
936		Err(i) => {
937			priority_sorted.insert(i, item);
938			i
939		},
940	}
941}
942
943#[cfg(test)]
944mod tests {
945	use super::*;
946	use polkadot_primitives::HeadData;
947	use polkadot_primitives_test_helpers as test_helpers;
948
949	fn dummy_pvd() -> PersistedValidationData {
950		PersistedValidationData {
951			parent_head: HeadData(vec![7, 8, 9]),
952			relay_parent_number: 5,
953			max_pov_size: 1024,
954			relay_parent_storage_root: Default::default(),
955		}
956	}
957
958	#[test]
959	fn test_remove_by_relay_parent() {
960		let parent_a = Hash::from_low_u64_le(1);
961		let parent_b = Hash::from_low_u64_le(2);
962		let parent_c = Hash::from_low_u64_le(3);
963
964		let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11));
965		let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12));
966		let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21));
967		let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22));
968		let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31));
969		let duplicate_hash = CandidateHash(Hash::from_low_u64_le(31));
970
971		let mut request_manager = RequestManager::new();
972		request_manager.get_or_insert(parent_a, candidate_a1, 1.into());
973		request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
974		request_manager.get_or_insert(parent_b, candidate_b1, 1.into());
975		request_manager.get_or_insert(parent_b, candidate_b2, 2.into());
976		request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
977		request_manager.get_or_insert(parent_a, duplicate_hash, 1.into());
978
979		assert_eq!(request_manager.requests.len(), 6);
980		assert_eq!(request_manager.by_priority.len(), 6);
981		assert_eq!(request_manager.unique_identifiers.len(), 5);
982
983		request_manager.remove_by_relay_parent(parent_a);
984
985		assert_eq!(request_manager.requests.len(), 3);
986		assert_eq!(request_manager.by_priority.len(), 3);
987		assert_eq!(request_manager.unique_identifiers.len(), 3);
988
989		assert!(!request_manager.unique_identifiers.contains_key(&candidate_a1));
990		assert!(!request_manager.unique_identifiers.contains_key(&candidate_a2));
991		// Duplicate hash should still be there (under a different parent).
992		assert!(request_manager.unique_identifiers.contains_key(&duplicate_hash));
993
994		request_manager.remove_by_relay_parent(parent_b);
995
996		assert_eq!(request_manager.requests.len(), 1);
997		assert_eq!(request_manager.by_priority.len(), 1);
998		assert_eq!(request_manager.unique_identifiers.len(), 1);
999
1000		assert!(!request_manager.unique_identifiers.contains_key(&candidate_b1));
1001		assert!(!request_manager.unique_identifiers.contains_key(&candidate_b2));
1002
1003		request_manager.remove_by_relay_parent(parent_c);
1004
1005		assert!(request_manager.requests.is_empty());
1006		assert!(request_manager.by_priority.is_empty());
1007		assert!(request_manager.unique_identifiers.is_empty());
1008	}
1009
1010	#[test]
1011	fn test_priority_ordering() {
1012		let parent_a = Hash::from_low_u64_le(1);
1013		let parent_b = Hash::from_low_u64_le(2);
1014		let parent_c = Hash::from_low_u64_le(3);
1015
1016		let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11));
1017		let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12));
1018		let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21));
1019		let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22));
1020		let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31));
1021
1022		let mut request_manager = RequestManager::new();
1023
1024		// Add some entries, set a couple of them to cluster (high) priority.
1025		let identifier_a1 = request_manager
1026			.get_or_insert(parent_a, candidate_a1, 1.into())
1027			.identifier
1028			.clone();
1029		let identifier_a2 = {
1030			let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
1031			entry.set_cluster_priority();
1032			entry.identifier.clone()
1033		};
1034		let identifier_b1 = request_manager
1035			.get_or_insert(parent_b, candidate_b1, 1.into())
1036			.identifier
1037			.clone();
1038		let identifier_b2 = request_manager
1039			.get_or_insert(parent_b, candidate_b2, 2.into())
1040			.identifier
1041			.clone();
1042		let identifier_c1 = {
1043			let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
1044			entry.set_cluster_priority();
1045			entry.identifier.clone()
1046		};
1047
1048		let attempts = 0;
1049		assert_eq!(
1050			request_manager.by_priority,
1051			vec![
1052				(Priority { origin: Origin::Cluster, attempts }, identifier_a2),
1053				(Priority { origin: Origin::Cluster, attempts }, identifier_c1),
1054				(Priority { origin: Origin::Unspecified, attempts }, identifier_a1),
1055				(Priority { origin: Origin::Unspecified, attempts }, identifier_b1),
1056				(Priority { origin: Origin::Unspecified, attempts }, identifier_b2),
1057			]
1058		);
1059	}
1060
1061	// Test case where candidate is requested under two different identifiers at the same time.
1062	// Should result in `Outdated` error.
1063	#[test]
1064	fn handle_outdated_response_due_to_requests_for_different_identifiers() {
1065		let mut request_manager = RequestManager::new();
1066		let mut response_manager = ResponseManager::new();
1067
1068		let relay_parent = Hash::from_low_u64_le(1);
1069		let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
1070		let persisted_validation_data = dummy_pvd();
1071		candidate_receipt.descriptor.persisted_validation_data_hash =
1072			persisted_validation_data.hash();
1073		let candidate = candidate_receipt.hash();
1074		let candidate_receipt: CommittedCandidateReceipt = candidate_receipt.into();
1075		let requested_peer_1 = PeerId::random();
1076		let requested_peer_2 = PeerId::random();
1077
1078		let identifier1 = request_manager
1079			.get_or_insert(relay_parent, candidate, 1.into())
1080			.identifier
1081			.clone();
1082		request_manager
1083			.get_or_insert(relay_parent, candidate, 1.into())
1084			.add_peer(requested_peer_1);
1085		let identifier2 = request_manager
1086			.get_or_insert(relay_parent, candidate, 2.into())
1087			.identifier
1088			.clone();
1089		request_manager
1090			.get_or_insert(relay_parent, candidate, 2.into())
1091			.add_peer(requested_peer_2);
1092
1093		assert_ne!(identifier1, identifier2);
1094		assert_eq!(request_manager.requests.len(), 2);
1095
1096		let group_size = 3;
1097		let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
1098
1099		let unwanted_mask = StatementFilter::blank(group_size);
1100		let disabled_mask: BitVec<u8, Lsb0> = Default::default();
1101		let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
1102
1103		// Get requests.
1104		{
1105			let request_props =
1106				|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
1107			let peer_advertised = |_identifier: &CandidateIdentifier, _peer: &_| {
1108				Some(StatementFilter::full(group_size))
1109			};
1110
1111			let outgoing = request_manager
1112				.next_request(&mut response_manager, request_props, peer_advertised)
1113				.unwrap();
1114			assert_eq!(outgoing.payload.candidate_hash, candidate);
1115			let outgoing = request_manager
1116				.next_request(&mut response_manager, request_props, peer_advertised)
1117				.unwrap();
1118			assert_eq!(outgoing.payload.candidate_hash, candidate);
1119		}
1120
1121		// Validate first response.
1122		{
1123			let statements = vec![];
1124			let response = UnhandledResponse {
1125				response: TaggedResponse {
1126					identifier: identifier1,
1127					requested_peer: requested_peer_1,
1128					props: request_properties.clone(),
1129					response: Ok(AttestedCandidateResponse {
1130						candidate_receipt: candidate_receipt.clone().into(),
1131						persisted_validation_data: persisted_validation_data.clone(),
1132						statements,
1133					}),
1134				},
1135			};
1136			let validator_key_lookup = |_v| None;
1137			let allowed_para_lookup = |_para, _g_index| true;
1138			let statements = vec![];
1139			let output = response.validate_response(
1140				&mut request_manager,
1141				group,
1142				0,
1143				validator_key_lookup,
1144				allowed_para_lookup,
1145				disabled_mask.clone(),
1146				&Default::default(),
1147				false,
1148			);
1149			assert_eq!(
1150				output,
1151				ResponseValidationOutput {
1152					requested_peer: requested_peer_1,
1153					request_status: CandidateRequestStatus::Complete {
1154						candidate: candidate_receipt.clone(),
1155						persisted_validation_data: persisted_validation_data.clone(),
1156						statements,
1157					},
1158					reputation_changes: vec![(requested_peer_1, BENEFIT_VALID_RESPONSE)],
1159				}
1160			);
1161		}
1162
1163		// Try to validate second response.
1164		{
1165			let statements = vec![];
1166			let response = UnhandledResponse {
1167				response: TaggedResponse {
1168					identifier: identifier2,
1169					requested_peer: requested_peer_2,
1170					props: request_properties,
1171					response: Ok(AttestedCandidateResponse {
1172						candidate_receipt: candidate_receipt.clone().into(),
1173						persisted_validation_data: persisted_validation_data.clone(),
1174						statements,
1175					}),
1176				},
1177			};
1178			let validator_key_lookup = |_v| None;
1179			let allowed_para_lookup = |_para, _g_index| true;
1180			let output = response.validate_response(
1181				&mut request_manager,
1182				group,
1183				0,
1184				validator_key_lookup,
1185				allowed_para_lookup,
1186				disabled_mask,
1187				&Default::default(),
1188				false,
1189			);
1190			assert_eq!(
1191				output,
1192				ResponseValidationOutput {
1193					requested_peer: requested_peer_2,
1194					request_status: CandidateRequestStatus::Outdated,
1195					reputation_changes: vec![],
1196				}
1197			);
1198		}
1199
1200		assert_eq!(request_manager.requests.len(), 0);
1201	}
1202
1203	// Test case where we had a request in-flight and the request entry was garbage-collected on
1204	// outdated relay parent.
1205	#[test]
1206	fn handle_outdated_response_due_to_garbage_collection() {
1207		let mut request_manager = RequestManager::new();
1208		let mut response_manager = ResponseManager::new();
1209
1210		let relay_parent = Hash::from_low_u64_le(1);
1211		let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
1212		let persisted_validation_data = dummy_pvd();
1213		candidate_receipt.descriptor.persisted_validation_data_hash =
1214			persisted_validation_data.hash();
1215		let candidate = candidate_receipt.hash();
1216		let requested_peer = PeerId::random();
1217
1218		let identifier = request_manager
1219			.get_or_insert(relay_parent, candidate, 1.into())
1220			.identifier
1221			.clone();
1222		request_manager
1223			.get_or_insert(relay_parent, candidate, 1.into())
1224			.add_peer(requested_peer);
1225
1226		let group_size = 3;
1227		let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
1228
1229		let unwanted_mask = StatementFilter::blank(group_size);
1230		let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
1231		let peer_advertised =
1232			|_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));
1233
1234		// Get request once successfully.
1235		{
1236			let request_props =
1237				|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
1238
1239			let outgoing = request_manager
1240				.next_request(&mut response_manager, request_props, peer_advertised)
1241				.unwrap();
1242			assert_eq!(outgoing.payload.candidate_hash, candidate);
1243		}
1244
1245		// Garbage collect based on relay parent.
1246		request_manager.remove_by_relay_parent(relay_parent);
1247
1248		// Try to validate response.
1249		{
1250			let statements = vec![];
1251			let response = UnhandledResponse {
1252				response: TaggedResponse {
1253					identifier,
1254					requested_peer,
1255					props: request_properties,
1256					response: Ok(AttestedCandidateResponse {
1257						candidate_receipt: candidate_receipt.clone().into(),
1258						persisted_validation_data: persisted_validation_data.clone(),
1259						statements,
1260					}),
1261				},
1262			};
1263			let validator_key_lookup = |_v| None;
1264			let allowed_para_lookup = |_para, _g_index| true;
1265			let disabled_mask: BitVec<u8, Lsb0> = Default::default();
1266			let output = response.validate_response(
1267				&mut request_manager,
1268				group,
1269				0,
1270				validator_key_lookup,
1271				allowed_para_lookup,
1272				disabled_mask,
1273				&Default::default(),
1274				false,
1275			);
1276			assert_eq!(
1277				output,
1278				ResponseValidationOutput {
1279					requested_peer,
1280					request_status: CandidateRequestStatus::Outdated,
1281					reputation_changes: vec![],
1282				}
1283			);
1284		}
1285	}
1286
1287	#[test]
1288	fn should_clean_up_after_successful_requests() {
1289		let mut request_manager = RequestManager::new();
1290		let mut response_manager = ResponseManager::new();
1291
1292		let relay_parent = Hash::from_low_u64_le(1);
1293		let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
1294		let persisted_validation_data = dummy_pvd();
1295		candidate_receipt.descriptor.persisted_validation_data_hash =
1296			persisted_validation_data.hash();
1297		let candidate = candidate_receipt.hash();
1298		let requested_peer = PeerId::random();
1299
1300		let identifier = request_manager
1301			.get_or_insert(relay_parent, candidate, 1.into())
1302			.identifier
1303			.clone();
1304		request_manager
1305			.get_or_insert(relay_parent, candidate, 1.into())
1306			.add_peer(requested_peer);
1307
1308		assert_eq!(request_manager.requests.len(), 1);
1309		assert_eq!(request_manager.by_priority.len(), 1);
1310
1311		let group_size = 3;
1312		let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
1313
1314		let unwanted_mask = StatementFilter::blank(group_size);
1315		let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
1316		let peer_advertised =
1317			|_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));
1318
1319		// Get request once successfully.
1320		{
1321			let request_props =
1322				|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
1323
1324			let outgoing = request_manager
1325				.next_request(&mut response_manager, request_props, peer_advertised)
1326				.unwrap();
1327			assert_eq!(outgoing.payload.candidate_hash, candidate);
1328		}
1329
1330		// Validate response.
1331		{
1332			let statements = vec![];
1333			let response = UnhandledResponse {
1334				response: TaggedResponse {
1335					identifier,
1336					requested_peer,
1337					props: request_properties.clone(),
1338					response: Ok(AttestedCandidateResponse {
1339						candidate_receipt: candidate_receipt.clone().into(),
1340						persisted_validation_data: persisted_validation_data.clone(),
1341						statements,
1342					}),
1343				},
1344			};
1345			let validator_key_lookup = |_v| None;
1346			let allowed_para_lookup = |_para, _g_index| true;
1347			let statements = vec![];
1348			let disabled_mask: BitVec<u8, Lsb0> = Default::default();
1349			let output = response.validate_response(
1350				&mut request_manager,
1351				group,
1352				0,
1353				validator_key_lookup,
1354				allowed_para_lookup,
1355				disabled_mask,
1356				&Default::default(),
1357				false,
1358			);
1359			assert_eq!(
1360				output,
1361				ResponseValidationOutput {
1362					requested_peer,
1363					request_status: CandidateRequestStatus::Complete {
1364						candidate: candidate_receipt.clone().into(),
1365						persisted_validation_data: persisted_validation_data.clone(),
1366						statements,
1367					},
1368					reputation_changes: vec![(requested_peer, BENEFIT_VALID_RESPONSE)],
1369				}
1370			);
1371		}
1372
1373		// Ensure that cleanup occurred.
1374		assert_eq!(request_manager.requests.len(), 0);
1375		assert_eq!(request_manager.by_priority.len(), 0);
1376	}
1377
1378	// Test case where we queue 2 requests to be sent to the same peer and 1 request to another
1379	// peer. Same peer requests should be served one at a time but they should not block the other
1380	// peer request.
1381	#[test]
1382	fn rate_limit_requests_to_same_peer() {
1383		let mut request_manager = RequestManager::new();
1384		let mut response_manager = ResponseManager::new();
1385
1386		let relay_parent = Hash::from_low_u64_le(1);
1387
1388		// Create 3 candidates
1389		let mut candidate_receipt_1 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
1390		let persisted_validation_data_1 = dummy_pvd();
1391		candidate_receipt_1.descriptor.persisted_validation_data_hash =
1392			persisted_validation_data_1.hash();
1393		let candidate_1 = candidate_receipt_1.hash();
1394
1395		let mut candidate_receipt_2 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
1396		let persisted_validation_data_2 = dummy_pvd();
1397		candidate_receipt_2.descriptor.persisted_validation_data_hash =
1398			persisted_validation_data_2.hash();
1399		let candidate_2 = candidate_receipt_2.hash();
1400
1401		let mut candidate_receipt_3 = test_helpers::dummy_committed_candidate_receipt(relay_parent);
1402		let persisted_validation_data_3 = dummy_pvd();
1403		candidate_receipt_3.descriptor.persisted_validation_data_hash =
1404			persisted_validation_data_3.hash();
1405		let candidate_3 = candidate_receipt_3.hash();
1406
1407		// Create 2 peers
1408		let requested_peer_1 = PeerId::random();
1409		let requested_peer_2 = PeerId::random();
1410
1411		let group_size = 3;
1412		let group = &[ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)];
1413		let unwanted_mask = StatementFilter::blank(group_size);
1414		let disabled_mask: BitVec<u8, Lsb0> = Default::default();
1415		let request_properties = RequestProperties { unwanted_mask, backing_threshold: None };
1416		let request_props = |_identifier: &CandidateIdentifier| Some((&request_properties).clone());
1417		let peer_advertised =
1418			|_identifier: &CandidateIdentifier, _peer: &_| Some(StatementFilter::full(group_size));
1419
1420		// Add request for candidate 1 from peer 1
1421		let identifier1 = request_manager
1422			.get_or_insert(relay_parent, candidate_1, 1.into())
1423			.identifier
1424			.clone();
1425		request_manager
1426			.get_or_insert(relay_parent, candidate_1, 1.into())
1427			.add_peer(requested_peer_1);
1428
1429		// Add request for candidate 3 from peer 2 (this one can be served in parallel)
1430		let _identifier3 = request_manager
1431			.get_or_insert(relay_parent, candidate_3, 1.into())
1432			.identifier
1433			.clone();
1434		request_manager
1435			.get_or_insert(relay_parent, candidate_3, 1.into())
1436			.add_peer(requested_peer_2);
1437
1438		// Successfully dispatch request for candidate 1 from peer 1 and candidate 3 from peer 2
1439		for _ in 0..2 {
1440			let outgoing =
1441				request_manager.next_request(&mut response_manager, request_props, peer_advertised);
1442			assert!(outgoing.is_some());
1443		}
1444		assert_eq!(response_manager.active_peers.len(), 2);
1445		assert!(response_manager.is_sending_to(&requested_peer_1));
1446		assert!(response_manager.is_sending_to(&requested_peer_2));
1447		assert_eq!(request_manager.requests.len(), 2);
1448
1449		// Add request for candidate 2 from peer 1
1450		let _identifier2 = request_manager
1451			.get_or_insert(relay_parent, candidate_2, 1.into())
1452			.identifier
1453			.clone();
1454		request_manager
1455			.get_or_insert(relay_parent, candidate_2, 1.into())
1456			.add_peer(requested_peer_1);
1457
1458		// Do not dispatch the request for the second candidate from peer 1 (already serving that
1459		// peer)
1460		let outgoing =
1461			request_manager.next_request(&mut response_manager, request_props, peer_advertised);
1462		assert!(outgoing.is_none());
1463		assert_eq!(response_manager.active_peers.len(), 2);
1464		assert!(response_manager.is_sending_to(&requested_peer_1));
1465		assert!(response_manager.is_sending_to(&requested_peer_2));
1466		assert_eq!(request_manager.requests.len(), 3);
1467
1468		// Manually mark response received (response future resolved)
1469		response_manager.active_peers.remove(&requested_peer_1);
1470		response_manager.pending_responses = FuturesUnordered::new();
1471
1472		// Validate first response (candidate 1 from peer 1)
1473		{
1474			let statements = vec![];
1475			let response = UnhandledResponse {
1476				response: TaggedResponse {
1477					identifier: identifier1,
1478					requested_peer: requested_peer_1,
1479					props: request_properties.clone(),
1480					response: Ok(AttestedCandidateResponse {
1481						candidate_receipt: candidate_receipt_1.clone().into(),
1482						persisted_validation_data: persisted_validation_data_1.clone(),
1483						statements,
1484					}),
1485				},
1486			};
1487			let validator_key_lookup = |_v| None;
1488			let allowed_para_lookup = |_para, _g_index| true;
1489			let _output = response.validate_response(
1490				&mut request_manager,
1491				group,
1492				0,
1493				validator_key_lookup,
1494				allowed_para_lookup,
1495				disabled_mask.clone(),
1496				&Default::default(),
1497				false,
1498			);
1499
1500			// First request served successfully
1501			assert_eq!(request_manager.requests.len(), 2);
1502			assert_eq!(response_manager.active_peers.len(), 1);
1503			assert!(response_manager.is_sending_to(&requested_peer_2));
1504		}
1505
1506		// Check if the request that was ignored previously will be served now
1507		let outgoing =
1508			request_manager.next_request(&mut response_manager, request_props, peer_advertised);
1509		assert!(outgoing.is_some());
1510		assert_eq!(response_manager.active_peers.len(), 2);
1511		assert!(response_manager.is_sending_to(&requested_peer_1));
1512		assert!(response_manager.is_sending_to(&requested_peer_2));
1513		assert_eq!(request_manager.requests.len(), 2);
1514	}
1515}