
polkadot_collator_protocol/collator_side/mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::{
18	collections::{hash_map::Entry, HashMap, HashSet},
19	time::Duration,
20};
21
22use bitvec::{bitvec, vec::BitVec};
23use futures::{
24	channel::oneshot, future::Fuse, pin_mut, select, stream::FuturesUnordered, FutureExt, StreamExt,
25};
26use metrics::{CollationStats, CollationTracker};
27use schnellru::{ByLength, LruMap};
28use sp_core::Pair;
29
30use polkadot_node_network_protocol::{
31	self as net_protocol,
32	peer_set::{CollationVersion, PeerSet},
33	request_response::{
34		incoming::{self, OutgoingResponse},
35		v2 as request_v2, IncomingRequestReceiver,
36	},
37	v1 as protocol_v1, v2 as protocol_v2, CollationProtocols, OurView, PeerId,
38	UnifiedReputationChange as Rep, View,
39};
40use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement};
41use polkadot_node_subsystem::{
42	messages::{CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage},
43	overseer, FromOrchestra, OverseerSignal,
44};
45use polkadot_node_subsystem_util::{
46	backing_implicit_view::View as ImplicitView,
47	reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
48	runtime::{
49		fetch_claim_queue, get_candidate_events, get_group_rotation_info, ClaimQueueSnapshot,
50		RuntimeInfo,
51	},
52	TimeoutExt,
53};
54use polkadot_primitives::{
55	AuthorityDiscoveryId, BlockNumber, CandidateEvent, CandidateHash,
56	CandidateReceiptV2 as CandidateReceipt, CollatorPair, CoreIndex, GroupIndex, Hash, HeadData,
57	Id as ParaId, SessionIndex,
58};
59
60use crate::{modify_reputation, LOG_TARGET, LOG_TARGET_STATS};
61
62mod collation;
63mod error;
64mod metrics;
65#[cfg(test)]
66mod tests;
67mod validators_buffer;
68
69use collation::{
70	ActiveCollationFetches, Collation, CollationSendResult, CollationStatus,
71	VersionedCollationRequest, WaitingCollationFetches,
72};
73use error::{log_error, Error, FatalError, Result};
74use validators_buffer::{
75	ResetInterestTimeout, ValidatorGroupsBuffer, RESET_INTEREST_TIMEOUT, VALIDATORS_BUFFER_CAPACITY,
76};
77
78pub use metrics::Metrics;
79
80const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
81const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
82const COST_APPARENT_FLOOD: Rep =
83	Rep::CostMinor("Message received when previous one was still being processed");
84
85/// Time after starting an upload to a validator after which we will start another one to the
86/// next validator, even if the first upload has not finished yet.
87///
88/// This is to protect from a single slow validator preventing collations from happening.
89///
90/// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386
91const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150);
92
93/// Ensure that the collator updates its connection requests to validators
94/// this long after the most recent leaf.
95///
96/// The timeout is designed for substreams to be properly closed if they need to be
97/// reopened shortly after the next leaf.
98///
99/// Collators also update their connection requests on every new collation.
100/// This timeout is mostly about removing stale connections while avoiding races
101/// with new collations which may want to reactivate them.
102///
103/// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`].
104const RECONNECT_AFTER_LEAF_TIMEOUT: Duration = Duration::from_secs(4);
105
106/// Future that, when resolved, indicates that we should update the reserved peer-set
107/// of validators we want to be connected to.
108///
109/// The `Pending` variant never finishes and should be used when there are no peers
110/// connected.
111type ReconnectTimeout = Fuse<futures_timer::Delay>;
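
// NOTE: illustrative sketch (not used by the subsystem) of the resettable-timer
// pattern behind `ReconnectTimeout`: `Fuse::terminated()` yields a future that
// never resolves, so the corresponding `select!` arm stays dormant until a new
// leaf re-arms it with a fresh, fused `Delay`.
#[allow(dead_code)]
async fn reconnect_timeout_sketch() {
	let mut timeout: ReconnectTimeout = Fuse::terminated();

	// Dormant: a terminated `Fuse` never resolves, which is what we want while
	// no peers are connected.
	assert!(futures::future::FusedFuture::is_terminated(&timeout));

	// On a new leaf the timer is re-armed.
	timeout = futures_timer::Delay::new(RECONNECT_AFTER_LEAF_TIMEOUT).fuse();

	select! {
		_ = timeout => {
			// Fires `RECONNECT_AFTER_LEAF_TIMEOUT` after the last re-arm; the main
			// loop uses this point to refresh the reserved validator set.
		},
	}
}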
112
113#[derive(Debug)]
114enum ShouldAdvertiseTo {
115	Yes,
116	NotAuthority,
117	AlreadyAdvertised,
118}
119
120/// Info about validators we are currently connected to.
121///
122/// It keeps track of which validators we have advertised our collation to.
123#[derive(Debug, Default)]
124struct ValidatorGroup {
125	/// Validators discovery ids. Lazily initialized when first
126	/// distributing a collation.
127	validators: Vec<AuthorityDiscoveryId>,
128
129	/// Bits indicating which validators have already seen the announcement
130	/// per candidate.
131	advertised_to: HashMap<CandidateHash, BitVec>,
132}
133
134impl ValidatorGroup {
135	/// Returns `true` if we should advertise our collation to the given peer.
136	fn should_advertise_to(
137		&self,
138		candidate_hash: &CandidateHash,
139		peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
140		peer: &PeerId,
141	) -> ShouldAdvertiseTo {
142		let authority_ids = match peer_ids.get(peer) {
143			Some(authority_ids) => authority_ids,
144			None => return ShouldAdvertiseTo::NotAuthority,
145		};
146
147		for id in authority_ids {
148			// One peer id may correspond to different discovery ids across sessions;
149			// having a non-empty intersection is sufficient to assume that this peer
150			// belongs to this particular validator group.
151			let validator_index = match self.validators.iter().position(|v| v == id) {
152				Some(idx) => idx,
153				None => continue,
154			};
155
156			// Either the candidate is unseen by this validator group
157			// or the corresponding bit is not set.
158			if self
159				.advertised_to
160				.get(candidate_hash)
161				.map_or(true, |advertised| !advertised[validator_index])
162			{
163				return ShouldAdvertiseTo::Yes
164			} else {
165				return ShouldAdvertiseTo::AlreadyAdvertised
166			}
167		}
168
169		ShouldAdvertiseTo::NotAuthority
170	}
171
172	/// Should be called after we advertised our collation to the given `peer` to keep track of it.
173	fn advertised_to_peer(
174		&mut self,
175		candidate_hash: &CandidateHash,
176		peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
177		peer: &PeerId,
178	) {
179		if let Some(authority_ids) = peer_ids.get(peer) {
180			for id in authority_ids {
181				let validator_index = match self.validators.iter().position(|v| v == id) {
182					Some(idx) => idx,
183					None => continue,
184				};
185				self.advertised_to
186					.entry(*candidate_hash)
187					.or_insert_with(|| bitvec![0; self.validators.len()])
188					.set(validator_index, true);
189			}
190		}
191	}
192}
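
// NOTE: simplified sketch of the `advertised_to` bookkeeping above, using a
// plain `u8` in place of `CandidateHash` and a validator's position in the
// group instead of its `AuthorityDiscoveryId`. A bitvec with one bit per group
// member records who already saw the advertisement, so a repeated attempt for
// the same (candidate, validator) pair maps to `AlreadyAdvertised`.
#[allow(dead_code)]
fn advertised_to_sketch() {
	let group_size = 5usize;
	let candidate = 1u8; // stand-in for a `CandidateHash`
	let validator_index = 2usize; // position of the peer's validator in the group
	let mut advertised_to: HashMap<u8, BitVec> = HashMap::new();

	// First advertisement: the candidate is unseen (or the bit is unset), which
	// corresponds to `ShouldAdvertiseTo::Yes`.
	let first = advertised_to
		.get(&candidate)
		.map_or(true, |bits| !bits[validator_index]);
	assert!(first);

	// Record the advertisement, mirroring `advertised_to_peer`.
	advertised_to
		.entry(candidate)
		.or_insert_with(|| bitvec![0; group_size])
		.set(validator_index, true);

	// A second attempt now corresponds to `ShouldAdvertiseTo::AlreadyAdvertised`.
	let again = advertised_to
		.get(&candidate)
		.map_or(true, |bits| !bits[validator_index]);
	assert!(!again);
}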
193
194#[derive(Debug)]
195struct PeerData {
196	/// Peer's view.
197	view: View,
198	/// Unknown heads in the view.
199	///
200	/// This can happen when the validator is faster at importing a block and sending out its
201	/// `View` than the collator is able to import a block.
202	unknown_heads: LruMap<Hash, (), ByLength>,
203}
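
// NOTE: illustrative sketch of the bounded `unknown_heads` buffer. A validator
// may announce a head before this collator has imported the corresponding
// block; the head is parked in a small LRU and drained again once our own view
// catches up (see `handle_our_view_change`). Capacity and hash are made up.
#[allow(dead_code)]
fn unknown_heads_sketch() {
	let mut unknown_heads: LruMap<Hash, (), ByLength> = LruMap::new(ByLength::new(10));
	let head = Hash::repeat_byte(0xaa);

	// The peer announced `head`, but it is not in `per_relay_parent` yet: park it.
	let _ = unknown_heads.insert(head, ());

	// Once the head shows up in our own view it is removed and the parked
	// advertisement opportunity is acted upon.
	assert!(unknown_heads.remove(&head).is_some());
	assert!(unknown_heads.remove(&head).is_none());
}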
204
205/// A type wrapping a collation, its designated core index and stats.
206struct CollationData {
207	collation: Collation,
208	core_index: CoreIndex,
209	stats: Option<CollationStats>,
210	session_index: SessionIndex,
211}
212
213impl CollationData {
214	/// Returns inner collation ref.
215	pub fn collation(&self) -> &Collation {
216		&self.collation
217	}
218
219	/// Returns inner collation mut ref.
220	pub fn collation_mut(&mut self) -> &mut Collation {
221		&mut self.collation
222	}
223
224	/// Returns inner core index.
225	pub fn core_index(&self) -> &CoreIndex {
226		&self.core_index
227	}
228
229	/// Takes the stats and returns them.
230	pub fn take_stats(&mut self) -> Option<CollationStats> {
231		self.stats.take()
232	}
233}
234
235struct PerRelayParent {
236	/// The validator group, per core index, responsible for backing candidates built
237	/// on top of this relay parent.
238	validator_group: HashMap<CoreIndex, ValidatorGroup>,
239	/// Distributed collations.
240	collations: HashMap<CandidateHash, CollationData>,
241	/// Number of assignments per core
242	assignments: HashMap<CoreIndex, usize>,
243	/// The relay parent block number
244	block_number: Option<BlockNumber>,
245	/// The session index of this relay parent.
246	session_index: SessionIndex,
247}
248
249impl PerRelayParent {
250	fn new(
251		para_id: ParaId,
252		claim_queue: ClaimQueueSnapshot,
253		block_number: Option<BlockNumber>,
254		session_index: SessionIndex,
255	) -> Self {
256		let assignments =
257			claim_queue.iter_all_claims().fold(HashMap::new(), |mut acc, (core, claims)| {
258				let n_claims = claims.iter().filter(|para| para == &&para_id).count();
259				if n_claims > 0 {
260					acc.insert(*core, n_claims);
261				}
262				acc
263			});
264
265		Self {
266			validator_group: HashMap::default(),
267			collations: HashMap::new(),
268			assignments,
269			block_number,
270			session_index,
271		}
272	}
273}
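
// NOTE: simplified sketch of the assignment counting in `PerRelayParent::new`,
// with plain integers in place of `CoreIndex`/`ParaId` and a `Vec` in place of
// the claim queue snapshot. Only cores with at least one claim for our para end
// up in `assignments`; the per-core claim count is the collation limit later
// enforced in `distribute_collation`.
#[allow(dead_code)]
fn assignments_sketch() {
	let our_para = 2000u32;
	// (core, queued claims) pairs, standing in for `ClaimQueueSnapshot`.
	let claim_queue: Vec<(u32, Vec<u32>)> =
		vec![(0, vec![2000, 2000]), (1, vec![2001]), (2, vec![2000, 2001])];

	let assignments: HashMap<u32, usize> =
		claim_queue.iter().fold(HashMap::new(), |mut acc, (core, claims)| {
			let n_claims = claims.iter().filter(|para| **para == our_para).count();
			if n_claims > 0 {
				acc.insert(*core, n_claims);
			}
			acc
		});

	// Core 0 may take up to two collations, core 2 one, core 1 none.
	assert_eq!(assignments.get(&0), Some(&2));
	assert_eq!(assignments.get(&2), Some(&1));
	assert_eq!(assignments.get(&1), None);
}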
274
275struct State {
276	/// Our network peer id.
277	local_peer_id: PeerId,
278
279	/// Our collator pair.
280	collator_pair: CollatorPair,
281
282	/// The para this collator is collating on.
283	/// Starts as `None` and is updated with every `CollateOn` message.
284	collating_on: Option<ParaId>,
285
286	/// Track all active peers and their views
287	/// to determine what is relevant to them.
288	peer_data: HashMap<PeerId, PeerData>,
289
290	/// Leaves along with implicit ancestry.
291	///
292	/// It's `None` if the collator is not yet collating for a paraid.
293	implicit_view: Option<ImplicitView>,
294
295	/// Validators and distributed collations tracked for each relay parent from
296	/// our view, including both leaves and implicit ancestry.
297	per_relay_parent: HashMap<Hash, PerRelayParent>,
298
299	/// The result senders per collation.
300	collation_result_senders: HashMap<CandidateHash, oneshot::Sender<CollationSecondedSignal>>,
301
302	/// The mapping from [`PeerId`] to [`HashSet<AuthorityDiscoveryId>`]. This is filled over time
303	/// as we learn the [`PeerId`]s from `PeerConnected` events.
304	peer_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
305
306	/// Tracks which validators we want to stay connected to.
307	validator_groups_buf: ValidatorGroupsBuffer,
308
309	/// Timeout-future which is reset after every leaf to [`RECONNECT_AFTER_LEAF_TIMEOUT`] seconds.
310	/// When it fires, we update our reserved peers.
311	reconnect_timeout: ReconnectTimeout,
312
313	/// Metrics.
314	metrics: Metrics,
315
316	/// All collation fetching requests that are still waiting to be answered.
317	///
318	/// They are stored per relay parent, when our view changes and the relay parent moves out, we
319	/// will cancel the fetch request.
320	waiting_collation_fetches: HashMap<Hash, WaitingCollationFetches>,
321
322	/// Active collation fetches.
323	///
324	/// Each future returns the relay parent of the finished collation fetch.
325	active_collation_fetches: ActiveCollationFetches,
326
327	/// Time limits for validators to fetch the collation once the advertisement
328	/// was sent.
329	///
330	/// Given an implicit view, a collation may stay in memory for a significant amount
331	/// of time. If we don't time out validators, the node will keep attempting to connect
332	/// to unneeded peers.
333	advertisement_timeouts: FuturesUnordered<ResetInterestTimeout>,
334
335	/// Aggregated reputation change
336	reputation: ReputationAggregator,
337
338	/// A utility for tracking all collations produced by the collator.
339	collation_tracker: CollationTracker,
340}
341
342impl State {
343	/// Creates a new `State` instance with the given parameters and setting all remaining
344	/// state fields to their default values (i.e. empty).
345	fn new(
346		local_peer_id: PeerId,
347		collator_pair: CollatorPair,
348		metrics: Metrics,
349		reputation: ReputationAggregator,
350	) -> State {
351		State {
352			local_peer_id,
353			collator_pair,
354			metrics,
355			collating_on: Default::default(),
356			peer_data: Default::default(),
357			implicit_view: None,
358			per_relay_parent: Default::default(),
359			collation_result_senders: Default::default(),
360			peer_ids: Default::default(),
361			validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY),
362			reconnect_timeout: Fuse::terminated(),
363			waiting_collation_fetches: Default::default(),
364			active_collation_fetches: Default::default(),
365			advertisement_timeouts: Default::default(),
366			reputation,
367			collation_tracker: Default::default(),
368		}
369	}
370}
371
372/// Distribute a collation.
373///
374/// Figure out the core our para is assigned to and the relevant validators.
375/// Issue a connection request to these validators.
376/// If the para is not scheduled or next up on any core at the relay-parent,
377/// or the relay-parent isn't in the implicit ancestry, we ignore the message
378/// as it must be invalid in that case - although this indicates a logic error
379/// elsewhere in the node.
380#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
381async fn distribute_collation<Context>(
382	ctx: &mut Context,
383	runtime: &mut RuntimeInfo,
384	state: &mut State,
385	id: ParaId,
386	receipt: CandidateReceipt,
387	pov: PoV,
388	parent_head_data: HeadData,
389	result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
390	core_index: CoreIndex,
391) -> Result<()> {
392	let candidate_relay_parent = receipt.descriptor.relay_parent();
393	let candidate_hash = receipt.hash();
394	let cores_assigned = has_assigned_cores(&state.implicit_view, &state.per_relay_parent);
395
396	let per_relay_parent = match state.per_relay_parent.get_mut(&candidate_relay_parent) {
397		Some(per_relay_parent) => per_relay_parent,
398		None => {
399			gum::debug!(
400				target: LOG_TARGET,
401				para_id = %id,
402				candidate_relay_parent = %candidate_relay_parent,
403				candidate_hash = ?candidate_hash,
404				"Candidate relay parent is out of our view",
405			);
406			return Ok(())
407		},
408	};
409
410	let Some(collations_limit) = per_relay_parent.assignments.get(&core_index) else {
411		gum::warn!(
412			target: LOG_TARGET,
413			para_id = %id,
414			relay_parent = ?candidate_relay_parent,
415			cores = ?per_relay_parent.assignments.keys(),
416			?core_index,
417			"Attempting to distribute collation for a core we are not assigned to",
418		);
419
420		return Ok(())
421	};
422
423	let current_collations_count = per_relay_parent
424		.collations
425		.values()
426		.filter(|c| c.core_index() == &core_index)
427		.count();
428	if current_collations_count >= *collations_limit {
429		gum::debug!(
430			target: LOG_TARGET,
431			?candidate_relay_parent,
432			"The limit of {} collations per relay parent for core {} is already reached",
433			collations_limit,
434			core_index.0,
435		);
436		return Ok(())
437	}
438
439	// We have already seen a collation for this candidate.
440	if per_relay_parent.collations.contains_key(&candidate_hash) {
441		gum::debug!(
442			target: LOG_TARGET,
443			?candidate_relay_parent,
444			?candidate_hash,
445			"Already seen this candidate",
446		);
447		return Ok(())
448	}
449
450	let elastic_scaling = per_relay_parent.assignments.len() > 1;
451	if elastic_scaling {
452		gum::debug!(
453			target: LOG_TARGET,
454			para_id = %id,
455			cores = ?per_relay_parent.assignments.keys(),
456			"{} is assigned to {} cores at {}", id, per_relay_parent.assignments.len(), candidate_relay_parent,
457		);
458	}
459
460	let our_core = core_index;
461
462	// Determine the group on that core.
463	let GroupValidators { validators, session_index, group_index } =
464		determine_our_validators(ctx, runtime, our_core, candidate_relay_parent).await?;
465
466	if validators.is_empty() {
467		gum::warn!(
468			target: LOG_TARGET,
469			core = ?our_core,
470			"there are no validators assigned to core",
471		);
472
473		return Ok(())
474	}
475
476	// It's important to insert new collation interests **before**
477	// issuing a connection request.
478	//
479	// If a validator managed to fetch all the relevant collations
480	// but is still assigned to our core, we keep the connection alive.
481	state.validator_groups_buf.note_collation_advertised(
482		candidate_hash,
483		session_index,
484		group_index,
485		&validators,
486	);
487
488	gum::debug!(
489		target: LOG_TARGET,
490		para_id = %id,
491		candidate_relay_parent = %candidate_relay_parent,
492		?candidate_hash,
493		pov_hash = ?pov.hash(),
494		core = ?our_core,
495		current_validators = ?validators,
496		"Accepted collation, connecting to validators."
497	);
498
499	// Insert validator group for the `core_index` at relay parent.
500	per_relay_parent.validator_group.entry(core_index).or_insert_with(|| {
501		let mut group = ValidatorGroup::default();
502		group.validators = validators;
503		group
504	});
505
506	// Update a set of connected validators if necessary.
507	connect_to_validators(ctx, cores_assigned, &state.validator_groups_buf).await;
508
509	if let Some(result_sender) = result_sender {
510		state.collation_result_senders.insert(candidate_hash, result_sender);
511	}
512
513	let para_head = receipt.descriptor.para_head();
514	per_relay_parent.collations.insert(
515		candidate_hash,
516		CollationData {
517			collation: Collation {
518				receipt,
519				pov,
520				parent_head_data,
521				status: CollationStatus::Created,
522			},
523			core_index,
524			session_index,
525			stats: per_relay_parent
526				.block_number
527				.map(|n| CollationStats::new(para_head, n, &state.metrics)),
528		},
529	);
530
531	// The candidate relay parent should be in the allowed ancestry of some leaf in the peer's view.
532	//
533	// It's the collation producer's responsibility to verify that there exists
534	// a hypothetical membership in a fragment chain for the candidate.
535	let interested = state
536		.peer_data
537		.iter()
538		.filter(|(_, PeerData { view: v, .. })| {
539			v.iter().any(|block_hash| {
540				state.implicit_view.as_ref().map(|implicit_view| {
541					implicit_view
542						.known_allowed_relay_parents_under(block_hash, Some(id))
543						.unwrap_or_default()
544						.contains(&candidate_relay_parent)
545				}) == Some(true)
546			})
547		})
548		.map(|(id, _)| id);
549
550	// Make sure already connected peers get collations:
551	for peer_id in interested {
552		advertise_collation(
553			ctx,
554			candidate_relay_parent,
555			per_relay_parent,
556			peer_id,
557			&state.peer_ids,
558			&mut state.advertisement_timeouts,
559			&state.metrics,
560		)
561		.await;
562	}
563
564	Ok(())
565}
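
// NOTE: simplified sketch of the "interested peers" filter at the end of
// `distribute_collation`: a peer is advertised to if any head in its view has
// the candidate's relay parent inside that head's allowed ancestry. Plain maps
// and made-up hashes stand in for peer views and the implicit view.
#[allow(dead_code)]
fn interested_peers_sketch() {
	let candidate_relay_parent = Hash::repeat_byte(0x01);
	let head = Hash::repeat_byte(0x02);

	// head -> allowed relay parents under it (what the implicit view answers).
	let mut allowed_ancestry: HashMap<Hash, Vec<Hash>> = HashMap::new();
	allowed_ancestry.insert(head, vec![head, candidate_relay_parent]);

	// peer -> its current view (the heads it reported).
	let mut peer_views: HashMap<u8, Vec<Hash>> = HashMap::new();
	peer_views.insert(1, vec![head]); // interested
	peer_views.insert(2, vec![Hash::repeat_byte(0x03)]); // not interested

	let mut interested: Vec<u8> = peer_views
		.iter()
		.filter(|(_, view)| {
			view.iter().any(|h| {
				allowed_ancestry
					.get(h)
					.map_or(false, |ancestry| ancestry.contains(&candidate_relay_parent))
			})
		})
		.map(|(peer, _)| *peer)
		.collect();
	interested.sort();

	assert_eq!(interested, vec![1]);
}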
566
567/// Validators of a particular group index.
568#[derive(Debug)]
569struct GroupValidators {
570	/// The validators of the above group (their discovery keys).
571	validators: Vec<AuthorityDiscoveryId>,
572
573	session_index: SessionIndex,
574	group_index: GroupIndex,
575}
576
577/// Figure out current group of validators assigned to the para being collated on.
578///
579/// Returns the [`ValidatorId`]s of the current group, as determined based on the `relay_parent`.
580#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
581async fn determine_our_validators<Context>(
582	ctx: &mut Context,
583	runtime: &mut RuntimeInfo,
584	core_index: CoreIndex,
585	relay_parent: Hash,
586) -> Result<GroupValidators> {
587	let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
588	let info = &runtime
589		.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
590		.await?
591		.session_info;
592	gum::debug!(target: LOG_TARGET, ?session_index, "Received session info");
593	let groups = &info.validator_groups;
594	let num_cores = groups.len();
595	let rotation_info = get_group_rotation_info(ctx.sender(), relay_parent).await?;
596
597	let current_group_index = rotation_info.group_for_core(core_index, num_cores);
598	let current_validators =
599		groups.get(current_group_index).map(|v| v.as_slice()).unwrap_or_default();
600
601	let validators = &info.discovery_keys;
602
603	let current_validators =
604		current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
605
606	let current_validators = GroupValidators {
607		validators: current_validators,
608		session_index,
609		group_index: current_group_index,
610	};
611
612	Ok(current_validators)
613}
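
// NOTE: simplified sketch of the group rotation consulted by
// `determine_our_validators`. Validator groups rotate round-robin across cores
// every `group_rotation_frequency` blocks, so the group for a core works out
// to roughly `(core + elapsed_blocks / frequency) % n_groups`. All values
// below are made up for illustration.
#[allow(dead_code)]
fn group_rotation_sketch() {
	let n_groups: u32 = 5;
	let group_rotation_frequency: u32 = 10;
	let session_start_block: u32 = 100;
	let now: u32 = 125;
	let core: u32 = 1;

	let rotations = (now - session_start_block) / group_rotation_frequency; // 2 rotations so far
	let group = (core + rotations) % n_groups;

	// Group 3 is currently responsible for backing candidates on core 1.
	assert_eq!(group, 3);
}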
614
615/// Construct the declare message to be sent to a validator.
616fn declare_message(
617	state: &mut State,
618) -> Option<CollationProtocols<protocol_v1::CollationProtocol, protocol_v2::CollationProtocol>> {
619	let para_id = state.collating_on?;
620	let declare_signature_payload = protocol_v2::declare_signature_payload(&state.local_peer_id);
621	let wire_message = protocol_v2::CollatorProtocolMessage::Declare(
622		state.collator_pair.public(),
623		para_id,
624		state.collator_pair.sign(&declare_signature_payload),
625	);
626	Some(CollationProtocols::V2(protocol_v2::CollationProtocol::CollatorProtocol(wire_message)))
627}
628
629/// Issue versioned `Declare` collation message to the given `peer`.
630#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
631async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: &PeerId) {
632	if let Some(wire_message) = declare_message(state) {
633		ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(vec![*peer], wire_message))
634			.await;
635	}
636}
637
638/// Checks whether there are any core assignments for our para on any active relay chain leaves.
639fn has_assigned_cores(
640	implicit_view: &Option<ImplicitView>,
641	per_relay_parent: &HashMap<Hash, PerRelayParent>,
642) -> bool {
643	let Some(implicit_view) = implicit_view else { return false };
644
645	for leaf in implicit_view.leaves() {
646		if let Some(relay_parent) = per_relay_parent.get(leaf) {
647			if !relay_parent.assignments.is_empty() {
648				return true;
649			}
650		}
651	}
652
653	false
654}
655
656/// Updates a set of connected validators based on their advertisement-bits
657/// in a validators buffer.
658#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
659async fn connect_to_validators<Context>(
660	ctx: &mut Context,
661	cores_assigned: bool,
662	validator_groups_buf: &ValidatorGroupsBuffer,
663) {
664	// If no cores are assigned to the para, we still need to send a ConnectToValidators request to
665	// the network bridge passing an empty list of validator ids. Otherwise, it will keep connecting
666	// to the last requested validators until a new request is issued.
667	let validator_ids =
668		if cores_assigned { validator_groups_buf.validators_to_connect() } else { Vec::new() };
669
670	gum::trace!(
671		target: LOG_TARGET,
672		?cores_assigned,
673		"Sending connection request to validators: {:?}",
674		validator_ids,
675	);
676
677	// Ignore address resolution failures;
678	// a new request will be issued on the next collation.
679	let (failed, _) = oneshot::channel();
680	ctx.send_message(NetworkBridgeTxMessage::ConnectToValidators {
681		validator_ids,
682		peer_set: PeerSet::Collation,
683		failed,
684	})
685	.await;
686}
687
688/// Advertise collation to the given `peer`.
689///
690/// This will only advertise a collation if there exists at least one for the given
691/// `relay_parent` and the given `peer` is set as validator for our para at the given
692/// `relay_parent`.
693///
694/// We also make sure not to advertise the same collation multiple times to the same validator.
695#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
696async fn advertise_collation<Context>(
697	ctx: &mut Context,
698	relay_parent: Hash,
699	per_relay_parent: &mut PerRelayParent,
700	peer: &PeerId,
701	peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
702	advertisement_timeouts: &mut FuturesUnordered<ResetInterestTimeout>,
703	metrics: &Metrics,
704) {
705	for (candidate_hash, collation_and_core) in per_relay_parent.collations.iter_mut() {
706		let core_index = *collation_and_core.core_index();
707		let collation = collation_and_core.collation_mut();
708
709		let Some(validator_group) = per_relay_parent.validator_group.get_mut(&core_index) else {
710			gum::debug!(
711				target: LOG_TARGET,
712				?relay_parent,
713				?core_index,
714				"Skipping advertising to validator, validator group for core not found",
715			);
716			return
717		};
718
719		let should_advertise = validator_group.should_advertise_to(candidate_hash, peer_ids, &peer);
720		match should_advertise {
721			ShouldAdvertiseTo::Yes => {},
722			ShouldAdvertiseTo::NotAuthority | ShouldAdvertiseTo::AlreadyAdvertised => {
723				gum::trace!(
724					target: LOG_TARGET,
725					?relay_parent,
726					?candidate_hash,
727					peer_id = %peer,
728					reason = ?should_advertise,
729					"Not advertising collation"
730				);
731				continue
732			},
733		}
734
735		gum::debug!(
736			target: LOG_TARGET,
737			?relay_parent,
738			?candidate_hash,
739			peer_id = %peer,
740			"Advertising collation.",
741		);
742
743		collation.status.advance_to_advertised();
744
745		ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(
746			vec![*peer],
747			CollationProtocols::V2(protocol_v2::CollationProtocol::CollatorProtocol(
748				protocol_v2::CollatorProtocolMessage::AdvertiseCollation {
749					relay_parent,
750					candidate_hash: *candidate_hash,
751					parent_head_data_hash: collation.parent_head_data.hash(),
752				},
753			)),
754		))
755		.await;
756
757		validator_group.advertised_to_peer(candidate_hash, &peer_ids, peer);
758
759		advertisement_timeouts.push(ResetInterestTimeout::new(
760			*candidate_hash,
761			*peer,
762			RESET_INTEREST_TIMEOUT,
763		));
764
765		metrics.on_advertisement_made();
766	}
767}
768
769/// The main incoming message dispatching switch.
770#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
771async fn process_msg<Context>(
772	ctx: &mut Context,
773	runtime: &mut RuntimeInfo,
774	state: &mut State,
775	msg: CollatorProtocolMessage,
776) -> Result<()> {
777	use CollatorProtocolMessage::*;
778
779	match msg {
780		CollateOn(id) => {
781			state.collating_on = Some(id);
782			state.implicit_view = Some(ImplicitView::new(Some(id)));
783		},
784		DistributeCollation {
785			candidate_receipt,
786			parent_head_data_hash: _,
787			pov,
788			parent_head_data,
789			result_sender,
790			core_index,
791		} => {
792			match state.collating_on {
793				Some(id) if candidate_receipt.descriptor.para_id() != id => {
794					// If the ParaId of a collation requested to be distributed does not match
795					// the one we expect, we ignore the message.
796					gum::warn!(
797						target: LOG_TARGET,
798						para_id = %candidate_receipt.descriptor.para_id(),
799						collating_on = %id,
800						"DistributeCollation for unexpected para_id",
801					);
802				},
803				Some(id) => {
804					let _ = state.metrics.time_collation_distribution("distribute");
805					distribute_collation(
806						ctx,
807						runtime,
808						state,
809						id,
810						candidate_receipt,
811						pov,
812						parent_head_data,
813						result_sender,
814						core_index,
815					)
816					.await?;
817				},
818				None => {
819					gum::warn!(
820						target: LOG_TARGET,
821						para_id = %candidate_receipt.descriptor.para_id(),
822						"DistributeCollation message while not collating on any",
823					);
824				},
825			}
826		},
827		NetworkBridgeUpdate(event) => {
828			// We only measure this branch in the histogram, as the other branches would
829			// just introduce noise.
830			let _ = state.metrics.time_process_msg();
831
832			if let Err(e) = handle_network_msg(ctx, runtime, state, event).await {
833				gum::warn!(
834					target: LOG_TARGET,
835					err = ?e,
836					"Failed to handle incoming network message",
837				);
838			}
839		},
840		msg @ (Invalid(..) | Seconded(..)) => {
841			gum::warn!(
842				target: LOG_TARGET,
843				"{:?} message is not expected on the collator side of the protocol",
844				msg,
845			);
846		},
847	}
848
849	Ok(())
850}
851
852/// Issue a response to a previously requested collation.
853async fn send_collation(
854	state: &mut State,
855	request: VersionedCollationRequest,
856	receipt: CandidateReceipt,
857	pov: PoV,
858	parent_head_data: HeadData,
859) {
860	let (tx, rx) = oneshot::channel();
861
862	let relay_parent = request.relay_parent();
863	let peer_id = request.peer_id();
864	let candidate_hash = receipt.hash();
865
866	let result = Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData {
867		receipt,
868		pov,
869		parent_head_data,
870	});
871
872	let response =
873		OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };
874
875	if let Err(_) = request.send_outgoing_response(response) {
876		gum::warn!(target: LOG_TARGET, "Sending collation response failed");
877	}
878
879	state.active_collation_fetches.push(
880		async move {
881			let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
882			let timed_out = r.is_none();
883
884			CollationSendResult { relay_parent, candidate_hash, peer_id, timed_out }
885		}
886		.boxed(),
887	);
888
889	state.metrics.on_collation_sent();
890}
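
// NOTE: illustrative sketch of the upload deadline used in `send_collation`.
// The `sent_feedback` receiver is raced against `MAX_UNSHARED_UPLOAD_TIME`;
// `timeout` resolves to `None` if the validator is too slow, in which case the
// main loop simply starts serving the next queued request instead of blocking.
#[allow(dead_code)]
async fn upload_deadline_sketch() {
	// `_tx` is kept alive but never used, i.e. the (hypothetical) validator
	// never confirms the download, so the deadline wins the race.
	let (_tx, rx) = oneshot::channel::<()>();

	let result = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
	let timed_out = result.is_none();
	assert!(timed_out);
}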
891
892/// A networking messages switch.
893#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
894async fn handle_incoming_peer_message<Context>(
895	ctx: &mut Context,
896	runtime: &mut RuntimeInfo,
897	state: &mut State,
898	origin: PeerId,
899	msg: CollationProtocols<
900		protocol_v1::CollatorProtocolMessage,
901		protocol_v2::CollatorProtocolMessage,
902	>,
903) -> Result<()> {
904	use protocol_v1::CollatorProtocolMessage as V1;
905	use protocol_v2::CollatorProtocolMessage as V2;
906
907	match msg {
908		CollationProtocols::V1(V1::Declare(..)) | CollationProtocols::V2(V2::Declare(..)) => {
909			gum::trace!(
910				target: LOG_TARGET,
911				?origin,
912				"Declare message is not expected on the collator side of the protocol",
913			);
914
915			// If we are declared to, this is another collator, and we should disconnect.
916			ctx.send_message(NetworkBridgeTxMessage::DisconnectPeers(
917				vec![origin],
918				PeerSet::Collation,
919			))
920			.await;
921		},
922		CollationProtocols::V1(V1::AdvertiseCollation(_)) |
923		CollationProtocols::V2(V2::AdvertiseCollation { .. }) => {
924			gum::trace!(
925				target: LOG_TARGET,
926				?origin,
927				"AdvertiseCollation message is not expected on the collator side of the protocol",
928			);
929
930			modify_reputation(&mut state.reputation, ctx.sender(), origin, COST_UNEXPECTED_MESSAGE)
931				.await;
932
933			// If we are advertised to, this is another collator, and we should disconnect.
934			ctx.send_message(NetworkBridgeTxMessage::DisconnectPeers(
935				vec![origin],
936				PeerSet::Collation,
937			))
938			.await;
939		},
940		CollationProtocols::V1(V1::CollationSeconded(relay_parent, statement)) => {
941			// Impossible, we no longer accept connections on v1.
942			gum::warn!(
943				target: LOG_TARGET,
944				?statement,
945				?origin,
946				?relay_parent,
947				"Collation seconded message received on unsupported protocol version 1",
948			);
949		},
950		CollationProtocols::V2(V2::CollationSeconded(relay_parent, statement)) => {
951			if !matches!(statement.unchecked_payload(), Statement::Seconded(_)) {
952				gum::warn!(
953					target: LOG_TARGET,
954					?statement,
955					?origin,
956					"Collation seconded message received with a non-seconded statement.",
957				);
958			} else {
959				let statement = runtime
960					.check_signature(ctx.sender(), relay_parent, statement)
961					.await?
962					.map_err(Error::InvalidStatementSignature)?;
963
964				let removed =
965					state.collation_result_senders.remove(&statement.payload().candidate_hash());
966
967				if let Some(sender) = removed {
968					gum::trace!(
969						target: LOG_TARGET,
970						?statement,
971						?origin,
972						"received a valid `CollationSeconded`, forwarding result to collator",
973					);
974					let _ = sender.send(CollationSecondedSignal { statement, relay_parent });
975				} else {
976					// Checking whether the `CollationSeconded` statement is unexpected
977					let relay_parent = match state.per_relay_parent.get(&relay_parent) {
978						Some(per_relay_parent) => per_relay_parent,
979						None => {
980							gum::debug!(
981								target: LOG_TARGET,
982								candidate_relay_parent = %relay_parent,
983								candidate_hash = ?&statement.payload().candidate_hash(),
984								"Seconded statement relay parent is out of our view",
985							);
986							return Ok(())
987						},
988					};
989					match relay_parent.collations.get(&statement.payload().candidate_hash()) {
990						Some(_) => {
991							// We've seen this collation before, so a seconded statement is expected
992							gum::trace!(
993								target: LOG_TARGET,
994								?statement,
995								?origin,
996								"received a valid `CollationSeconded`",
997							);
998						},
999						None => {
1000							gum::debug!(
1001								target: LOG_TARGET,
1002								candidate_hash = ?&statement.payload().candidate_hash(),
1003								?origin,
1004								"received an unexpected `CollationSeconded`: unknown statement",
1005							);
1006						},
1007					}
1008				}
1009			}
1010		},
1011	}
1012
1013	Ok(())
1014}
1015
1016/// Process an incoming network request for a collation.
1017#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1018async fn handle_incoming_request<Context>(
1019	ctx: &mut Context,
1020	state: &mut State,
1021	req: std::result::Result<VersionedCollationRequest, incoming::Error>,
1022) -> Result<()> {
1023	let req = req?;
1024	let relay_parent = req.relay_parent();
1025	let peer_id = req.peer_id();
1026	let para_id = req.para_id();
1027
1028	match state.collating_on {
1029		Some(our_para_id) if our_para_id == para_id => {
1030			let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
1031				Some(per_relay_parent) => per_relay_parent,
1032				None => {
1033					gum::debug!(
1034						target: LOG_TARGET,
1035						relay_parent = %relay_parent,
1036						"received a `RequestCollation` for a relay parent out of our view",
1037					);
1038
1039					return Ok(())
1040				},
1041			};
1042
1043			let collation_with_core = match &req {
1044				VersionedCollationRequest::V2(req) =>
1045					per_relay_parent.collations.get_mut(&req.payload.candidate_hash),
1046			};
1047			let (receipt, pov, parent_head_data) =
1048				if let Some(collation_with_core) = collation_with_core {
1049					let collation = collation_with_core.collation_mut();
1050					collation.status.advance_to_requested();
1051					(
1052						collation.receipt.clone(),
1053						collation.pov.clone(),
1054						collation.parent_head_data.clone(),
1055					)
1056				} else {
1057					gum::warn!(
1058						target: LOG_TARGET,
1059						relay_parent = %relay_parent,
1060						"received a `RequestCollation` for a relay parent we have no collation stored for.",
1061					);
1062
1063					return Ok(())
1064				};
1065
1066			state.metrics.on_collation_sent_requested();
1067
1068			let waiting = state.waiting_collation_fetches.entry(relay_parent).or_default();
1069			let candidate_hash = receipt.hash();
1070
1071			if !waiting.waiting_peers.insert((peer_id, candidate_hash)) {
1072				gum::debug!(
1073					target: LOG_TARGET,
1074					"Dropping incoming request as peer has a request in flight already."
1075				);
1076				modify_reputation(
1077					&mut state.reputation,
1078					ctx.sender(),
1079					peer_id,
1080					COST_APPARENT_FLOOD.into(),
1081				)
1082				.await;
1083				return Ok(())
1084			}
1085
1086			if waiting.collation_fetch_active {
1087				waiting.req_queue.push_back(req);
1088			} else {
1089				waiting.collation_fetch_active = true;
1090				// Obtain a timer for sending collation
1091				let _ = state.metrics.time_collation_distribution("send");
1092
1093				send_collation(state, req, receipt, pov, parent_head_data).await;
1094			}
1095		},
1096		Some(our_para_id) => {
1097			gum::warn!(
1098				target: LOG_TARGET,
1099				for_para_id = %para_id,
1100				our_para_id = %our_para_id,
1101				"received a `CollationFetchingRequest` for unexpected para_id",
1102			);
1103		},
1104		None => {
1105			gum::warn!(
1106				target: LOG_TARGET,
1107				for_para_id = %para_id,
1108				"received a `RequestCollation` while not collating on any para",
1109			);
1110		},
1111	}
1112	Ok(())
1113}
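
// NOTE: simplified sketch of the flood protection in `handle_incoming_request`:
// `HashSet::insert` returns `false` for a duplicate (peer, candidate) pair,
// which is the signal used above to drop the request and apply
// `COST_APPARENT_FLOOD`. Plain integers stand in for the real id types.
#[allow(dead_code)]
fn flood_protection_sketch() {
	let mut waiting_peers: HashSet<(u8, u8)> = HashSet::new();
	let (peer, candidate) = (1u8, 7u8);

	// The first request from this peer for this candidate is accepted.
	assert!(waiting_peers.insert((peer, candidate)));
	// A second one while the first is still being served counts as a flood.
	assert!(!waiting_peers.insert((peer, candidate)));
	// Once the collation was served, the entry is removed again.
	assert!(waiting_peers.remove(&(peer, candidate)));
}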
1114
1115/// Peer's view has changed. Send advertisements for new relay parents
1116/// if there are any.
1117#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1118async fn handle_peer_view_change<Context>(
1119	ctx: &mut Context,
1120	state: &mut State,
1121	peer_id: PeerId,
1122	view: View,
1123) {
1124	let Some(PeerData { view: current, unknown_heads }) = state.peer_data.get_mut(&peer_id) else {
1125		return
1126	};
1127
1128	let added: Vec<Hash> = view.difference(&*current).cloned().collect();
1129
1130	*current = view;
1131
1132	for added in added.into_iter() {
1133		let block_hashes = match state.per_relay_parent.contains_key(&added) {
1134			true => state
1135				.implicit_view
1136				.as_ref()
1137				.and_then(|implicit_view| {
1138					implicit_view.known_allowed_relay_parents_under(&added, state.collating_on)
1139				})
1140				.unwrap_or_default(),
1141			false => {
1142				gum::trace!(
1143					target: LOG_TARGET,
1144					?peer_id,
1145					new_leaf = ?added,
1146					"New leaf in peer's view is unknown",
1147				);
1148
1149				unknown_heads.insert(added, ());
1150
1151				continue
1152			},
1153		};
1154
1155		for block_hash in block_hashes {
1156			let Some(per_relay_parent) = state.per_relay_parent.get_mut(block_hash) else {
1157				continue
1158			};
1159
1160			advertise_collation(
1161				ctx,
1162				*block_hash,
1163				per_relay_parent,
1164				&peer_id,
1165				&state.peer_ids,
1166				&mut state.advertisement_timeouts,
1167				&state.metrics,
1168			)
1169			.await;
1170		}
1171	}
1172}
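
// NOTE: simplified sketch of the view delta computed in
// `handle_peer_view_change`: only heads that are new in the peer's view are
// candidates for fresh advertisements; heads that dropped out of the view need
// no action on the collator side. Hashes are made up.
#[allow(dead_code)]
fn view_delta_sketch() {
	let current: HashSet<Hash> =
		[Hash::repeat_byte(1), Hash::repeat_byte(2)].into_iter().collect();
	let new_view: HashSet<Hash> =
		[Hash::repeat_byte(2), Hash::repeat_byte(3)].into_iter().collect();

	let added: Vec<Hash> = new_view.difference(&current).cloned().collect();
	assert_eq!(added, vec![Hash::repeat_byte(3)]);
}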
1173
1174/// Bridge messages switch.
1175#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1176async fn handle_network_msg<Context>(
1177	ctx: &mut Context,
1178	runtime: &mut RuntimeInfo,
1179	state: &mut State,
1180	bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
1181) -> Result<()> {
1182	use NetworkBridgeEvent::*;
1183
1184	match bridge_message {
1185		PeerConnected(peer_id, observed_role, protocol_version, maybe_authority) => {
1186			// If it is possible that a disconnected validator would attempt a reconnect,
1187			// it should be handled here.
1188			gum::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, ?maybe_authority, "Peer connected");
1189
1190			let version: CollationVersion = match protocol_version.try_into() {
1191				Ok(version) => version,
1192				Err(err) => {
1193					// Network bridge is expected to handle this.
1194					gum::error!(
1195						target: LOG_TARGET,
1196						?peer_id,
1197						?observed_role,
1198						?err,
1199						"Unsupported protocol version"
1200					);
1201					return Ok(())
1202				},
1203			};
1204			if version == CollationVersion::V1 {
1205				gum::warn!(
1206					target: LOG_TARGET,
1207					?peer_id,
1208					?observed_role,
1209					"Unsupported protocol version v1"
1210				);
1211
1212				// V1 no longer supported, we should disconnect.
1213				ctx.send_message(NetworkBridgeTxMessage::DisconnectPeers(
1214					vec![peer_id],
1215					PeerSet::Collation,
1216				))
1217				.await;
1218				return Ok(())
1219			}
1220
1221			state.peer_data.entry(peer_id).or_insert_with(|| PeerData {
1222				view: View::default(),
1223				// It is unlikely that the collator is falling 10 blocks behind, and if so, it probably is
1224				// not able to keep up anyway.
1225				unknown_heads: LruMap::new(ByLength::new(10)),
1226			});
1227
1228			if let Some(authority_ids) = maybe_authority {
1229				gum::trace!(
1230					target: LOG_TARGET,
1231					?authority_ids,
1232					?peer_id,
1233					"Connected to requested validator"
1234				);
1235				state.peer_ids.insert(peer_id, authority_ids);
1236
1237				declare(ctx, state, &peer_id).await;
1238			}
1239		},
1240		PeerViewChange(peer_id, view) => {
1241			gum::trace!(target: LOG_TARGET, ?peer_id, ?view, "Peer view change");
1242			handle_peer_view_change(ctx, state, peer_id, view).await;
1243		},
1244		PeerDisconnected(peer_id) => {
1245			gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
1246			state.peer_data.remove(&peer_id);
1247			state.peer_ids.remove(&peer_id);
1248		},
1249		OurViewChange(view) => {
1250			gum::trace!(target: LOG_TARGET, ?view, "Own view change");
1251			handle_our_view_change(ctx, state, view, runtime).await?;
1252		},
1253		PeerMessage(remote, msg) => {
1254			handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
1255		},
1256		UpdatedAuthorityIds(peer_id, authority_ids) => {
1257			gum::trace!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Updated authority ids");
1258			if state.peer_data.contains_key(&peer_id) {
1259				if state.peer_ids.insert(peer_id, authority_ids).is_none() {
1260					declare(ctx, state, &peer_id).await;
1261				}
1262			}
1263		},
1264		NewGossipTopology { .. } => {
1265			// impossible!
1266		},
1267	}
1268
1269	Ok(())
1270}
1271
1272/// Update collation tracker with the backed and included candidates.
1273#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
1274async fn process_block_events<Context>(
1275	ctx: &mut Context,
1276	collation_tracker: &mut CollationTracker,
1277	leaf: Hash,
1278	maybe_block_number: Option<BlockNumber>,
1279	para_id: ParaId,
1280	metrics: &Metrics,
1281) {
1282	if let Ok(events) = get_candidate_events(ctx.sender(), leaf).await {
1283		let Some(block_number) = maybe_block_number else {
1284			// This should not happen. If it does, this log message explains why
1285			// metrics and logs are missing for the candidates under this block.
1286			gum::debug!(
1287				target: crate::LOG_TARGET_STATS,
1288				relay_block = ?leaf,
1289				?para_id,
1290				"Failed to get relay chain block number",
1291			);
1292			return
1293		};
1294
1295		for ev in events {
1296			match ev {
1297				CandidateEvent::CandidateIncluded(receipt, _, _, _) => {
1298					if receipt.descriptor.para_id() != para_id {
1299						continue
1300					}
1301					collation_tracker.collation_included(block_number, leaf, receipt, metrics);
1302				},
1303				CandidateEvent::CandidateBacked(receipt, _, _, _) => {
1304					if receipt.descriptor.para_id() != para_id {
1305						continue
1306					}
1307
1308					let Some(block_number) = maybe_block_number else { continue };
1309					let Some(stats) =
1310						collation_tracker.collation_backed(block_number, leaf, receipt, metrics)
1311					else {
1312						continue
1313					};
1314
1315					// Continue measuring inclusion latency.
1316					collation_tracker.track(stats);
1317				},
1318				_ => {
1319					// do not care about other events
1320				},
1321			}
1322		}
1323	}
1324}
1325
1326/// Handles our view changes.
1327#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
1328async fn handle_our_view_change<Context>(
1329	ctx: &mut Context,
1330	state: &mut State,
1331	view: OurView,
1332	runtime_info: &mut RuntimeInfo,
1333) -> Result<()> {
1334	let Some(implicit_view) = &mut state.implicit_view else { return Ok(()) };
1335	let Some(para_id) = state.collating_on else { return Ok(()) };
1336
1337	let removed: Vec<_> = implicit_view.leaves().filter(|h| !view.contains(h)).copied().collect();
1338	let added: Vec<_> = view.iter().filter(|h| !implicit_view.contains_leaf(h)).collect();
1339
1340	for leaf in added {
1341		let session_index = runtime_info.get_session_index_for_child(ctx.sender(), *leaf).await?;
1342
1343		let claim_queue = fetch_claim_queue(ctx.sender(), *leaf).await?;
1344
1345		implicit_view
1346			.activate_leaf(ctx.sender(), *leaf)
1347			.await
1348			.map_err(Error::ImplicitViewFetchError)?;
1349
1350		let block_number = implicit_view.block_number(leaf);
1351		state
1352			.per_relay_parent
1353			.insert(*leaf, PerRelayParent::new(para_id, claim_queue, block_number, session_index));
1354
1355		process_block_events(
1356			ctx,
1357			&mut state.collation_tracker,
1358			*leaf,
1359			block_number,
1360			para_id,
1361			&state.metrics,
1362		)
1363		.await;
1364		let allowed_ancestry = implicit_view
1365			.known_allowed_relay_parents_under(leaf, state.collating_on)
1366			.unwrap_or_default();
1367
1368		// Get the peers that already reported this head to us, even though we didn't know it at
1369		// that point.
1370		let peers = state
1371			.peer_data
1372			.iter_mut()
1373			.filter_map(|(id, data)| data.unknown_heads.remove(leaf).map(|_| id))
1374			.collect::<Vec<_>>();
1375
1376		for block_hash in allowed_ancestry {
1377			let block_number = implicit_view.block_number(block_hash);
1378
1379			let per_relay_parent = match state.per_relay_parent.entry(*block_hash) {
1380				Entry::Vacant(entry) => {
1381					let claim_queue = match fetch_claim_queue(ctx.sender(), *block_hash).await {
1382						Ok(cq) => cq,
1383						Err(error) => {
1384							gum::debug!(
1385								target: LOG_TARGET,
1386								?block_hash,
1387								?error,
1388								"Failed to fetch claim queue while iterating allowed ancestry",
1389							);
1390							continue
1391						},
1392					};
1393					let session_index =
1394						match runtime_info.get_session_index_for_child(ctx.sender(), *leaf).await {
1395							Ok(si) => si,
1396							Err(error) => {
1397								gum::debug!(
1398									target: LOG_TARGET,
1399									?block_hash,
1400									?error,
1401									"Failed to fetch session index while iterating allowed ancestry",
1402								);
1403								continue
1404							},
1405						};
1406
1407					entry.insert(PerRelayParent::new(
1408						para_id,
1409						claim_queue,
1410						block_number,
1411						session_index,
1412					))
1413				},
1414				Entry::Occupied(entry) => entry.into_mut(),
1415			};
1416
1417			// Announce relevant collations to these peers.
1418			for peer_id in &peers {
1419				advertise_collation(
1420					ctx,
1421					*block_hash,
1422					per_relay_parent,
1423					&peer_id,
1424					&state.peer_ids,
1425					&mut state.advertisement_timeouts,
1426					&state.metrics,
1427				)
1428				.await;
1429			}
1430		}
1431	}
1432
1433	let highest_session_index = state.per_relay_parent.values().map(|pr| pr.session_index).max();
1434
1435	for leaf in removed {
1436		// If the leaf is deactivated, it may still stay in the view as part
1437		// of the implicit ancestry. Only update the state after the hash is actually
1438		// pruned from the block info storage.
1439		let maybe_block_number = implicit_view.block_number(&leaf);
1440		let pruned = implicit_view.deactivate_leaf(leaf);
1441
1442		for removed in &pruned {
1443			gum::debug!(
1444				target: LOG_TARGET,
1445				relay_parent = ?removed,
1446				"Removing relay parent because our view changed.",
1447			);
1448
1449			if let Some(block_number) = maybe_block_number {
1450				let expired_collations = state.collation_tracker.drain_expired(block_number);
1451				process_expired_collations(expired_collations, *removed, para_id, &state.metrics);
1452			}
1453
1454			// Get all the collations built on top of the removed leaf.
1455			let collations = state
1456				.per_relay_parent
1457				.remove(removed)
1458				.map(|per_relay_parent| per_relay_parent.collations)
1459				.unwrap_or_default();
1460
1461			for collation_with_core in collations.into_values() {
1462				let collation = collation_with_core.collation();
1463				let candidate_hash = collation.receipt.hash();
1464
1465				state.collation_result_senders.remove(&candidate_hash);
1466				state.validator_groups_buf.remove_candidate(&candidate_hash);
1467
1468				process_out_of_view_collation(
1469					&mut state.collation_tracker,
1470					collation_with_core,
1471					highest_session_index,
1472				);
1473			}
1474
1475			state.waiting_collation_fetches.remove(removed);
1476		}
1477	}
1478	Ok(())
1479}
1480
1481fn process_out_of_view_collation(
1482	collation_tracker: &mut CollationTracker,
1483	mut collation_with_core: CollationData,
1484	highest_session_index: Option<SessionIndex>,
1485) {
1486	let is_same_session =
1487		highest_session_index.map_or(true, |hs| hs == collation_with_core.session_index);
1488	let collation = collation_with_core.collation_mut();
1489	let candidate_hash = collation.receipt.hash();
1490
1491	match collation.status {
1492		CollationStatus::Created =>
1493			if is_same_session {
1494				gum::warn!(
1495					target: LOG_TARGET,
1496					?candidate_hash,
1497					pov_hash = ?collation.pov.hash(),
1498					"Collation wasn't advertised to any validator.",
1499				)
1500			} else {
1501				gum::debug!(
1502					target: LOG_TARGET,
1503					?candidate_hash,
1504					pov_hash = ?collation.pov.hash(),
1505					"Collation wasn't advertised because it was built on a relay chain block that is now part of an old session.",
1506				)
1507			},
1508		CollationStatus::Advertised => gum::debug!(
1509			target: LOG_TARGET,
1510			?candidate_hash,
1511			pov_hash = ?collation.pov.hash(),
1512			"Collation was advertised but not requested by any validator.",
1513		),
1514		CollationStatus::Requested => {
1515			gum::debug!(
1516				target: LOG_TARGET,
1517				?candidate_hash,
1518				pov_hash = ?collation.pov.hash(),
1519				"Collation was requested.",
1520			);
1521		},
1522	}
1523
1524	let collation_status = collation.status.clone();
1525	let Some(mut stats) = collation_with_core.take_stats() else { return };
1526
1527	// If the collation stats are still available, it means the collation was never
1528	// successfully fetched, even if a fetch request was received.
1529	//
1530	// It will expire in its current state at the next block import.
1531	stats.set_pre_backing_status(collation_status);
1532	collation_tracker.track(stats);
1533}
1534
1535fn process_expired_collations(
1536	expired_collations: Vec<CollationStats>,
1537	removed: Hash,
1538	para_id: ParaId,
1539	metrics: &Metrics,
1540) {
1541	for expired_collation in expired_collations {
1542		let collation_state = if expired_collation.fetch_latency().is_none() {
1543			// If collation was not fetched, we rely on the status provided
1544			// by the collator protocol.
1545			expired_collation.pre_backing_status().label()
1546		} else if expired_collation.backed().is_none() {
1547			"fetched"
1548		} else if expired_collation.included().is_none() {
1549			"backed"
1550		} else {
1551			"none"
1552		};
1553
1554		let age = expired_collation.expired().unwrap_or_default();
1555		gum::debug!(
1556			target: crate::LOG_TARGET_STATS,
1557			?age,
1558			?collation_state,
1559			relay_parent = ?removed,
1560			?para_id,
1561			head = ?expired_collation.head(),
1562			"Collation expired",
1563		);
1564
1565		metrics.on_collation_expired(age as f64, collation_state);
1566	}
1567}
1568
1569/// The collator protocol collator side main loop.
1570#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
1571pub(crate) async fn run<Context>(
1572	ctx: Context,
1573	local_peer_id: PeerId,
1574	collator_pair: CollatorPair,
1575	req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
1576	metrics: Metrics,
1577) -> std::result::Result<(), FatalError> {
1578	run_inner(
1579		ctx,
1580		local_peer_id,
1581		collator_pair,
1582		req_v2_receiver,
1583		metrics,
1584		ReputationAggregator::default(),
1585		REPUTATION_CHANGE_INTERVAL,
1586	)
1587	.await
1588}
1589
1590#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
1591async fn run_inner<Context>(
1592	mut ctx: Context,
1593	local_peer_id: PeerId,
1594	collator_pair: CollatorPair,
1595	mut req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
1596	metrics: Metrics,
1597	reputation: ReputationAggregator,
1598	reputation_interval: Duration,
1599) -> std::result::Result<(), FatalError> {
1600	use OverseerSignal::*;
1601
1602	let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
1603	let mut reputation_delay = new_reputation_delay();
1604
1605	let mut state = State::new(local_peer_id, collator_pair, metrics.clone(), reputation);
1606	let mut runtime = RuntimeInfo::new(None);
1607
1608	loop {
1609		let reputation_changes = || vec![COST_INVALID_REQUEST];
1610		let recv_req_v2 = req_v2_receiver.recv(reputation_changes).fuse();
1611		pin_mut!(recv_req_v2);
1612
1613		let mut reconnect_timeout = &mut state.reconnect_timeout;
1614		select! {
1615			_ = reputation_delay => {
1616				state.reputation.send(ctx.sender()).await;
1617				reputation_delay = new_reputation_delay();
1618			},
1619			msg = ctx.recv().fuse() => match msg.map_err(FatalError::SubsystemReceive)? {
1620				FromOrchestra::Communication { msg } => {
1621					log_error(
1622						process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
1623						"Failed to process message"
1624					)?;
1625				},
1626				FromOrchestra::Signal(ActiveLeaves(update)) => {
1627					if update.activated.is_some() {
1628						*reconnect_timeout = futures_timer::Delay::new(RECONNECT_AFTER_LEAF_TIMEOUT).fuse();
1629					}
1630				}
1631				FromOrchestra::Signal(BlockFinalized(..)) => {}
1632				FromOrchestra::Signal(Conclude) => return Ok(()),
1633			},
1634			CollationSendResult { relay_parent, candidate_hash, peer_id, timed_out } =
1635				state.active_collation_fetches.select_next_some() => {
1636
1637				let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
1638					if timed_out {
1639						gum::debug!(
1640							target: LOG_TARGET_STATS,
1641							?relay_parent,
1642							?peer_id,
1643							?candidate_hash,
1644							"Sending collation to validator timed out, carrying on with next validator."
1645						);
1646						// We try to throttle requests per relay parent to give validators
1647						// more bandwidth, but if the collation is not received within the
1648						// timeout, we simply start processing the next request.
1649						// The request is still alive; it should be kept in a waiting queue.
1650					} else {
1651						for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
1652							// This peer has received the candidate. Not interested anymore.
1653							state.validator_groups_buf.reset_validator_interest(candidate_hash, authority_id);
1654						}
1655						waiting.waiting_peers.remove(&(peer_id, candidate_hash));
1656
1657						// Update collation status to fetched.
1658						if let Some(per_relay_parent) =  state.per_relay_parent.get_mut(&relay_parent) {
1659							if let Some(collation_with_core) = per_relay_parent.collations.get_mut(&candidate_hash) {
1660								let maybe_stats = collation_with_core.take_stats();
1661								let our_para_id = collation_with_core.collation().receipt.descriptor.para_id();
1662
1663								if let Some(mut stats) = maybe_stats {
1664									// Update the timestamp when the collation has been sent (from the subsystem's perspective).
1665									stats.set_fetched_at(std::time::Instant::now());
1666									gum::debug!(
1667										target: LOG_TARGET_STATS,
1668										para_head = ?stats.head(),
1669										%our_para_id,
1670										"Collation fetch latency is {}ms",
1671										stats.fetch_latency().unwrap_or_default().as_millis(),
1672									);
1673
1674									// Update the pre-backing status. Should be requested at this point.
1675									stats.set_pre_backing_status(collation_with_core.collation().status.clone());
1676									debug_assert_eq!(collation_with_core.collation().status, CollationStatus::Requested);
1677
1678									// Observe fetch latency metric.
1679									stats.take_fetch_latency_metric();
1680									stats.set_backed_latency_metric(metrics.time_collation_backing_latency());
1681
1682									// Next step is to measure backing latency.
1683									state.collation_tracker.track(stats);
1684								}
1685							}
1686						}
1687					}
1688
1689					if let Some(next) = waiting.req_queue.pop_front() {
1690						next
1691					} else {
1692						waiting.collation_fetch_active = false;
1693						continue
1694					}
1695				} else {
1696					// No waiting collation fetches means we already removed the relay parent from our view.
1697					continue
1698				};
1699
1700				let next_collation_with_core = {
1701					let per_relay_parent = match state.per_relay_parent.get(&relay_parent) {
1702						Some(per_relay_parent) => per_relay_parent,
1703						None => continue,
1704					};
1705
1706					per_relay_parent.collations.get(&next.candidate_hash())
1707				};
1708
1709				if let Some(collation_with_core) = next_collation_with_core {
1710					let collation = collation_with_core.collation();
1711					let receipt = collation.receipt.clone();
1712					let pov = collation.pov.clone();
1713					let parent_head_data = collation.parent_head_data.clone();
1714
1715					send_collation(&mut state, next, receipt, pov, parent_head_data).await;
1716				}
1717			},
1718			(candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => {
1719				// NOTE: this doesn't necessarily mean that a validator gets disconnected;
1720				// it only will be if there are no other advertisements we want to send.
1721				//
1722				// No-op if the collation was already fetched or went out of view.
1723				for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
1724					state
1725						.validator_groups_buf
1726						.reset_validator_interest(candidate_hash, &authority_id);
1727				}
1728			}
1729			_ = reconnect_timeout => {
1730				let cores_assigned = has_assigned_cores(&state.implicit_view, &state.per_relay_parent);
1731				connect_to_validators(&mut ctx, cores_assigned, &state.validator_groups_buf).await;
1732
1733				gum::trace!(
1734					target: LOG_TARGET,
1735					timeout = ?RECONNECT_AFTER_LEAF_TIMEOUT,
1736					"Peer-set updated due to a timeout"
1737				);
1738			},
1739			in_req = recv_req_v2 => {
1740				let request = in_req.map(VersionedCollationRequest::from);
1741
1742				log_error(
1743					handle_incoming_request(&mut ctx, &mut state, request).await,
1744					"Handling incoming collation fetch request V2"
1745				)?;
1746			}
1747		}
1748	}
1749}