polkadot_collator_protocol/validator_side/mod.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use futures::{
	channel::oneshot, future::BoxFuture, select, stream::FuturesUnordered, FutureExt, StreamExt,
};
use futures_timer::Delay;
use std::{
	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
	future::Future,
	time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;

use sp_keystore::KeystorePtr;

use polkadot_node_network_protocol::{
	self as net_protocol,
	peer_set::{CollationVersion, PeerSet},
	request_response::{
		outgoing::{Recipient, RequestError},
		v1 as request_v1, v2 as request_v2, OutgoingRequest, Requests,
	},
	v1 as protocol_v1, v2 as protocol_v2, CollationProtocols, OurView, PeerId,
	UnifiedReputationChange as Rep, View,
};
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_node_subsystem::{
	messages::{
		CanSecondRequest, CandidateBackingMessage, CollatorProtocolMessage, IfDisconnected,
		NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData, ProspectiveParachainsMessage,
		ProspectiveValidationDataRequest,
	},
	overseer, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal,
};
use polkadot_node_subsystem_util::{
	backing_implicit_view::View as ImplicitView,
	reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
	request_claim_queue, request_node_features, request_session_index_for_child,
};
use polkadot_primitives::{
	node_features, CandidateDescriptorV2, CandidateDescriptorVersion, CandidateHash, CollatorId,
	CoreIndex, Hash, HeadData, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData,
	SessionIndex,
};

use super::{modify_reputation, tick_stream, LOG_TARGET};

mod claim_queue_state;
mod collation;
mod error;
mod metrics;

use claim_queue_state::ClaimQueueState;
use collation::{
	fetched_collation_sanity_check, BlockedCollationId, CollationEvent, CollationFetchError,
	CollationFetchRequest, CollationStatus, Collations, FetchedCollation, PendingCollation,
	PendingCollationFetch, ProspectiveCandidate,
};
use error::{Error, FetchError, Result, SecondingError};

#[cfg(test)]
mod tests;

pub use metrics::Metrics;

const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
/// Message could not be decoded properly.
const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt");
/// Network errors that originated at the remote host should have the same cost as a timeout.
const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Invalid network message signature");
const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem");
const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for the wrong para");
const COST_PROTOCOL_MISUSE: Rep =
	Rep::Malicious("A collator advertising a collation for an async backing relay parent using V1");
const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected");
const BENEFIT_NOTIFY_GOOD: Rep =
	Rep::BenefitMinor("A collator was noted good by another subsystem");

/// Time after starting a collation download from a collator after which we will start another
/// download from the next collator, even if the first download has not finished yet.
///
/// This is to protect from a single slow collator preventing collations from happening.
///
/// With a collation size of 5 MB and a bandwidth of 500 Mbit/s (the requirement for Kusama
/// validators), the transfer should be possible within 0.1 seconds. 400 milliseconds should
/// therefore be plenty, even with multiple heads, and should be low enough for later collators
/// to still be able to finish on time.
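///
/// A rough check of that arithmetic (illustrative figures only): 5 MB ≈ 40 Mbit, and
/// 40 Mbit / 500 Mbit/s = 0.08 s, so a 400 ms cutoff leaves roughly a 5x margin over a
/// single uncontended transfer.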
///
/// There is debug logging output, so we can adjust this value based on production results.
#[cfg(not(test))]
const MAX_UNSHARED_DOWNLOAD_TIME: Duration = Duration::from_millis(400);

// How often to check all peers with activity.
#[cfg(not(test))]
const ACTIVITY_POLL: Duration = Duration::from_secs(1);

#[cfg(test)]
const MAX_UNSHARED_DOWNLOAD_TIME: Duration = Duration::from_millis(100);

#[cfg(test)]
const ACTIVITY_POLL: Duration = Duration::from_millis(10);

#[derive(Debug)]
struct CollatingPeerState {
	collator_id: CollatorId,
	para_id: ParaId,
	/// Collations advertised by the peer per relay parent.
	///
	/// The V1 network protocol doesn't include the candidate hash in
	/// advertisements; in that case we store an empty set to occupy
	/// a slot in the map.
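	///
	/// For example (hypothetical hashes): a V2 advertisement for relay parent `rp` with
	/// candidate `c` is stored as `rp => {c}`, while a V1 advertisement for `rp` is
	/// stored as `rp => {}`.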
	advertisements: HashMap<Hash, HashSet<CandidateHash>>,
	last_active: Instant,
}

#[derive(Debug)]
enum PeerState {
	// The peer has connected at the given instant.
	Connected(Instant),
	// Peer is collating.
	Collating(CollatingPeerState),
}

#[derive(Debug)]
enum InsertAdvertisementError {
	/// Advertisement is already known.
	Duplicate,
	/// Collation relay parent is out of our view.
	OutOfOurView,
	/// No prior declare message received.
	UndeclaredCollator,
	/// A limit for announcements per peer is reached.
	PeerLimitReached,
}

#[derive(Debug)]
struct PeerData {
	view: View,
	state: PeerState,
	version: CollationVersion,
}

impl PeerData {
	/// Update the view, clearing all advertisements that are no longer in the
	/// current view.
	fn update_view(
		&mut self,
		implicit_view: &ImplicitView,
		active_leaves: &HashSet<Hash>,
		new_view: View,
	) {
		let old_view = std::mem::replace(&mut self.view, new_view);
		if let PeerState::Collating(ref mut peer_state) = self.state {
			for removed in old_view.difference(&self.view) {
				// Remove relay parent advertisements if it went out of our (implicit) view.
				let keep = is_relay_parent_in_implicit_view(
					removed,
					implicit_view,
					active_leaves,
					peer_state.para_id,
				);

				if !keep {
					peer_state.advertisements.remove(&removed);
				}
			}
		}
	}

	/// Prune old advertisements relative to our view.
	fn prune_old_advertisements(
		&mut self,
		implicit_view: &ImplicitView,
		active_leaves: &HashSet<Hash>,
	) {
		if let PeerState::Collating(ref mut peer_state) = self.state {
			peer_state.advertisements.retain(|hash, _| {
				// Either
				// - Relay parent is an active leaf
				// - It belongs to allowed ancestry under some leaf
				// Discard otherwise.
				is_relay_parent_in_implicit_view(
					hash,
					implicit_view,
					active_leaves,
					peer_state.para_id,
				)
			});
		}
	}

	/// Performs a sanity check for an advertisement and notes it as advertised.
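	///
	/// A sketch of the intended flow (hypothetical bindings; not a compiled doc-test):
	///
	/// ```ignore
	/// // A first V2 advertisement for this (relay parent, candidate) pair succeeds.
	/// let (collator_id, para_id) = peer_data
	///     .insert_advertisement(rp, Some(hash), &implicit_view, &active_leaves, &prp)?;
	/// // Repeating the same advertisement is rejected as a duplicate.
	/// assert!(matches!(
	///     peer_data.insert_advertisement(rp, Some(hash), &implicit_view, &active_leaves, &prp),
	///     Err(InsertAdvertisementError::Duplicate)
	/// ));
	/// ```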
	fn insert_advertisement(
		&mut self,
		on_relay_parent: Hash,
		candidate_hash: Option<CandidateHash>,
		implicit_view: &ImplicitView,
		active_leaves: &HashSet<Hash>,
		per_relay_parent: &PerRelayParent,
	) -> std::result::Result<(CollatorId, ParaId), InsertAdvertisementError> {
		match self.state {
			PeerState::Connected(_) => Err(InsertAdvertisementError::UndeclaredCollator),
			PeerState::Collating(ref mut state) => {
				if !is_relay_parent_in_implicit_view(
					&on_relay_parent,
					implicit_view,
					active_leaves,
					state.para_id,
				) {
					return Err(InsertAdvertisementError::OutOfOurView)
				}

				if let Some(candidate_hash) = candidate_hash {
					if state
						.advertisements
						.get(&on_relay_parent)
						.map_or(false, |candidates| candidates.contains(&candidate_hash))
					{
						return Err(InsertAdvertisementError::Duplicate)
					}

					let candidates = state.advertisements.entry(on_relay_parent).or_default();

					// The number of current assignments equals the claim queue length. No honest
					// collator should advertise more candidates than that.
					if candidates.len() > per_relay_parent.assignment.current.len() {
						return Err(InsertAdvertisementError::PeerLimitReached)
					}

					candidates.insert(candidate_hash);
				} else {
					if self.version != CollationVersion::V1 {
						gum::error!(
							target: LOG_TARGET,
							"Programming error, `candidate_hash` can not be `None` \
							 for non `V1` networking.",
						);
					}

					if state.advertisements.contains_key(&on_relay_parent) {
						return Err(InsertAdvertisementError::Duplicate)
					}

					state
						.advertisements
						.insert(on_relay_parent, HashSet::from_iter(candidate_hash));
				};

				state.last_active = Instant::now();
				Ok((state.collator_id.clone(), state.para_id))
			},
		}
	}

	/// Whether a peer is collating.
	fn is_collating(&self) -> bool {
		match self.state {
			PeerState::Connected(_) => false,
			PeerState::Collating(_) => true,
		}
	}

	/// Note that a peer is now collating with the given collator and para id.
	///
	/// This will overwrite any previous call to `set_collating` and should only be called
	/// if `is_collating` is false.
	fn set_collating(&mut self, collator_id: CollatorId, para_id: ParaId) {
		self.state = PeerState::Collating(CollatingPeerState {
			collator_id,
			para_id,
			advertisements: HashMap::new(),
			last_active: Instant::now(),
		});
	}

	fn collator_id(&self) -> Option<&CollatorId> {
		match self.state {
			PeerState::Connected(_) => None,
			PeerState::Collating(ref state) => Some(&state.collator_id),
		}
	}

	fn collating_para(&self) -> Option<ParaId> {
		match self.state {
			PeerState::Connected(_) => None,
			PeerState::Collating(ref state) => Some(state.para_id),
		}
	}

	/// Whether the peer has advertised the given collation.
	fn has_advertised(
		&self,
		relay_parent: &Hash,
		maybe_candidate_hash: Option<CandidateHash>,
	) -> bool {
		let collating_state = match self.state {
			PeerState::Connected(_) => return false,
			PeerState::Collating(ref state) => state,
		};

		if let Some(ref candidate_hash) = maybe_candidate_hash {
			collating_state
				.advertisements
				.get(relay_parent)
				.map_or(false, |candidates| candidates.contains(candidate_hash))
		} else {
			collating_state.advertisements.contains_key(relay_parent)
		}
	}

	/// Whether the peer is now inactive according to the current instant and the eviction policy.
	fn is_inactive(&self, policy: &crate::CollatorEvictionPolicy) -> bool {
		match self.state {
			PeerState::Connected(connected_at) => connected_at.elapsed() >= policy.undeclared,
			PeerState::Collating(ref state) =>
				state.last_active.elapsed() >= policy.inactive_collator,
		}
	}
}

#[derive(Debug)]
struct GroupAssignments {
	/// Current assignments.
	current: Vec<ParaId>,
}

struct PerRelayParent {
	assignment: GroupAssignments,
	collations: Collations,
	v2_receipts: bool,
	current_core: CoreIndex,
	session_index: SessionIndex,
}

/// All state relevant for the validator side of the protocol lives here.
#[derive(Default)]
struct State {
	/// Leaves that support asynchronous backing, along with their
	/// implicit ancestry. Leaves from the implicit view are present in
	/// `active_leaves`; the converse does not hold.
	///
	/// Relay-chain blocks which don't support prospective parachains are
	/// never included in the fragment chains of active leaves which do. In
	/// particular, this means that if a given relay parent belongs to the implicit
	/// ancestry of some active leaf, then it does support prospective parachains.
	implicit_view: ImplicitView,

	/// All active leaves observed by us. This works as a replacement for
	/// [`polkadot_node_network_protocol::View`] and can be dropped once the transition
	/// to asynchronous backing is done.
	active_leaves: HashSet<Hash>,

	/// State tracked per relay parent.
	per_relay_parent: HashMap<Hash, PerRelayParent>,

	/// Track all active collators and their data.
	peer_data: HashMap<PeerId, PeerData>,

	/// Parachains we're currently assigned to. With async backing enabled
	/// this includes assignments from the implicit view.
	current_assignments: HashMap<ParaId, usize>,

	/// The collations we have requested from collators.
	collation_requests: FuturesUnordered<CollationFetchRequest>,

	/// Cancellation handles for the collation fetch requests.
	collation_requests_cancel_handles: HashMap<PendingCollation, CancellationToken>,

	/// Metrics.
	metrics: Metrics,

	/// When a timer in this `FuturesUnordered` triggers, we should dequeue the next request
	/// attempt from the `Collations` of the corresponding relay parent.
	///
	/// A triggering timer means that the fetching took too long for our taste and we should give
	/// another collator the chance to be faster (dequeue the next fetch request as well).
	collation_fetch_timeouts:
		FuturesUnordered<BoxFuture<'static, (CollatorId, Option<CandidateHash>, Hash)>>,

	/// Collations that we have successfully requested from peers and that are waiting
	/// on validation.
	fetched_candidates: HashMap<FetchedCollation, CollationEvent>,

	/// Collations which we haven't been able to second due to their parent not being known by
	/// prospective-parachains. Mapped from the para id and parent head data hash to the fetched
	/// collation data. Only needed for async backing. For elastic scaling, the fetched collation
	/// must contain the full parent head data.
	blocked_from_seconding: HashMap<BlockedCollationId, Vec<PendingCollationFetch>>,

	/// Aggregated reputation change.
	reputation: ReputationAggregator,
}

impl State {
	// Returns the number of seconded and pending collations for a specific `ParaId`. Pending
	// collations are:
	// 1. Collations being fetched from a collator.
	// 2. Collations waiting for validation from backing subsystem.
	// 3. Collations blocked from seconding due to parent not being known by backing subsystem.
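	//
	// For example (hypothetical counts): 1 collation seconded, 1 being fetched, 0 waiting
	// for validation and 1 blocked from seconding gives a total of 3.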
	fn seconded_and_pending_for_para(&self, relay_parent: &Hash, para_id: &ParaId) -> usize {
		let seconded = self
			.per_relay_parent
			.get(relay_parent)
			.map_or(0, |per_relay_parent| per_relay_parent.collations.seconded_for_para(para_id));

		let pending_fetch = self.per_relay_parent.get(relay_parent).map_or(0, |rp_state| {
			match rp_state.collations.status {
				CollationStatus::Fetching(pending_para_id) if pending_para_id == *para_id => 1,
				_ => 0,
			}
		});

		let waiting_for_validation = self
			.fetched_candidates
			.keys()
			.filter(|fc| fc.relay_parent == *relay_parent && fc.para_id == *para_id)
			.count();

		let blocked_from_seconding =
			self.blocked_from_seconding.values().fold(0, |acc, blocked_collations| {
				acc + blocked_collations
					.iter()
					.filter(|pc| {
						pc.candidate_receipt.descriptor.para_id() == *para_id &&
							pc.candidate_receipt.descriptor.relay_parent() == *relay_parent
					})
					.count()
			});

		gum::trace!(
			target: LOG_TARGET,
			?relay_parent,
			?para_id,
			seconded,
			pending_fetch,
			waiting_for_validation,
			blocked_from_seconding,
			"Seconded and pending collations for para",
		);

		seconded + pending_fetch + waiting_for_validation + blocked_from_seconding
	}
}

fn is_relay_parent_in_implicit_view(
	relay_parent: &Hash,
	implicit_view: &ImplicitView,
	active_leaves: &HashSet<Hash>,
	para_id: ParaId,
) -> bool {
	active_leaves.iter().any(|hash| {
		implicit_view
			.known_allowed_relay_parents_under(hash, Some(para_id))
			.unwrap_or_default()
			.contains(relay_parent)
	})
}

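/// Build the `PerRelayParent` state for a new relay parent: look up the validator set and
/// groups, map our group to a core via the group rotation, and read that core's claim queue
/// to determine the paras assigned to us. Returns `Ok(None)` if we are not an active
/// validator at this relay parent.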
async fn construct_per_relay_parent<Sender>(
	sender: &mut Sender,
	current_assignments: &mut HashMap<ParaId, usize>,
	keystore: &KeystorePtr,
	relay_parent: Hash,
	v2_receipts: bool,
	session_index: SessionIndex,
) -> Result<Option<PerRelayParent>>
where
	Sender: CollatorProtocolSenderTrait,
{
	let validators = polkadot_node_subsystem_util::request_validators(relay_parent, sender)
		.await
		.await
		.map_err(Error::CancelledActiveValidators)??;

	let (groups, rotation_info) =
		polkadot_node_subsystem_util::request_validator_groups(relay_parent, sender)
			.await
			.await
			.map_err(Error::CancelledValidatorGroups)??;

	let core_now = if let Some(group) =
		polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore).and_then(
			|(_, index)| polkadot_node_subsystem_util::find_validator_group(&groups, index),
		) {
		rotation_info.core_for_group(group, groups.len())
	} else {
		gum::trace!(target: LOG_TARGET, ?relay_parent, "Not a validator");
		return Ok(None)
	};

	let mut claim_queue = request_claim_queue(relay_parent, sender)
		.await
		.await
		.map_err(Error::CancelledClaimQueue)??;

	let assigned_paras = claim_queue.remove(&core_now).unwrap_or_else(|| VecDeque::new());

	for para_id in assigned_paras.iter() {
		let entry = current_assignments.entry(*para_id).or_default();
		*entry += 1;
		if *entry == 1 {
			gum::debug!(
				target: LOG_TARGET,
				?relay_parent,
				para_id = ?para_id,
				"Assigned to a parachain",
			);
		}
	}

	let assignment = GroupAssignments { current: assigned_paras.into_iter().collect() };
	let collations = Collations::new(&assignment.current);

	Ok(Some(PerRelayParent {
		assignment,
		collations,
		v2_receipts,
		session_index,
		current_core: core_now,
	}))
}

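/// Undo the assignment bookkeeping done by `construct_per_relay_parent` for a relay parent
/// that went out of view, removing paras whose assignment counter drops to zero.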
fn remove_outgoing(
	current_assignments: &mut HashMap<ParaId, usize>,
	per_relay_parent: PerRelayParent,
) {
	let GroupAssignments { current, .. } = per_relay_parent.assignment;

	for cur in current {
		if let Entry::Occupied(mut occupied) = current_assignments.entry(cur) {
			*occupied.get_mut() -= 1;
			if *occupied.get() == 0 {
				occupied.remove_entry();
				gum::debug!(
					target: LOG_TARGET,
					para_id = ?cur,
					"Unassigned from a parachain",
				);
			}
		}
	}
}

// O(n) search for a collator ID by iterating through the peers map. This should be fast enough
// unless a large number of peers is expected.
fn collator_peer_id(
	peer_data: &HashMap<PeerId, PeerData>,
	collator_id: &CollatorId,
) -> Option<PeerId> {
	peer_data
		.iter()
		.find_map(|(peer, data)| data.collator_id().filter(|c| c == &collator_id).map(|_| *peer))
}

async fn disconnect_peer(sender: &mut impl overseer::CollatorProtocolSenderTrait, peer_id: PeerId) {
	sender
		.send_message(NetworkBridgeTxMessage::DisconnectPeers(vec![peer_id], PeerSet::Collation))
		.await
}

/// Another subsystem has requested to fetch collations on a particular leaf for some para.
async fn fetch_collation(
	sender: &mut impl overseer::CollatorProtocolSenderTrait,
	state: &mut State,
	pc: PendingCollation,
	id: CollatorId,
) -> std::result::Result<(), FetchError> {
	let PendingCollation { relay_parent, peer_id, prospective_candidate, .. } = pc;
	let candidate_hash = prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);

	let peer_data = state.peer_data.get(&peer_id).ok_or(FetchError::UnknownPeer)?;

	if peer_data.has_advertised(&relay_parent, candidate_hash) {
		request_collation(sender, state, pc, id.clone(), peer_data.version).await?;
		let timeout = |collator_id, candidate_hash, relay_parent| async move {
			Delay::new(MAX_UNSHARED_DOWNLOAD_TIME).await;
			(collator_id, candidate_hash, relay_parent)
		};
		state
			.collation_fetch_timeouts
			.push(timeout(id.clone(), candidate_hash, relay_parent).boxed());

		Ok(())
	} else {
		Err(FetchError::NotAdvertised)
	}
}

/// Report a collator for some malicious actions.
async fn report_collator(
	reputation: &mut ReputationAggregator,
	sender: &mut impl overseer::CollatorProtocolSenderTrait,
	peer_data: &HashMap<PeerId, PeerData>,
	id: CollatorId,
) {
	if let Some(peer_id) = collator_peer_id(peer_data, &id) {
		modify_reputation(reputation, sender, peer_id, COST_REPORT_BAD).await;
	}
}

/// Some other subsystem has reported a collator as a good one, bump reputation.
async fn note_good_collation(
	reputation: &mut ReputationAggregator,
	sender: &mut impl overseer::CollatorProtocolSenderTrait,
	peer_data: &HashMap<PeerId, PeerData>,
	id: CollatorId,
) {
	if let Some(peer_id) = collator_peer_id(peer_data, &id) {
		modify_reputation(reputation, sender, peer_id, BENEFIT_NOTIFY_GOOD).await;
	}
}

/// Notify a collator that its collation got seconded.
async fn notify_collation_seconded(
	sender: &mut impl overseer::CollatorProtocolSenderTrait,
	peer_id: PeerId,
	version: CollationVersion,
	relay_parent: Hash,
	statement: SignedFullStatement,
) {
	let statement = statement.into();
	let wire_message = match version {
		CollationVersion::V1 =>
			CollationProtocols::V1(protocol_v1::CollationProtocol::CollatorProtocol(
				protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement),
			)),
		CollationVersion::V2 =>
			CollationProtocols::V2(protocol_v2::CollationProtocol::CollatorProtocol(
				protocol_v2::CollatorProtocolMessage::CollationSeconded(relay_parent, statement),
			)),
	};
	sender
		.send_message(NetworkBridgeTxMessage::SendCollationMessage(vec![peer_id], wire_message))
		.await;
}

/// A peer's view has changed. A number of things should be done:
///  - Ongoing collation requests have to be canceled.
///  - Advertisements by this peer that are no longer relevant have to be removed.
fn handle_peer_view_change(state: &mut State, peer_id: PeerId, view: View) {
	let peer_data = match state.peer_data.get_mut(&peer_id) {
		Some(peer_data) => peer_data,
		None => return,
	};

	peer_data.update_view(&state.implicit_view, &state.active_leaves, view);
	state.collation_requests_cancel_handles.retain(|pc, handle| {
		let keep = pc.peer_id != peer_id || peer_data.has_advertised(&pc.relay_parent, None);
		if !keep {
			handle.cancel();
		}
		keep
	});
}

/// Request a collation from the network.
/// This function will
///  - Check for duplicate requests.
///  - Check if the requested collation is in our view.
/// Invocations of this function may therefore rely on these checks having been performed.
async fn request_collation(
	sender: &mut impl overseer::CollatorProtocolSenderTrait,
	state: &mut State,
	pending_collation: PendingCollation,
	collator_id: CollatorId,
	peer_protocol_version: CollationVersion,
) -> std::result::Result<(), FetchError> {
	if state.collation_requests_cancel_handles.contains_key(&pending_collation) {
		return Err(FetchError::AlreadyRequested)
	}

	let PendingCollation { relay_parent, para_id, peer_id, prospective_candidate, .. } =
		pending_collation;
	let per_relay_parent = state
		.per_relay_parent
		.get_mut(&relay_parent)
		.ok_or(FetchError::RelayParentOutOfView)?;

	let (requests, response_recv) = match (peer_protocol_version, prospective_candidate) {
		(CollationVersion::V1, _) => {
			let (req, response_recv) = OutgoingRequest::new(
				Recipient::Peer(peer_id),
				request_v1::CollationFetchingRequest { relay_parent, para_id },
			);
			let requests = Requests::CollationFetchingV1(req);
			(requests, response_recv.boxed())
		},
		(CollationVersion::V2, Some(ProspectiveCandidate { candidate_hash, .. })) => {
			let (req, response_recv) = OutgoingRequest::new(
				Recipient::Peer(peer_id),
				request_v2::CollationFetchingRequest { relay_parent, para_id, candidate_hash },
			);
			let requests = Requests::CollationFetchingV2(req);
			(requests, response_recv.boxed())
		},
		_ => return Err(FetchError::ProtocolMismatch),
	};

	let cancellation_token = CancellationToken::new();
	let collation_request = CollationFetchRequest {
		pending_collation,
		collator_id: collator_id.clone(),
		collator_protocol_version: peer_protocol_version,
		from_collator: response_recv,
		cancellation_token: cancellation_token.clone(),
		_lifetime_timer: state.metrics.time_collation_request_duration(),
	};

	state.collation_requests.push(collation_request);
	state
		.collation_requests_cancel_handles
		.insert(pending_collation, cancellation_token);

	gum::debug!(
		target: LOG_TARGET,
		peer_id = %peer_id,
		%para_id,
		?relay_parent,
		"Requesting collation",
	);

	let maybe_candidate_hash =
		prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
	per_relay_parent.collations.status = CollationStatus::Fetching(para_id);
	per_relay_parent
		.collations
		.fetching_from
		.replace((collator_id, maybe_candidate_hash));

	sender
		.send_message(NetworkBridgeTxMessage::SendRequests(
			vec![requests],
			IfDisconnected::ImmediateError,
		))
		.await;
	Ok(())
}

/// Networking message has been received.
#[overseer::contextbounds(CollatorProtocol, prefix = overseer)]
async fn process_incoming_peer_message<Context>(
	ctx: &mut Context,
	state: &mut State,
	origin: PeerId,
	msg: CollationProtocols<
		protocol_v1::CollatorProtocolMessage,
		protocol_v2::CollatorProtocolMessage,
	>,
) {
	use protocol_v1::CollatorProtocolMessage as V1;
	use protocol_v2::CollatorProtocolMessage as V2;
	use sp_runtime::traits::AppVerify;

	match msg {
		CollationProtocols::V1(V1::Declare(collator_id, para_id, signature)) |
		CollationProtocols::V2(V2::Declare(collator_id, para_id, signature)) => {
			if collator_peer_id(&state.peer_data, &collator_id).is_some() {
				modify_reputation(
					&mut state.reputation,
					ctx.sender(),
					origin,
					COST_UNEXPECTED_MESSAGE,
				)
				.await;
				return
			}

			let peer_data = match state.peer_data.get_mut(&origin) {
				Some(p) => p,
				None => {
					gum::debug!(
						target: LOG_TARGET,
						peer_id = ?origin,
						?para_id,
						"Unknown peer",
					);
					modify_reputation(
						&mut state.reputation,
						ctx.sender(),
						origin,
						COST_UNEXPECTED_MESSAGE,
					)
					.await;
					return
				},
			};

			if peer_data.is_collating() {
				gum::debug!(
					target: LOG_TARGET,
					peer_id = ?origin,
					?para_id,
					"Peer is already in the collating state",
				);
				modify_reputation(
					&mut state.reputation,
					ctx.sender(),
					origin,
					COST_UNEXPECTED_MESSAGE,
				)
				.await;
				return
			}

			if !signature.verify(&*protocol_v1::declare_signature_payload(&origin), &collator_id) {
				gum::debug!(
					target: LOG_TARGET,
					peer_id = ?origin,
					?para_id,
					"Signature verification failure",
				);
				modify_reputation(
					&mut state.reputation,
					ctx.sender(),
					origin,
					COST_INVALID_SIGNATURE,
				)
				.await;
				return
			}

			if state.current_assignments.contains_key(&para_id) {
				gum::debug!(
					target: LOG_TARGET,
					peer_id = ?origin,
					?collator_id,
					?para_id,
					"Declared as collator for current para",
				);

				peer_data.set_collating(collator_id, para_id);
			} else {
				gum::debug!(
					target: LOG_TARGET,
					peer_id = ?origin,
					?collator_id,
					?para_id,
					"Declared as collator for unneeded para. Current assignments: {:?}",
					&state.current_assignments
				);

				modify_reputation(
					&mut state.reputation,
					ctx.sender(),
					origin,
					COST_UNNEEDED_COLLATOR,
				)
				.await;
				gum::trace!(target: LOG_TARGET, "Disconnecting unneeded collator");
				disconnect_peer(ctx.sender(), origin).await;
			}
		},
		CollationProtocols::V1(V1::AdvertiseCollation(relay_parent)) =>
			if let Err(err) =
				handle_advertisement(ctx.sender(), state, relay_parent, origin, None).await
			{
				gum::debug!(
					target: LOG_TARGET,
					peer_id = ?origin,
					?relay_parent,
					error = ?err,
					"Rejected v1 advertisement",
				);

				if let Some(rep) = err.reputation_changes() {
					modify_reputation(&mut state.reputation, ctx.sender(), origin, rep).await;
				}
			},
		CollationProtocols::V2(V2::AdvertiseCollation {
			relay_parent,
			candidate_hash,
			parent_head_data_hash,
		}) => {
			if let Err(err) = handle_advertisement(
				ctx.sender(),
				state,
				relay_parent,
				origin,
				Some((candidate_hash, parent_head_data_hash)),
			)
			.await
			{
				gum::debug!(
					target: LOG_TARGET,
					peer_id = ?origin,
					?relay_parent,
					?candidate_hash,
					error = ?err,
					"Rejected v2 advertisement",
				);

				if let Some(rep) = err.reputation_changes() {
					modify_reputation(&mut state.reputation, ctx.sender(), origin, rep).await;
				}
			}
		},
		CollationProtocols::V1(V1::CollationSeconded(..)) |
		CollationProtocols::V2(V2::CollationSeconded(..)) => {
			gum::warn!(
				target: LOG_TARGET,
				peer_id = ?origin,
				"Unexpected `CollationSeconded` message, decreasing reputation",
			);

			modify_reputation(&mut state.reputation, ctx.sender(), origin, COST_UNEXPECTED_MESSAGE)
				.await;
		},
	}
}

#[derive(Debug)]
enum AdvertisementError {
	/// Relay parent is unknown.
	RelayParentUnknown,
	/// Peer is not present in the subsystem state.
	UnknownPeer,
	/// Peer has not declared its para id.
	UndeclaredCollator,
	/// We're assigned to a different para at the given relay parent.
	InvalidAssignment,
	/// Para reached a limit of seconded candidates for this relay parent.
	SecondedLimitReached,
	/// Collator trying to advertise a collation using V1 protocol for an async backing relay
	/// parent.
	ProtocolMisuse,
	/// Advertisement is invalid.
	#[allow(dead_code)]
	Invalid(InsertAdvertisementError),
	/// Seconding not allowed by the backing subsystem.
	BlockedByBacking,
}

impl AdvertisementError {
	fn reputation_changes(&self) -> Option<Rep> {
		use AdvertisementError::*;
		match self {
			InvalidAssignment => Some(COST_WRONG_PARA),
			ProtocolMisuse => Some(COST_PROTOCOL_MISUSE),
			RelayParentUnknown | UndeclaredCollator | Invalid(_) => Some(COST_UNEXPECTED_MESSAGE),
			UnknownPeer | SecondedLimitReached | BlockedByBacking => None,
		}
	}
}

// Requests backing to sanity check the advertisement.
async fn can_second<Sender>(
	sender: &mut Sender,
	candidate_para_id: ParaId,
	candidate_relay_parent: Hash,
	candidate_hash: CandidateHash,
	parent_head_data_hash: Hash,
) -> bool
where
	Sender: CollatorProtocolSenderTrait,
{
	let request = CanSecondRequest {
		candidate_para_id,
		candidate_relay_parent,
		candidate_hash,
		parent_head_data_hash,
	};
	let (tx, rx) = oneshot::channel();
	sender.send_message(CandidateBackingMessage::CanSecond(request, tx)).await;

	rx.await.unwrap_or_else(|err| {
		gum::warn!(
			target: LOG_TARGET,
			?err,
			?candidate_relay_parent,
			?candidate_para_id,
			?candidate_hash,
			"CanSecond-request responder was dropped",
		);

		false
	})
}

// Try seconding any collations which were waiting on the validation of their parent.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn second_unblocked_collations<Context>(
	ctx: &mut Context,
	state: &mut State,
	para_id: ParaId,
	head_data: HeadData,
	head_data_hash: Hash,
) {
	if let Some(unblocked_collations) = state
		.blocked_from_seconding
		.remove(&BlockedCollationId { para_id, parent_head_data_hash: head_data_hash })
	{
		if !unblocked_collations.is_empty() {
			gum::debug!(
				target: LOG_TARGET,
				"Candidate outputting head data with hash {} unblocked {} collations for seconding.",
				head_data_hash,
				unblocked_collations.len()
			);
		}

		for mut unblocked_collation in unblocked_collations {
			unblocked_collation.maybe_parent_head_data = Some(head_data.clone());
			let peer_id = unblocked_collation.collation_event.pending_collation.peer_id;
			let relay_parent = unblocked_collation.candidate_receipt.descriptor.relay_parent();

			if let Err(err) = kick_off_seconding(ctx, state, unblocked_collation).await {
				gum::warn!(
					target: LOG_TARGET,
					?relay_parent,
					?para_id,
					?peer_id,
					error = %err,
					"Seconding aborted due to an error",
				);

				if err.is_malicious() {
					// Report malicious peer.
					modify_reputation(
						&mut state.reputation,
						ctx.sender(),
						peer_id,
						COST_REPORT_BAD,
					)
					.await;
				}
			}
		}
	}
}

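// Check whether a new advertisement for `para_id` at `relay_parent` still fits into the claim
// queue on at least one path through the implicit view.
//
// A sketch with hypothetical values: if the claim queue along a path is [A, A, B] and two
// collations for A are already seconded or pending on that path, a further advertisement for A
// fails the check, while an advertisement for B would still pass.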
fn ensure_seconding_limit_is_respected(
	relay_parent: &Hash,
	para_id: ParaId,
	state: &State,
) -> std::result::Result<(), AdvertisementError> {
	let paths = state.implicit_view.paths_via_relay_parent(relay_parent);

	gum::trace!(
		target: LOG_TARGET,
		?relay_parent,
		?para_id,
		?paths,
		"Checking seconding limit",
	);

	let mut has_claim_at_some_path = false;
	for path in paths {
		let mut cq_state = ClaimQueueState::new();
		for ancestor in &path {
			let seconded_and_pending = state.seconded_and_pending_for_para(&ancestor, &para_id);
			cq_state.add_leaf(
				&ancestor,
				&state
					.per_relay_parent
					.get(ancestor)
					.ok_or(AdvertisementError::RelayParentUnknown)?
					.assignment
					.current,
			);
			for _ in 0..seconded_and_pending {
				cq_state.claim_at(ancestor, &para_id);
			}
		}

		if cq_state.can_claim_at(relay_parent, &para_id) {
			gum::trace!(
				target: LOG_TARGET,
				?relay_parent,
				?para_id,
				?path,
				"Seconding limit respected at path",
			);
			has_claim_at_some_path = true;
			break
		}
	}

	// If there is room in the claim queue for the candidate on at least one path, we accept
	// it.
	if has_claim_at_some_path {
		Ok(())
	} else {
		Err(AdvertisementError::SecondedLimitReached)
	}
}

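/// Process an advertisement from a declared collator: run the spam-protection and assignment
/// checks, ask the backing subsystem whether the candidate can be seconded (V2 only), and, if
/// everything passes, enqueue the collation for fetching.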
async fn handle_advertisement<Sender>(
	sender: &mut Sender,
	state: &mut State,
	relay_parent: Hash,
	peer_id: PeerId,
	prospective_candidate: Option<(CandidateHash, Hash)>,
) -> std::result::Result<(), AdvertisementError>
where
	Sender: CollatorProtocolSenderTrait,
{
	let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?;

	if peer_data.version == CollationVersion::V1 && !state.active_leaves.contains(&relay_parent) {
		return Err(AdvertisementError::ProtocolMisuse)
	}

	let per_relay_parent = state
		.per_relay_parent
		.get(&relay_parent)
		.ok_or(AdvertisementError::RelayParentUnknown)?;

	let assignment = &per_relay_parent.assignment;

	let collator_para_id =
		peer_data.collating_para().ok_or(AdvertisementError::UndeclaredCollator)?;

	// Check if this is assigned to us.
	if !assignment.current.contains(&collator_para_id) {
		return Err(AdvertisementError::InvalidAssignment)
	}

	// Always insert advertisements that pass all the checks for spam protection.
	let candidate_hash = prospective_candidate.map(|(hash, ..)| hash);
	let (collator_id, para_id) = peer_data
		.insert_advertisement(
			relay_parent,
			candidate_hash,
			&state.implicit_view,
			&state.active_leaves,
			&per_relay_parent,
		)
		.map_err(AdvertisementError::Invalid)?;

	ensure_seconding_limit_is_respected(&relay_parent, para_id, state)?;

	if let Some((candidate_hash, parent_head_data_hash)) = prospective_candidate {
		// Check if the backing subsystem allows us to second this candidate.
		//
		// This is also only important when async backing or elastic scaling is enabled.
		let can_second = can_second(
			sender,
			collator_para_id,
			relay_parent,
			candidate_hash,
			parent_head_data_hash,
		)
		.await;

		if !can_second {
			return Err(AdvertisementError::BlockedByBacking)
		}
	}

	let result = enqueue_collation(
		sender,
		state,
		relay_parent,
		para_id,
		peer_id,
		collator_id,
		prospective_candidate,
	)
	.await;

	if let Err(fetch_error) = result {
		gum::debug!(
			target: LOG_TARGET,
			relay_parent = ?relay_parent,
			para_id = ?para_id,
			peer_id = ?peer_id,
			error = %fetch_error,
			"Failed to request advertised collation",
		);
	}

	Ok(())
}

/// Enqueue collation for fetching. The advertisement is expected to be validated and the seconding
/// limit checked.
async fn enqueue_collation<Sender>(
	sender: &mut Sender,
	state: &mut State,
	relay_parent: Hash,
	para_id: ParaId,
	peer_id: PeerId,
	collator_id: CollatorId,
	prospective_candidate: Option<(CandidateHash, Hash)>,
) -> std::result::Result<(), FetchError>
where
	Sender: CollatorProtocolSenderTrait,
{
	gum::debug!(
		target: LOG_TARGET,
		peer_id = ?peer_id,
		%para_id,
		?relay_parent,
		"Received advertise collation",
	);
	let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
		Some(rp_state) => rp_state,
		None => {
			// Race happened, not an error.
			gum::trace!(
				target: LOG_TARGET,
				peer_id = ?peer_id,
				%para_id,
				?relay_parent,
				?prospective_candidate,
				"Candidate relay parent went out of view for valid advertisement",
			);
			return Ok(())
		},
	};
	let prospective_candidate =
		prospective_candidate.map(|(candidate_hash, parent_head_data_hash)| ProspectiveCandidate {
			candidate_hash,
			parent_head_data_hash,
		});

	let collations = &mut per_relay_parent.collations;
	let pending_collation =
		PendingCollation::new(relay_parent, para_id, &peer_id, prospective_candidate);

	match collations.status {
		CollationStatus::Fetching(_) | CollationStatus::WaitingOnValidation => {
			gum::trace!(
				target: LOG_TARGET,
				peer_id = ?peer_id,
				%para_id,
				?relay_parent,
				"Added collation to the pending list"
			);
			collations.add_to_waiting_queue((pending_collation, collator_id));
		},
		CollationStatus::Waiting => {
			// We were waiting for a collation to be advertised to us (we were idle), so we can
			// fetch the new collation immediately.
			fetch_collation(sender, state, pending_collation, collator_id).await?;
		},
	}

	Ok(())
}

/// Our view has changed.
async fn handle_our_view_change<Sender>(
	sender: &mut Sender,
	state: &mut State,
	keystore: &KeystorePtr,
	view: OurView,
) -> Result<()>
where
	Sender: CollatorProtocolSenderTrait,
{
	let current_leaves = state.active_leaves.clone();

	let removed = current_leaves.iter().filter(|h| !view.contains(h));
	let added = view.iter().filter(|h| !current_leaves.contains(h));

	for leaf in added {
		let session_index = request_session_index_for_child(*leaf, sender)
			.await
			.await
			.map_err(Error::CancelledSessionIndex)??;

		let v2_receipts = request_node_features(*leaf, session_index, sender)
			.await
			.await
			.map_err(Error::CancelledNodeFeatures)??
			.get(node_features::FeatureIndex::CandidateReceiptV2 as usize)
			.map(|b| *b)
			.unwrap_or(false);

		let Some(per_relay_parent) = construct_per_relay_parent(
			sender,
			&mut state.current_assignments,
			keystore,
			*leaf,
			v2_receipts,
			session_index,
		)
		.await?
		else {
			continue
		};

		state.active_leaves.insert(*leaf);
		state.per_relay_parent.insert(*leaf, per_relay_parent);

		state
			.implicit_view
			.activate_leaf(sender, *leaf)
			.await
			.map_err(Error::ImplicitViewFetchError)?;

		// Order is always descending.
		let allowed_ancestry = state
			.implicit_view
			.known_allowed_relay_parents_under(leaf, None)
			.unwrap_or_default();
		for block_hash in allowed_ancestry {
			if let Entry::Vacant(entry) = state.per_relay_parent.entry(*block_hash) {
				// Safe to use the same v2 receipts config for the allowed relay parents as well
				// as the same session index since they must be in the same session.
				if let Some(per_relay_parent) = construct_per_relay_parent(
					sender,
					&mut state.current_assignments,
					keystore,
					*block_hash,
					v2_receipts,
					session_index,
				)
				.await?
				{
					entry.insert(per_relay_parent);
				}
			}
		}
	}

	for removed in removed {
		gum::trace!(
			target: LOG_TARGET,
			?view,
			?removed,
			"handle_our_view_change - removed",
		);

		state.active_leaves.remove(removed);
		// If the leaf is deactivated, it may still stay in the view as part
		// of implicit ancestry. Only update the state after the hash is actually
		// pruned from the block info storage.
1345		let pruned = state.implicit_view.deactivate_leaf(*removed);
1346
1347		for removed in pruned {
1348			if let Some(per_relay_parent) = state.per_relay_parent.remove(&removed) {
1349				remove_outgoing(&mut state.current_assignments, per_relay_parent);
1350			}
1351
1352			state.collation_requests_cancel_handles.retain(|pc, handle| {
1353				let keep = pc.relay_parent != removed;
1354				if !keep {
1355					handle.cancel();
1356				}
1357				keep
1358			});
1359			state.fetched_candidates.retain(|k, _| k.relay_parent != removed);
1360		}
1361	}
1362
1363	// Remove blocked seconding requests that left the view.
1364	state.blocked_from_seconding.retain(|_, collations| {
1365		collations.retain(|collation| {
1366			state
1367				.per_relay_parent
1368				.contains_key(&collation.candidate_receipt.descriptor.relay_parent())
1369		});
1370
1371		!collations.is_empty()
1372	});
1373
1374	for (peer_id, peer_data) in state.peer_data.iter_mut() {
1375		peer_data.prune_old_advertisements(&state.implicit_view, &state.active_leaves);
1376
1377		// Disconnect peers who are not relevant to our current or next para.
1378		//
1379		// If the peer hasn't declared yet, they will be disconnected if they do not
1380		// declare.
1381		if let Some(para_id) = peer_data.collating_para() {
1382			if !state.current_assignments.contains_key(&para_id) {
1383				gum::trace!(
1384					target: LOG_TARGET,
1385					?peer_id,
1386					?para_id,
1387					"Disconnecting peer on view change (not current parachain id)"
1388				);
1389				disconnect_peer(sender, *peer_id).await;
1390			}
1391		}
1392	}
1393
1394	Ok(())
1395}
1396
1397/// Bridge event switch.
1398#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1399async fn handle_network_msg<Context>(
1400	ctx: &mut Context,
1401	state: &mut State,
1402	keystore: &KeystorePtr,
1403	bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
1404) -> Result<()> {
1405	use NetworkBridgeEvent::*;
1406
1407	match bridge_message {
1408		PeerConnected(peer_id, observed_role, protocol_version, _) => {
1409			let version = match protocol_version.try_into() {
1410				Ok(version) => version,
1411				Err(err) => {
1412					// Network bridge is expected to handle this.
1413					gum::error!(
1414						target: LOG_TARGET,
1415						?peer_id,
1416						?observed_role,
1417						?err,
1418						"Unsupported protocol version"
1419					);
1420					return Ok(())
1421				},
1422			};
1423			state.peer_data.entry(peer_id).or_insert_with(|| PeerData {
1424				view: View::default(),
1425				state: PeerState::Connected(Instant::now()),
1426				version,
1427			});
1428			state.metrics.note_collator_peer_count(state.peer_data.len());
1429		},
1430		PeerDisconnected(peer_id) => {
1431			state.peer_data.remove(&peer_id);
1432			state.metrics.note_collator_peer_count(state.peer_data.len());
1433		},
1434		NewGossipTopology { .. } => {
1435			// impossible!
1436		},
1437		PeerViewChange(peer_id, view) => {
1438			handle_peer_view_change(state, peer_id, view);
1439		},
1440		OurViewChange(view) => {
1441			handle_our_view_change(ctx.sender(), state, keystore, view).await?;
1442		},
1443		PeerMessage(remote, msg) => {
1444			process_incoming_peer_message(ctx, state, remote, msg).await;
1445		},
1446		UpdatedAuthorityIds { .. } => {
1447			// The validator side doesn't deal with `AuthorityDiscoveryId`s.
1448		},
1449	}
1450
1451	Ok(())
1452}
1453
1454/// The main message receiver switch.
1455#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1456async fn process_msg<Context>(
1457	ctx: &mut Context,
1458	keystore: &KeystorePtr,
1459	msg: CollatorProtocolMessage,
1460	state: &mut State,
1461) {
1462	use CollatorProtocolMessage::*;
1463
1464	let _timer = state.metrics.time_process_msg();
1465
1466	match msg {
1467		CollateOn(id) => {
1468			gum::warn!(
1469				target: LOG_TARGET,
1470				para_id = %id,
1471				"CollateOn message is not expected on the validator side of the protocol",
1472			);
1473		},
1474		DistributeCollation { .. } => {
1475			gum::warn!(
1476				target: LOG_TARGET,
1477				"DistributeCollation message is not expected on the validator side of the protocol",
1478			);
1479		},
1480		NetworkBridgeUpdate(event) => {
1481			if let Err(e) = handle_network_msg(ctx, state, keystore, event).await {
1482				gum::warn!(
1483					target: LOG_TARGET,
1484					err = ?e,
1485					"Failed to handle incoming network message",
1486				);
1487			}
1488		},
1489		Seconded(parent, stmt) => {
1490			let receipt = match stmt.payload() {
1491				Statement::Seconded(receipt) => receipt,
1492				Statement::Valid(_) => {
1493					gum::warn!(
1494						target: LOG_TARGET,
1495						?stmt,
1496						relay_parent = %parent,
1497						"Seconded message received with a `Valid` statement",
1498					);
1499					return
1500				},
1501			};
1502			let output_head_data = receipt.commitments.head_data.clone();
1503			let output_head_data_hash = receipt.descriptor.para_head();
1504			let fetched_collation = FetchedCollation::from(&receipt.to_plain());
1505			if let Some(CollationEvent { collator_id, pending_collation, .. }) =
1506				state.fetched_candidates.remove(&fetched_collation)
1507			{
1508				let PendingCollation {
1509					relay_parent, peer_id, prospective_candidate, para_id, ..
1510				} = pending_collation;
1511				note_good_collation(
1512					&mut state.reputation,
1513					ctx.sender(),
1514					&state.peer_data,
1515					collator_id.clone(),
1516				)
1517				.await;
1518				if let Some(peer_data) = state.peer_data.get(&peer_id) {
1519					notify_collation_seconded(
1520						ctx.sender(),
1521						peer_id,
1522						peer_data.version,
1523						relay_parent,
1524						stmt,
1525					)
1526					.await;
1527				}
1528
1529				if let Some(rp_state) = state.per_relay_parent.get_mut(&parent) {
1530					rp_state.collations.note_seconded(para_id);
1531				}
1532
1533				// See if we've unblocked other collations for seconding.
1534				second_unblocked_collations(
1535					ctx,
1536					state,
1537					fetched_collation.para_id,
1538					output_head_data,
1539					output_head_data_hash,
1540				)
1541				.await;
1542
1543				// If async backing is enabled, make an attempt to fetch next collation.
1544				let maybe_candidate_hash =
1545					prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
1546				dequeue_next_collation_and_fetch(
1547					ctx,
1548					state,
1549					parent,
1550					(collator_id, maybe_candidate_hash),
1551				)
1552				.await;
1553			} else {
1554				gum::debug!(
1555					target: LOG_TARGET,
1556					relay_parent = ?parent,
1557					"Collation has been seconded, but the relay parent is deactivated",
1558				);
1559			}
1560		},
1561		Invalid(parent, candidate_receipt) => {
1562			// Remove collations which were blocked from seconding and had this candidate as parent.
1563			state.blocked_from_seconding.remove(&BlockedCollationId {
1564				para_id: candidate_receipt.descriptor.para_id(),
1565				parent_head_data_hash: candidate_receipt.descriptor.para_head(),
1566			});
1567
1568			let fetched_collation = FetchedCollation::from(&candidate_receipt);
1569			let candidate_hash = fetched_collation.candidate_hash;
1570			let id = match state.fetched_candidates.entry(fetched_collation) {
1571				Entry::Occupied(entry)
1572					if entry.get().pending_collation.commitments_hash ==
1573						Some(candidate_receipt.commitments_hash) =>
1574					entry.remove().collator_id,
1575				Entry::Occupied(_) => {
1576					gum::error!(
1577						target: LOG_TARGET,
1578						relay_parent = ?parent,
1579						candidate = ?candidate_receipt.hash(),
1580						"Reported invalid candidate for unknown `pending_candidate`!",
1581					);
1582					return
1583				},
1584				Entry::Vacant(_) => return,
1585			};
1586
1587			report_collator(&mut state.reputation, ctx.sender(), &state.peer_data, id.clone())
1588				.await;
1589
1590			dequeue_next_collation_and_fetch(ctx, state, parent, (id, Some(candidate_hash))).await;
1591		},
1592	}
1593}
1594
1595/// The main run loop.
1596#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1597pub(crate) async fn run<Context>(
1598	ctx: Context,
1599	keystore: KeystorePtr,
1600	eviction_policy: crate::CollatorEvictionPolicy,
1601	metrics: Metrics,
1602) -> std::result::Result<(), std::convert::Infallible> {
1603	run_inner(
1604		ctx,
1605		keystore,
1606		eviction_policy,
1607		metrics,
1608		ReputationAggregator::default(),
1609		REPUTATION_CHANGE_INTERVAL,
1610	)
1611	.await
1612}
1613
1614#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1615async fn run_inner<Context>(
1616	mut ctx: Context,
1617	keystore: KeystorePtr,
1618	eviction_policy: crate::CollatorEvictionPolicy,
1619	metrics: Metrics,
1620	reputation: ReputationAggregator,
1621	reputation_interval: Duration,
1622) -> std::result::Result<(), std::convert::Infallible> {
1623	let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
1624	let mut reputation_delay = new_reputation_delay();
1625
1626	let mut state = State { metrics, reputation, ..Default::default() };
1627
1628	let next_inactivity_stream = tick_stream(ACTIVITY_POLL);
1629	futures::pin_mut!(next_inactivity_stream);
1630
1631	let mut network_error_freq = gum::Freq::new();
1632	let mut canceled_freq = gum::Freq::new();
1633
1634	loop {
1635		select! {
1636			_ = reputation_delay => {
1637				state.reputation.send(ctx.sender()).await;
1638				reputation_delay = new_reputation_delay();
1639			},
1640			res = ctx.recv().fuse() => {
1641				match res {
1642					Ok(FromOrchestra::Communication { msg }) => {
1643						gum::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
1644						process_msg(
1645							&mut ctx,
1646							&keystore,
1647							msg,
1648							&mut state,
1649						).await;
1650					}
1651					Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => break,
1652					Ok(FromOrchestra::Signal(_)) => continue,
1653				}
1654			},
1655			_ = next_inactivity_stream.next() => {
1656				disconnect_inactive_peers(ctx.sender(), &eviction_policy, &state.peer_data).await;
1657			},
1658			resp = state.collation_requests.select_next_some() => {
1659				let relay_parent = resp.0.pending_collation.relay_parent;
1660				let res = match handle_collation_fetch_response(
1661					&mut state,
1662					resp,
1663					&mut network_error_freq,
1664					&mut canceled_freq,
1665				).await {
1666					Err(Some((peer_id, rep))) => {
1667						modify_reputation(&mut state.reputation, ctx.sender(), peer_id, rep).await;
1668						// Reset the status for the relay parent
1669						state.per_relay_parent.get_mut(&relay_parent).map(|rp| {
1670							rp.collations.status.back_to_waiting();
1671						});
1672						continue
1673					},
1674					Err(None) => {
1675						// Reset the status for the relay parent
1676						state.per_relay_parent.get_mut(&relay_parent).map(|rp| {
1677							rp.collations.status.back_to_waiting();
1678						});
1679						continue
1680					},
1681					Ok(res) => res
1682				};
1683
1684				let CollationEvent {collator_id, pending_collation, .. } = res.collation_event.clone();
1685
1686				match kick_off_seconding(&mut ctx, &mut state, res).await {
1687					Err(err) => {
1688						gum::warn!(
1689							target: LOG_TARGET,
1690							relay_parent = ?pending_collation.relay_parent,
1691							para_id = ?pending_collation.para_id,
1692							peer_id = ?pending_collation.peer_id,
1693							error = %err,
1694							"Seconding aborted due to an error",
1695						);
1696
1697						if err.is_malicious() {
1698							// Report malicious peer.
1699							modify_reputation(&mut state.reputation, ctx.sender(), pending_collation.peer_id, COST_REPORT_BAD).await;
1700						}
1701						let maybe_candidate_hash =
1702						pending_collation.prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
1703						dequeue_next_collation_and_fetch(
1704							&mut ctx,
1705							&mut state,
1706							pending_collation.relay_parent,
1707							(collator_id, maybe_candidate_hash),
1708						)
1709						.await;
1710					},
1711					Ok(false) => {
1712						// No hard error occurred, but we can try fetching another collation.
1713						let maybe_candidate_hash =
1714						pending_collation.prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
1715						dequeue_next_collation_and_fetch(
1716							&mut ctx,
1717							&mut state,
1718							pending_collation.relay_parent,
1719							(collator_id, maybe_candidate_hash),
1720						)
1721						.await;
1722					}
1723					Ok(true) => {}
1724				}
1725			},
1726			res = state.collation_fetch_timeouts.select_next_some() => {
1727				let (collator_id, maybe_candidate_hash, relay_parent) = res;
1728				gum::debug!(
1729					target: LOG_TARGET,
1730					?relay_parent,
1731					?collator_id,
1732					"Timeout hit - already seconded?"
1733				);
1734				dequeue_next_collation_and_fetch(
1735					&mut ctx,
1736					&mut state,
1737					relay_parent,
1738					(collator_id, maybe_candidate_hash),
1739				)
1740				.await;
1741			}
1742		}
1743	}
1744
1745	Ok(())
1746}
1747
1748/// Dequeue another collation and fetch.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn dequeue_next_collation_and_fetch<Context>(
	ctx: &mut Context,
	state: &mut State,
	relay_parent: Hash,
	// The collator we tried to fetch from last, optionally which candidate.
	previous_fetch: (CollatorId, Option<CandidateHash>),
) {
	while let Some((next, id)) = get_next_collation_to_fetch(&previous_fetch, relay_parent, state) {
		gum::debug!(
			target: LOG_TARGET,
			?relay_parent,
			?id,
			"Successfully dequeued next advertisement - fetching ..."
		);
		if let Err(err) = fetch_collation(ctx.sender(), state, next, id).await {
			gum::debug!(
				target: LOG_TARGET,
				relay_parent = ?next.relay_parent,
				para_id = ?next.para_id,
				peer_id = ?next.peer_id,
				error = %err,
				"Failed to request a collation, dequeueing next one",
			);
		} else {
			break
		}
	}
}

async fn request_persisted_validation_data<Sender>(
	sender: &mut Sender,
	relay_parent: Hash,
	para_id: ParaId,
) -> std::result::Result<Option<PersistedValidationData>, SecondingError>
where
	Sender: CollatorProtocolSenderTrait,
{
	// The core is guaranteed to be scheduled since we accepted the advertisement.
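	// The first `.await` dispatches the runtime API request and yields the response
	// receiver; the second `.await` waits for the runtime API result itself.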
	polkadot_node_subsystem_util::request_persisted_validation_data(
		relay_parent,
		para_id,
		OccupiedCoreAssumption::Free,
		sender,
	)
	.await
	.await
	.map_err(SecondingError::CancelledRuntimePersistedValidationData)?
	.map_err(SecondingError::RuntimeApi)
}

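/// Request the persisted validation data for a prospective candidate from the
/// prospective-parachains subsystem.
///
/// A minimal usage sketch (hypothetical variable names; marked `ignore` since the
/// surrounding subsystem types are private to this crate):
///
/// ```ignore
/// let maybe_pvd = request_prospective_validation_data(
/// 	ctx.sender(),
/// 	relay_parent,
/// 	parent_head_data_hash,
/// 	para_id,
/// 	None, // the collator did not send the parent head data; look it up by hash
/// )
/// .await?;
/// ```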
async fn request_prospective_validation_data<Sender>(
	sender: &mut Sender,
	candidate_relay_parent: Hash,
	parent_head_data_hash: Hash,
	para_id: ParaId,
	maybe_parent_head_data: Option<HeadData>,
) -> std::result::Result<Option<PersistedValidationData>, SecondingError>
where
	Sender: CollatorProtocolSenderTrait,
{
	let (tx, rx) = oneshot::channel();

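	// If the collator sent the parent head data along with the collation, forward it;
	// otherwise prospective-parachains has to look the head data up by its hash.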
	let parent_head_data = if let Some(head_data) = maybe_parent_head_data {
		ParentHeadData::WithData { head_data, hash: parent_head_data_hash }
	} else {
		ParentHeadData::OnlyHash(parent_head_data_hash)
	};

	let request =
		ProspectiveValidationDataRequest { para_id, candidate_relay_parent, parent_head_data };

	sender
		.send_message(ProspectiveParachainsMessage::GetProspectiveValidationData(request, tx))
		.await;

	rx.await.map_err(SecondingError::CancelledProspectiveValidationData)
}

/// Handle a fetched collation result.
/// Returns whether or not seconding has begun.
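///
/// In broad strokes: fetch the persisted validation data (via prospective-parachains
/// for V2 collations, via the runtime for V1), park the collation in
/// `blocked_from_seconding` if the parent head data is not yet available, run the
/// fetched-collation sanity checks and, if everything succeeds, ask candidate backing
/// to second the candidate.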
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn kick_off_seconding<Context>(
	ctx: &mut Context,
	state: &mut State,
	PendingCollationFetch { mut collation_event, candidate_receipt, pov, maybe_parent_head_data }: PendingCollationFetch,
) -> std::result::Result<bool, SecondingError> {
	let pending_collation = collation_event.pending_collation;
	let relay_parent = pending_collation.relay_parent;

	let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
		Some(state) => state,
		None => {
			// Relay parent went out of view, not an error.
			gum::trace!(
				target: LOG_TARGET,
				relay_parent = ?relay_parent,
				"Fetched collation for a parent out of view",
			);
			return Ok(false)
		},
	};

	// Sanity check of the candidate receipt version.
	descriptor_version_sanity_check(candidate_receipt.descriptor(), per_relay_parent)?;

	let collations = &mut per_relay_parent.collations;

	let fetched_collation = FetchedCollation::from(&candidate_receipt);
	if let Entry::Vacant(entry) = state.fetched_candidates.entry(fetched_collation) {
		collation_event.pending_collation.commitments_hash =
			Some(candidate_receipt.commitments_hash);

		let (maybe_pvd, maybe_parent_head, maybe_parent_head_hash) = match (
			collation_event.collator_protocol_version,
			collation_event.pending_collation.prospective_candidate,
		) {
			(CollationVersion::V2, Some(ProspectiveCandidate { parent_head_data_hash, .. })) => {
				let pvd = request_prospective_validation_data(
					ctx.sender(),
					relay_parent,
					parent_head_data_hash,
					pending_collation.para_id,
					maybe_parent_head_data.clone(),
				)
				.await?;

				(pvd, maybe_parent_head_data, Some(parent_head_data_hash))
			},
			(CollationVersion::V1, _) => {
				let pvd = request_persisted_validation_data(
					ctx.sender(),
					candidate_receipt.descriptor().relay_parent(),
					candidate_receipt.descriptor().para_id(),
				)
				.await?;
				(
					Some(pvd.ok_or(SecondingError::PersistedValidationDataNotFound)?),
					maybe_parent_head_data,
					None,
				)
			},
			_ => {
				// `handle_advertisement` checks for protocol mismatch.
				return Ok(false)
			},
		};

		let pvd = match (maybe_pvd, maybe_parent_head.clone(), maybe_parent_head_hash) {
			(Some(pvd), _, _) => pvd,
			(None, None, Some(parent_head_data_hash)) => {
				// In this case the collator did not supply the parent head data and
				// prospective-parachains could not provide it either. Park the collation in
				// the `blocked_from_seconding` collection until its parent is seconded.
				let blocked_collation = PendingCollationFetch {
					collation_event,
					candidate_receipt,
					pov,
					maybe_parent_head_data: None,
				};
				gum::debug!(
					target: LOG_TARGET,
					candidate_hash = ?blocked_collation.candidate_receipt.hash(),
					relay_parent = ?blocked_collation.candidate_receipt.descriptor.relay_parent(),
					"Collation with parent head data hash {} is blocked from seconding. Waiting on its parent to be validated.",
					parent_head_data_hash
				);
				state
					.blocked_from_seconding
					.entry(BlockedCollationId {
						para_id: blocked_collation.candidate_receipt.descriptor.para_id(),
						parent_head_data_hash,
					})
					.or_insert_with(Vec::new)
					.push(blocked_collation);

				return Ok(false)
			},
			(None, _, _) => {
				// The PVD fetch failed even though the parent head data is already available,
				// so there is no point in waiting for another collation that outputs this
				// head data.
				return Err(SecondingError::PersistedValidationDataNotFound)
			},
		};

		fetched_collation_sanity_check(
			&collation_event.pending_collation,
			&candidate_receipt,
			&pvd,
			maybe_parent_head.and_then(|head| maybe_parent_head_hash.map(|hash| (head, hash))),
		)?;

		ctx.send_message(CandidateBackingMessage::Second(
			relay_parent,
			candidate_receipt,
			pvd,
			pov,
		))
		.await;
		// Only a single collation is being fetched at any moment in time. In case of a
		// failure, the status is reset back to waiting.
		collations.status = CollationStatus::WaitingOnValidation;

		entry.insert(collation_event);
		Ok(true)
	} else {
		Err(SecondingError::Duplicate)
	}
}

// This issues `NetworkBridge` notifications to disconnect from all inactive peers at the
// earliest possible point. This does not yet clean up any metadata, as that will be done upon
// receipt of the `PeerDisconnected` event.
async fn disconnect_inactive_peers(
	sender: &mut impl overseer::CollatorProtocolSenderTrait,
	eviction_policy: &crate::CollatorEvictionPolicy,
	peers: &HashMap<PeerId, PeerData>,
) {
	for (peer, peer_data) in peers {
		if peer_data.is_inactive(&eviction_policy) {
			gum::trace!(target: LOG_TARGET, ?peer, "Disconnecting inactive peer");
			disconnect_peer(sender, *peer).await;
		}
	}
}

/// Handle a collation fetch response.
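///
/// On success, returns the fetched collation to be kicked off for seconding. On failure,
/// returns `Err(Some((peer, rep)))` when the peer's reputation should be adjusted, or
/// `Err(None)` when the fetch should simply be abandoned (e.g. on timeout or
/// cancellation).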
async fn handle_collation_fetch_response(
	state: &mut State,
	response: <CollationFetchRequest as Future>::Output,
	network_error_freq: &mut gum::Freq,
	canceled_freq: &mut gum::Freq,
) -> std::result::Result<PendingCollationFetch, Option<(PeerId, Rep)>> {
	let (CollationEvent { collator_id, collator_protocol_version, pending_collation }, response) =
		response;
	// Remove the cancellation handle, as the future already completed.
	state.collation_requests_cancel_handles.remove(&pending_collation);

	let response = match response {
		Err(CollationFetchError::Cancelled) => {
			gum::debug!(
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				"Request was cancelled from the validator side"
			);
			return Err(None)
		},
		Err(CollationFetchError::Request(req_error)) => Err(req_error),
		Ok(resp) => Ok(resp),
	};

	let _timer = state.metrics.time_handle_collation_request_result();

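	// Assume failure for the request metrics; flipped to `Ok(())` once a collation has
	// actually been received below.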
	let mut metrics_result = Err(());

	let result = match response {
		Err(RequestError::InvalidResponse(err)) => {
			gum::warn!(
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				err = ?err,
				"Collator provided response that could not be decoded"
			);
			Err(Some((pending_collation.peer_id, COST_CORRUPTED_MESSAGE)))
		},
		Err(err) if err.is_timed_out() => {
			gum::debug!(
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				"Request timed out"
			);
			// For now we don't want to change reputation on timeout, to mitigate issues like
			// this: https://github.com/paritytech/polkadot/issues/4617
			Err(None)
		},
		Err(RequestError::NetworkError(err)) => {
			gum::warn_if_frequent!(
				freq: network_error_freq,
				max_rate: gum::Times::PerHour(100),
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				err = ?err,
				"Fetching collation failed due to network error"
			);
			// A minor decrease in reputation for any network failure seems
			// sensible. In theory this could be exploited, by DoSing this node,
			// which would result in reduced reputation for proper nodes, but the
			// same can happen for penalties on timeouts, which we also have.
			Err(Some((pending_collation.peer_id, COST_NETWORK_ERROR)))
		},
		Err(RequestError::Canceled(err)) => {
			gum::warn_if_frequent!(
				freq: canceled_freq,
				max_rate: gum::Times::PerHour(100),
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				err = ?err,
				"Canceled should be handled by `is_timed_out` above - this is a bug!"
			);
			Err(None)
		},
		Ok(
			request_v1::CollationFetchingResponse::Collation(receipt, _) |
			request_v2::CollationFetchingResponse::Collation(receipt, _) |
			request_v1::CollationFetchingResponse::CollationWithParentHeadData { receipt, .. } |
			request_v2::CollationFetchingResponse::CollationWithParentHeadData { receipt, .. },
		) if receipt.descriptor().para_id() != pending_collation.para_id => {
			gum::debug!(
				target: LOG_TARGET,
				expected_para_id = ?pending_collation.para_id,
				got_para_id = ?receipt.descriptor().para_id(),
				peer_id = ?pending_collation.peer_id,
				"Got wrong para ID for requested collation."
			);

			Err(Some((pending_collation.peer_id, COST_WRONG_PARA)))
		},
		Ok(request_v1::CollationFetchingResponse::Collation(candidate_receipt, pov)) => {
			gum::debug!(
				target: LOG_TARGET,
				para_id = %pending_collation.para_id,
				hash = ?pending_collation.relay_parent,
				candidate_hash = ?candidate_receipt.hash(),
				"Received collation",
			);

			metrics_result = Ok(());
			Ok(PendingCollationFetch {
				collation_event: CollationEvent {
					collator_id,
					pending_collation,
					collator_protocol_version,
				},
				candidate_receipt,
				pov,
				maybe_parent_head_data: None,
			})
		},
		Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData {
			receipt,
			pov,
			parent_head_data,
		}) => {
			gum::debug!(
				target: LOG_TARGET,
				para_id = %pending_collation.para_id,
				hash = ?pending_collation.relay_parent,
				candidate_hash = ?receipt.hash(),
				"Received collation with parent head data",
			);

			metrics_result = Ok(());
			Ok(PendingCollationFetch {
				collation_event: CollationEvent {
					collator_id,
					pending_collation,
					collator_protocol_version,
				},
				candidate_receipt: receipt,
				pov,
				maybe_parent_head_data: Some(parent_head_data),
			})
		},
	};
	state.metrics.on_request(metrics_result);
	result
}

// Returns the claim queue entries for which no collation has been fetched and no fetch is
// pending. The resulting `Vec` preserves the claim queue order: the earlier an element
// appears, the higher its priority.
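// For example (hypothetical paras, for illustration only): if the claim queue along the
// best path is `[A, B, A]` and one collation for `A` has already been fetched or is
// pending, the returned `Vec` would be `[B, A]`.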
fn unfulfilled_claim_queue_entries(relay_parent: &Hash, state: &State) -> Result<Vec<ParaId>> {
	let relay_parent_state = state
		.per_relay_parent
		.get(relay_parent)
		.ok_or(Error::RelayParentStateNotFound)?;
	let scheduled_paras = relay_parent_state.assignment.current.iter().collect::<HashSet<_>>();
	let paths = state.implicit_view.paths_via_relay_parent(relay_parent);

	let mut claim_queue_states = Vec::new();
	for path in paths {
		let mut cq_state = ClaimQueueState::new();
		for ancestor in &path {
			cq_state.add_leaf(
				&ancestor,
				&state
					.per_relay_parent
					.get(&ancestor)
					.ok_or(Error::RelayParentStateNotFound)?
					.assignment
					.current,
			);

			for para_id in &scheduled_paras {
				let seconded_and_pending = state.seconded_and_pending_for_para(&ancestor, &para_id);
				for _ in 0..seconded_and_pending {
					cq_state.claim_at(&ancestor, &para_id);
				}
			}
		}
		claim_queue_states.push(cq_state);
	}

	// From the claim queue state of each leaf we have to return a single combined one. Go for
	// the simple solution and return the longest one. In theory we always prefer the earliest
	// entries in the claim queue, so there is a good chance that the longest path is the one
	// with unsatisfied entries at the beginning. This is not guaranteed, as we might have
	// fetched the 2nd or 3rd spot from the claim queue, but it should be good enough.
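	// E.g. if one fork still has `[A, B]` unclaimed and another fork only `[A]`, the
	// longer `[A, B]` is returned.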
	let unfulfilled_entries = claim_queue_states
		.iter_mut()
		.map(|cq| cq.unclaimed_at(relay_parent))
		.max_by(|a, b| a.len().cmp(&b.len()))
		.unwrap_or_default();

	Ok(unfulfilled_entries)
}

/// Returns the next collation to fetch from the `waiting_queue` and resets the status back
/// to `Waiting`.
fn get_next_collation_to_fetch(
	finished_one: &(CollatorId, Option<CandidateHash>),
	relay_parent: Hash,
	state: &mut State,
) -> Option<(PendingCollation, CollatorId)> {
	let unfulfilled_entries = match unfulfilled_claim_queue_entries(&relay_parent, &state) {
		Ok(entries) => entries,
		Err(err) => {
			gum::error!(
				target: LOG_TARGET,
				?relay_parent,
				?err,
				"Failed to get unfulfilled claim queue entries"
			);
			return None
		},
	};
	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
		Some(rp_state) => rp_state,
		None => {
			gum::error!(
				target: LOG_TARGET,
				?relay_parent,
				"Failed to get relay parent state"
			);
			return None
		},
	};

	// If the finished fetch does not match the collation we are currently waiting on, we have
	// already dequeued another fetch to replace it.
	if let Some((collator_id, maybe_candidate_hash)) = rp_state.collations.fetching_from.as_ref() {
		// If a candidate hash was saved previously, `finished_one` must include it too.
		if collator_id != &finished_one.0 &&
			maybe_candidate_hash.map_or(true, |hash| Some(&hash) != finished_one.1.as_ref())
		{
			gum::trace!(
				target: LOG_TARGET,
				waiting_collation = ?rp_state.collations.fetching_from,
				?finished_one,
				"Not proceeding to the next collation - has already been done."
			);
			return None
		}
	}
	rp_state.collations.status.back_to_waiting();
	rp_state.collations.pick_a_collation_to_fetch(unfulfilled_entries)
}

// Sanity check the candidate descriptor version.
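// V1 descriptors always pass. V2 descriptors are only accepted when the `v2_receipts`
// node feature is enabled for this relay parent; when they carry a core or session
// index, it must match the core and session expected at this relay parent.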
fn descriptor_version_sanity_check(
	descriptor: &CandidateDescriptorV2,
	per_relay_parent: &PerRelayParent,
) -> std::result::Result<(), SecondingError> {
	match descriptor.version() {
		CandidateDescriptorVersion::V1 => Ok(()),
		CandidateDescriptorVersion::V2 if per_relay_parent.v2_receipts => {
			if let Some(core_index) = descriptor.core_index() {
				if core_index != per_relay_parent.current_core {
					return Err(SecondingError::InvalidCoreIndex(
						core_index.0,
						per_relay_parent.current_core.0,
					))
				}
			}

			if let Some(session_index) = descriptor.session_index() {
				if session_index != per_relay_parent.session_index {
					return Err(SecondingError::InvalidSessionIndex(
						session_index,
						per_relay_parent.session_index,
					))
				}
			}

			Ok(())
		},
		descriptor_version => Err(SecondingError::InvalidReceiptVersion(descriptor_version)),
	}
}