1use std::{
18 collections::{hash_map::Entry, HashMap, HashSet},
19 time::Duration,
20};
21
22use bitvec::{bitvec, vec::BitVec};
23use futures::{
24 channel::oneshot, future::Fuse, pin_mut, select, stream::FuturesUnordered, FutureExt, StreamExt,
25};
26use metrics::{CollationStats, CollationTracker};
27use schnellru::{ByLength, LruMap};
28use sp_core::Pair;
29
30use polkadot_node_network_protocol::{
31 self as net_protocol,
32 peer_set::{CollationVersion, PeerSet},
33 request_response::{
34 incoming::{self, OutgoingResponse},
35 v2 as request_v2, IncomingRequestReceiver,
36 },
37 v1 as protocol_v1, v2 as protocol_v2, CollationProtocols, OurView, PeerId,
38 UnifiedReputationChange as Rep, View,
39};
40use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement};
41use polkadot_node_subsystem::{
42 messages::{CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage},
43 overseer, FromOrchestra, OverseerSignal,
44};
45use polkadot_node_subsystem_util::{
46 backing_implicit_view::View as ImplicitView,
47 reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
48 runtime::{
49 fetch_claim_queue, get_candidate_events, get_group_rotation_info, ClaimQueueSnapshot,
50 RuntimeInfo,
51 },
52 TimeoutExt,
53};
54use polkadot_primitives::{
55 AuthorityDiscoveryId, BlockNumber, CandidateEvent, CandidateHash,
56 CandidateReceiptV2 as CandidateReceipt, CollatorPair, CoreIndex, GroupIndex, Hash, HeadData,
57 Id as ParaId, SessionIndex,
58};
59
60use crate::{modify_reputation, LOG_TARGET, LOG_TARGET_STATS};
61
62mod collation;
63mod error;
64mod metrics;
65#[cfg(test)]
66mod tests;
67mod validators_buffer;
68
69use collation::{
70 ActiveCollationFetches, Collation, CollationSendResult, CollationStatus,
71 VersionedCollationRequest, WaitingCollationFetches,
72};
73use error::{log_error, Error, FatalError, Result};
74use validators_buffer::{
75 ResetInterestTimeout, ValidatorGroupsBuffer, RESET_INTEREST_TIMEOUT, VALIDATORS_BUFFER_CAPACITY,
76};
77
78pub use metrics::Metrics;
79
/// Major reputation cost carrying the reason "Peer sent unparsable request".
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
/// Minor reputation cost; in this file it is applied to peers that send an
/// `AdvertiseCollation` message to the collator side.
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
/// Minor reputation cost applied when a peer requests a collation while an identical
/// request from the same peer is still registered as waiting.
const COST_APPARENT_FLOOD: Rep =
    Rep::CostMinor("Message received when previous one was still being processed");

/// How long `send_collation` waits for the "response sent" feedback before the fetch is
/// reported as timed out.
const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150);

/// NOTE(review): presumably the delay before validator connections are re-established
/// after a new leaf; it is not used in the visible part of the file — confirm.
const RECONNECT_AFTER_LEAF_TIMEOUT: Duration = Duration::from_secs(4);

/// Fused delay future; the type of `State::reconnect_timeout`.
type ReconnectTimeout = Fuse<futures_timer::Delay>;
112
/// Outcome of `ValidatorGroup::should_advertise_to`.
#[derive(Debug)]
enum ShouldAdvertiseTo {
    /// The peer belongs to the group and has not been sent this candidate yet.
    Yes,
    /// The peer has no known discovery ids, or none of them belongs to the group.
    NotAuthority,
    /// The candidate was already advertised to this group member.
    AlreadyAdvertised,
}
119
/// Validators of one backing group together with a per-candidate record of which of them
/// have already received an advertisement.
#[derive(Debug, Default)]
struct ValidatorGroup {
    /// Discovery ids of the group members; a member's position in this vector is the bit
    /// index used in `advertised_to`.
    validators: Vec<AuthorityDiscoveryId>,

    /// For every candidate, one bit per entry of `validators`; a set bit means the
    /// candidate was advertised to that validator.
    advertised_to: HashMap<CandidateHash, BitVec>,
}
133
134impl ValidatorGroup {
135 fn should_advertise_to(
137 &self,
138 candidate_hash: &CandidateHash,
139 peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
140 peer: &PeerId,
141 ) -> ShouldAdvertiseTo {
142 let authority_ids = match peer_ids.get(peer) {
143 Some(authority_ids) => authority_ids,
144 None => return ShouldAdvertiseTo::NotAuthority,
145 };
146
147 for id in authority_ids {
148 let validator_index = match self.validators.iter().position(|v| v == id) {
152 Some(idx) => idx,
153 None => continue,
154 };
155
156 if self
159 .advertised_to
160 .get(candidate_hash)
161 .map_or(true, |advertised| !advertised[validator_index])
162 {
163 return ShouldAdvertiseTo::Yes
164 } else {
165 return ShouldAdvertiseTo::AlreadyAdvertised
166 }
167 }
168
169 ShouldAdvertiseTo::NotAuthority
170 }
171
172 fn advertised_to_peer(
174 &mut self,
175 candidate_hash: &CandidateHash,
176 peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
177 peer: &PeerId,
178 ) {
179 if let Some(authority_ids) = peer_ids.get(peer) {
180 for id in authority_ids {
181 let validator_index = match self.validators.iter().position(|v| v == id) {
182 Some(idx) => idx,
183 None => continue,
184 };
185 self.advertised_to
186 .entry(*candidate_hash)
187 .or_insert_with(|| bitvec![0; self.validators.len()])
188 .set(validator_index, true);
189 }
190 }
191 }
192}
193
/// Per-peer bookkeeping kept for every connected peer.
#[derive(Debug)]
struct PeerData {
    /// The peer's most recently reported view.
    view: View,
    /// Heads from the peer's view that had no entry in `State::per_relay_parent` when they
    /// were reported; bounded LRU (created with length 10 in `handle_network_msg`).
    unknown_heads: LruMap<Hash, (), ByLength>,
}
204
/// A stored collation together with the core it was built for and tracking metadata.
struct CollationData {
    /// The collation itself (receipt, PoV, parent head data and status).
    collation: Collation,
    /// Core the collation was distributed for.
    core_index: CoreIndex,
    /// Statistics for this collation; `None` when the relay parent's block number was
    /// unknown at creation, or once taken via `take_stats`.
    stats: Option<CollationStats>,
    /// Session index obtained when the validator group for the collation was determined.
    session_index: SessionIndex,
}
212
impl CollationData {
    /// Shared reference to the stored collation.
    pub fn collation(&self) -> &Collation {
        &self.collation
    }

    /// Mutable reference to the stored collation.
    pub fn collation_mut(&mut self) -> &mut Collation {
        &mut self.collation
    }

    /// Core this collation was distributed for.
    pub fn core_index(&self) -> &CoreIndex {
        &self.core_index
    }

    /// Moves the statistics out, leaving `None` behind.
    pub fn take_stats(&mut self) -> Option<CollationStats> {
        self.stats.take()
    }
}
234
/// Everything tracked for a single relay parent.
struct PerRelayParent {
    /// Validator group per core, filled in when a collation is distributed for that core.
    validator_group: HashMap<CoreIndex, ValidatorGroup>,
    /// Collations distributed at this relay parent, keyed by candidate hash.
    collations: HashMap<CandidateHash, CollationData>,
    /// For each core with at least one claim for our para, the number of such claims in
    /// the claim queue. Used as the per-core collation limit.
    assignments: HashMap<CoreIndex, usize>,
    /// Block number of the relay parent, if known.
    block_number: Option<BlockNumber>,
    /// Session index supplied at construction.
    session_index: SessionIndex,
}
248
249impl PerRelayParent {
250 fn new(
251 para_id: ParaId,
252 claim_queue: ClaimQueueSnapshot,
253 block_number: Option<BlockNumber>,
254 session_index: SessionIndex,
255 ) -> Self {
256 let assignments =
257 claim_queue.iter_all_claims().fold(HashMap::new(), |mut acc, (core, claims)| {
258 let n_claims = claims.iter().filter(|para| para == &¶_id).count();
259 if n_claims > 0 {
260 acc.insert(*core, n_claims);
261 }
262 acc
263 });
264
265 Self {
266 validator_group: HashMap::default(),
267 collations: HashMap::new(),
268 assignments,
269 block_number,
270 session_index,
271 }
272 }
273}
274
/// Mutable state of the collator side of the protocol.
struct State {
    /// Our own peer id; input to the `Declare` signature payload.
    local_peer_id: PeerId,

    /// Key pair used to sign the `Declare` message.
    collator_pair: CollatorPair,

    /// The para we collate on; set by `CollateOn`, `None` until then.
    collating_on: Option<ParaId>,

    /// Data for every connected peer; inserted on `PeerConnected`, removed on
    /// `PeerDisconnected`.
    peer_data: HashMap<PeerId, PeerData>,

    /// Implicit view of allowed relay parents; created when `CollateOn` is received.
    implicit_view: Option<ImplicitView>,

    /// State per relay parent, keyed by relay parent hash.
    per_relay_parent: HashMap<Hash, PerRelayParent>,

    /// Channels used to forward a `CollationSecondedSignal` for a candidate; the entry is
    /// removed when a valid seconded statement for the candidate arrives.
    collation_result_senders: HashMap<CandidateHash, oneshot::Sender<CollationSecondedSignal>>,

    /// Authority discovery ids known for connected peers.
    peer_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,

    /// Buffer of validator groups for advertised collations; provides the list of
    /// validators to connect to.
    validator_groups_buf: ValidatorGroupsBuffer,

    /// Reconnect timer; starts out terminated.
    reconnect_timeout: ReconnectTimeout,

    /// Metrics handle.
    metrics: Metrics,

    /// Collation requests waiting per relay parent.
    waiting_collation_fetches: HashMap<Hash, WaitingCollationFetches>,

    /// In-flight collation uploads; each future resolves to a `CollationSendResult`.
    active_collation_fetches: ActiveCollationFetches,

    /// One `ResetInterestTimeout` per advertisement made.
    advertisement_timeouts: FuturesUnordered<ResetInterestTimeout>,

    /// Aggregator handed to `modify_reputation`.
    reputation: ReputationAggregator,

    /// Tracker fed with backed/included candidate events.
    collation_tracker: CollationTracker,
}
341
342impl State {
343 fn new(
346 local_peer_id: PeerId,
347 collator_pair: CollatorPair,
348 metrics: Metrics,
349 reputation: ReputationAggregator,
350 ) -> State {
351 State {
352 local_peer_id,
353 collator_pair,
354 metrics,
355 collating_on: Default::default(),
356 peer_data: Default::default(),
357 implicit_view: None,
358 per_relay_parent: Default::default(),
359 collation_result_senders: Default::default(),
360 peer_ids: Default::default(),
361 validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY),
362 reconnect_timeout: Fuse::terminated(),
363 waiting_collation_fetches: Default::default(),
364 active_collation_fetches: Default::default(),
365 advertisement_timeouts: Default::default(),
366 reputation,
367 collation_tracker: Default::default(),
368 }
369 }
370}
371
/// Registers a new collation for `core_index` at the receipt's relay parent and advertises
/// it to every connected peer whose view allows that relay parent.
///
/// The collation is dropped (after logging, returning `Ok(())`) when:
/// - the relay parent has no entry in `state.per_relay_parent`,
/// - `core_index` is not among the relay parent's assignments,
/// - the number of collations already stored for `core_index` has reached the number of
///   claims recorded for that core,
/// - the candidate is already stored, or
/// - no validators are assigned to the core.
///
/// Errors from `determine_our_validators` are propagated.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn distribute_collation<Context>(
    ctx: &mut Context,
    runtime: &mut RuntimeInfo,
    state: &mut State,
    id: ParaId,
    receipt: CandidateReceipt,
    pov: PoV,
    parent_head_data: HeadData,
    result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
    core_index: CoreIndex,
) -> Result<()> {
    let candidate_relay_parent = receipt.descriptor.relay_parent();
    let candidate_hash = receipt.hash();
    // Evaluated up front, before `per_relay_parent` is mutably borrowed below.
    let cores_assigned = has_assigned_cores(&state.implicit_view, &state.per_relay_parent);

    let per_relay_parent = match state.per_relay_parent.get_mut(&candidate_relay_parent) {
        Some(per_relay_parent) => per_relay_parent,
        None => {
            gum::debug!(
                target: LOG_TARGET,
                para_id = %id,
                candidate_relay_parent = %candidate_relay_parent,
                candidate_hash = ?candidate_hash,
                "Candidate relay parent is out of our view",
            );
            return Ok(())
        },
    };

    // The number of claims for this core doubles as the collation limit.
    let Some(collations_limit) = per_relay_parent.assignments.get(&core_index) else {
        gum::warn!(
            target: LOG_TARGET,
            para_id = %id,
            relay_parent = ?candidate_relay_parent,
            cores = ?per_relay_parent.assignments.keys(),
            ?core_index,
            "Attempting to distribute collation for a core we are not assigned to ",
        );

        return Ok(())
    };

    let current_collations_count = per_relay_parent
        .collations
        .values()
        .filter(|c| c.core_index() == &core_index)
        .count();
    if current_collations_count >= *collations_limit {
        gum::debug!(
            target: LOG_TARGET,
            ?candidate_relay_parent,
            "The limit of {} collations per relay parent for core {} is already reached",
            collations_limit,
            core_index.0,
        );
        return Ok(())
    }

    // Duplicate candidates are ignored. Note this check runs after the limit check above.
    if per_relay_parent.collations.contains_key(&candidate_hash) {
        gum::debug!(
            target: LOG_TARGET,
            ?candidate_relay_parent,
            ?candidate_hash,
            "Already seen this candidate",
        );
        return Ok(())
    }

    // More than one assigned core at this relay parent; only used for logging.
    let elastic_scaling = per_relay_parent.assignments.len() > 1;
    if elastic_scaling {
        gum::debug!(
            target: LOG_TARGET,
            para_id = %id,
            cores = ?per_relay_parent.assignments.keys(),
            "{} is assigned to {} cores at {}", id, per_relay_parent.assignments.len(), candidate_relay_parent,
        );
    }

    let our_core = core_index;

    // Resolve the validator group currently assigned to our core.
    let GroupValidators { validators, session_index, group_index } =
        determine_our_validators(ctx, runtime, our_core, candidate_relay_parent).await?;

    if validators.is_empty() {
        gum::warn!(
            target: LOG_TARGET,
            core = ?our_core,
            "there are no validators assigned to core",
        );

        return Ok(())
    }

    // Record the group in the buffer that `connect_to_validators` reads from.
    state.validator_groups_buf.note_collation_advertised(
        candidate_hash,
        session_index,
        group_index,
        &validators,
    );

    gum::debug!(
        target: LOG_TARGET,
        para_id = %id,
        candidate_relay_parent = %candidate_relay_parent,
        ?candidate_hash,
        pov_hash = ?pov.hash(),
        core = ?our_core,
        current_validators = ?validators,
        "Accepted collation, connecting to validators."
    );

    // Keep an existing group for this core; otherwise create one from `validators`.
    per_relay_parent.validator_group.entry(core_index).or_insert_with(|| {
        let mut group = ValidatorGroup::default();
        group.validators = validators;
        group
    });

    connect_to_validators(ctx, cores_assigned, &state.validator_groups_buf).await;

    if let Some(result_sender) = result_sender {
        state.collation_result_senders.insert(candidate_hash, result_sender);
    }

    // Read the para head before `receipt` is moved into the stored collation.
    let para_head = receipt.descriptor.para_head();
    per_relay_parent.collations.insert(
        candidate_hash,
        CollationData {
            collation: Collation {
                receipt,
                pov,
                parent_head_data,
                status: CollationStatus::Created,
            },
            core_index,
            session_index,
            // Stats are only created when the relay parent's block number is known.
            stats: per_relay_parent
                .block_number
                .map(|n| CollationStats::new(para_head, n, &state.metrics)),
        },
    );

    // Peers having at least one head in their view under which the candidate's relay
    // parent is an allowed relay parent for our para.
    let interested = state
        .peer_data
        .iter()
        .filter(|(_, PeerData { view: v, .. })| {
            v.iter().any(|block_hash| {
                state.implicit_view.as_ref().map(|implicit_view| {
                    implicit_view
                        .known_allowed_relay_parents_under(block_hash, Some(id))
                        .unwrap_or_default()
                        .contains(&candidate_relay_parent)
                }) == Some(true)
            })
        })
        .map(|(id, _)| id);

    for peer_id in interested {
        // `advertise_collation` walks all collations of the relay parent, not just the
        // one inserted above.
        advertise_collation(
            ctx,
            candidate_relay_parent,
            per_relay_parent,
            peer_id,
            &state.peer_ids,
            &mut state.advertisement_timeouts,
            &state.metrics,
        )
        .await;
    }

    Ok(())
}
566
/// Result of `determine_our_validators`: the validator group assigned to a core.
#[derive(Debug)]
struct GroupValidators {
    /// Discovery keys of the group members.
    validators: Vec<AuthorityDiscoveryId>,

    /// Session the group was looked up in.
    session_index: SessionIndex,
    /// Index of the group within that session's validator groups.
    group_index: GroupIndex,
}
576
577#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
581async fn determine_our_validators<Context>(
582 ctx: &mut Context,
583 runtime: &mut RuntimeInfo,
584 core_index: CoreIndex,
585 relay_parent: Hash,
586) -> Result<GroupValidators> {
587 let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
588 let info = &runtime
589 .get_session_info_by_index(ctx.sender(), relay_parent, session_index)
590 .await?
591 .session_info;
592 gum::debug!(target: LOG_TARGET, ?session_index, "Received session info");
593 let groups = &info.validator_groups;
594 let num_cores = groups.len();
595 let rotation_info = get_group_rotation_info(ctx.sender(), relay_parent).await?;
596
597 let current_group_index = rotation_info.group_for_core(core_index, num_cores);
598 let current_validators =
599 groups.get(current_group_index).map(|v| v.as_slice()).unwrap_or_default();
600
601 let validators = &info.discovery_keys;
602
603 let current_validators =
604 current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
605
606 let current_validators = GroupValidators {
607 validators: current_validators,
608 session_index,
609 group_index: current_group_index,
610 };
611
612 Ok(current_validators)
613}
614
615fn declare_message(
617 state: &mut State,
618) -> Option<CollationProtocols<protocol_v1::CollationProtocol, protocol_v2::CollationProtocol>> {
619 let para_id = state.collating_on?;
620 let declare_signature_payload = protocol_v2::declare_signature_payload(&state.local_peer_id);
621 let wire_message = protocol_v2::CollatorProtocolMessage::Declare(
622 state.collator_pair.public(),
623 para_id,
624 state.collator_pair.sign(&declare_signature_payload),
625 );
626 Some(CollationProtocols::V2(protocol_v2::CollationProtocol::CollatorProtocol(wire_message)))
627}
628
629#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
631async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: &PeerId) {
632 if let Some(wire_message) = declare_message(state) {
633 ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(vec![*peer], wire_message))
634 .await;
635 }
636}
637
638fn has_assigned_cores(
640 implicit_view: &Option<ImplicitView>,
641 per_relay_parent: &HashMap<Hash, PerRelayParent>,
642) -> bool {
643 let Some(implicit_view) = implicit_view else { return false };
644
645 for leaf in implicit_view.leaves() {
646 if let Some(relay_parent) = per_relay_parent.get(leaf) {
647 if !relay_parent.assignments.is_empty() {
648 return true;
649 }
650 }
651 }
652
653 false
654}
655
656#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
659async fn connect_to_validators<Context>(
660 ctx: &mut Context,
661 cores_assigned: bool,
662 validator_groups_buf: &ValidatorGroupsBuffer,
663) {
664 let validator_ids =
668 if cores_assigned { validator_groups_buf.validators_to_connect() } else { Vec::new() };
669
670 gum::trace!(
671 target: LOG_TARGET,
672 ?cores_assigned,
673 "Sending connection request to validators: {:?}",
674 validator_ids,
675 );
676
677 let (failed, _) = oneshot::channel();
680 ctx.send_message(NetworkBridgeTxMessage::ConnectToValidators {
681 validator_ids,
682 peer_set: PeerSet::Collation,
683 failed,
684 })
685 .await;
686}
687
688#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
696async fn advertise_collation<Context>(
697 ctx: &mut Context,
698 relay_parent: Hash,
699 per_relay_parent: &mut PerRelayParent,
700 peer: &PeerId,
701 peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
702 advertisement_timeouts: &mut FuturesUnordered<ResetInterestTimeout>,
703 metrics: &Metrics,
704) {
705 for (candidate_hash, collation_and_core) in per_relay_parent.collations.iter_mut() {
706 let core_index = *collation_and_core.core_index();
707 let collation = collation_and_core.collation_mut();
708
709 let Some(validator_group) = per_relay_parent.validator_group.get_mut(&core_index) else {
710 gum::debug!(
711 target: LOG_TARGET,
712 ?relay_parent,
713 ?core_index,
714 "Skipping advertising to validator, validator group for core not found",
715 );
716 return
717 };
718
719 let should_advertise = validator_group.should_advertise_to(candidate_hash, peer_ids, &peer);
720 match should_advertise {
721 ShouldAdvertiseTo::Yes => {},
722 ShouldAdvertiseTo::NotAuthority | ShouldAdvertiseTo::AlreadyAdvertised => {
723 gum::trace!(
724 target: LOG_TARGET,
725 ?relay_parent,
726 ?candidate_hash,
727 peer_id = %peer,
728 reason = ?should_advertise,
729 "Not advertising collation"
730 );
731 continue
732 },
733 }
734
735 gum::debug!(
736 target: LOG_TARGET,
737 ?relay_parent,
738 ?candidate_hash,
739 peer_id = %peer,
740 "Advertising collation.",
741 );
742
743 collation.status.advance_to_advertised();
744
745 ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(
746 vec![*peer],
747 CollationProtocols::V2(protocol_v2::CollationProtocol::CollatorProtocol(
748 protocol_v2::CollatorProtocolMessage::AdvertiseCollation {
749 relay_parent,
750 candidate_hash: *candidate_hash,
751 parent_head_data_hash: collation.parent_head_data.hash(),
752 },
753 )),
754 ))
755 .await;
756
757 validator_group.advertised_to_peer(candidate_hash, &peer_ids, peer);
758
759 advertisement_timeouts.push(ResetInterestTimeout::new(
760 *candidate_hash,
761 *peer,
762 RESET_INTEREST_TIMEOUT,
763 ));
764
765 metrics.on_advertisement_made();
766 }
767}
768
769#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
771async fn process_msg<Context>(
772 ctx: &mut Context,
773 runtime: &mut RuntimeInfo,
774 state: &mut State,
775 msg: CollatorProtocolMessage,
776) -> Result<()> {
777 use CollatorProtocolMessage::*;
778
779 match msg {
780 CollateOn(id) => {
781 state.collating_on = Some(id);
782 state.implicit_view = Some(ImplicitView::new(Some(id)));
783 },
784 DistributeCollation {
785 candidate_receipt,
786 parent_head_data_hash: _,
787 pov,
788 parent_head_data,
789 result_sender,
790 core_index,
791 } => {
792 match state.collating_on {
793 Some(id) if candidate_receipt.descriptor.para_id() != id => {
794 gum::warn!(
797 target: LOG_TARGET,
798 para_id = %candidate_receipt.descriptor.para_id(),
799 collating_on = %id,
800 "DistributeCollation for unexpected para_id",
801 );
802 },
803 Some(id) => {
804 let _ = state.metrics.time_collation_distribution("distribute");
805 distribute_collation(
806 ctx,
807 runtime,
808 state,
809 id,
810 candidate_receipt,
811 pov,
812 parent_head_data,
813 result_sender,
814 core_index,
815 )
816 .await?;
817 },
818 None => {
819 gum::warn!(
820 target: LOG_TARGET,
821 para_id = %candidate_receipt.descriptor.para_id(),
822 "DistributeCollation message while not collating on any",
823 );
824 },
825 }
826 },
827 NetworkBridgeUpdate(event) => {
828 let _ = state.metrics.time_process_msg();
831
832 if let Err(e) = handle_network_msg(ctx, runtime, state, event).await {
833 gum::warn!(
834 target: LOG_TARGET,
835 err = ?e,
836 "Failed to handle incoming network message",
837 );
838 }
839 },
840 msg @ (Invalid(..) | Seconded(..)) => {
841 gum::warn!(
842 target: LOG_TARGET,
843 "{:?} message is not expected on the collator side of the protocol",
844 msg,
845 );
846 },
847 }
848
849 Ok(())
850}
851
852async fn send_collation(
854 state: &mut State,
855 request: VersionedCollationRequest,
856 receipt: CandidateReceipt,
857 pov: PoV,
858 parent_head_data: HeadData,
859) {
860 let (tx, rx) = oneshot::channel();
861
862 let relay_parent = request.relay_parent();
863 let peer_id = request.peer_id();
864 let candidate_hash = receipt.hash();
865
866 let result = Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData {
867 receipt,
868 pov,
869 parent_head_data,
870 });
871
872 let response =
873 OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };
874
875 if let Err(_) = request.send_outgoing_response(response) {
876 gum::warn!(target: LOG_TARGET, "Sending collation response failed");
877 }
878
879 state.active_collation_fetches.push(
880 async move {
881 let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
882 let timed_out = r.is_none();
883
884 CollationSendResult { relay_parent, candidate_hash, peer_id, timed_out }
885 }
886 .boxed(),
887 );
888
889 state.metrics.on_collation_sent();
890}
891
892#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
894async fn handle_incoming_peer_message<Context>(
895 ctx: &mut Context,
896 runtime: &mut RuntimeInfo,
897 state: &mut State,
898 origin: PeerId,
899 msg: CollationProtocols<
900 protocol_v1::CollatorProtocolMessage,
901 protocol_v2::CollatorProtocolMessage,
902 >,
903) -> Result<()> {
904 use protocol_v1::CollatorProtocolMessage as V1;
905 use protocol_v2::CollatorProtocolMessage as V2;
906
907 match msg {
908 CollationProtocols::V1(V1::Declare(..)) | CollationProtocols::V2(V2::Declare(..)) => {
909 gum::trace!(
910 target: LOG_TARGET,
911 ?origin,
912 "Declare message is not expected on the collator side of the protocol",
913 );
914
915 ctx.send_message(NetworkBridgeTxMessage::DisconnectPeers(
917 vec![origin],
918 PeerSet::Collation,
919 ))
920 .await;
921 },
922 CollationProtocols::V1(V1::AdvertiseCollation(_)) |
923 CollationProtocols::V2(V2::AdvertiseCollation { .. }) => {
924 gum::trace!(
925 target: LOG_TARGET,
926 ?origin,
927 "AdvertiseCollation message is not expected on the collator side of the protocol",
928 );
929
930 modify_reputation(&mut state.reputation, ctx.sender(), origin, COST_UNEXPECTED_MESSAGE)
931 .await;
932
933 ctx.send_message(NetworkBridgeTxMessage::DisconnectPeers(
935 vec![origin],
936 PeerSet::Collation,
937 ))
938 .await;
939 },
940 CollationProtocols::V1(V1::CollationSeconded(relay_parent, statement)) => {
941 gum::warn!(
943 target: LOG_TARGET,
944 ?statement,
945 ?origin,
946 ?relay_parent,
947 "Collation seconded message received on unsupported protocol version 1",
948 );
949 },
950 CollationProtocols::V2(V2::CollationSeconded(relay_parent, statement)) => {
951 if !matches!(statement.unchecked_payload(), Statement::Seconded(_)) {
952 gum::warn!(
953 target: LOG_TARGET,
954 ?statement,
955 ?origin,
956 "Collation seconded message received with none-seconded statement.",
957 );
958 } else {
959 let statement = runtime
960 .check_signature(ctx.sender(), relay_parent, statement)
961 .await?
962 .map_err(Error::InvalidStatementSignature)?;
963
964 let removed =
965 state.collation_result_senders.remove(&statement.payload().candidate_hash());
966
967 if let Some(sender) = removed {
968 gum::trace!(
969 target: LOG_TARGET,
970 ?statement,
971 ?origin,
972 "received a valid `CollationSeconded`, forwarding result to collator",
973 );
974 let _ = sender.send(CollationSecondedSignal { statement, relay_parent });
975 } else {
976 let relay_parent = match state.per_relay_parent.get(&relay_parent) {
978 Some(per_relay_parent) => per_relay_parent,
979 None => {
980 gum::debug!(
981 target: LOG_TARGET,
982 candidate_relay_parent = %relay_parent,
983 candidate_hash = ?&statement.payload().candidate_hash(),
984 "Seconded statement relay parent is out of our view",
985 );
986 return Ok(())
987 },
988 };
989 match relay_parent.collations.get(&statement.payload().candidate_hash()) {
990 Some(_) => {
991 gum::trace!(
993 target: LOG_TARGET,
994 ?statement,
995 ?origin,
996 "received a valid `CollationSeconded`",
997 );
998 },
999 None => {
1000 gum::debug!(
1001 target: LOG_TARGET,
1002 candidate_hash = ?&statement.payload().candidate_hash(),
1003 ?origin,
1004 "received an unexpected `CollationSeconded`: unknown statement",
1005 );
1006 },
1007 }
1008 }
1009 }
1010 },
1011 }
1012
1013 Ok(())
1014}
1015
1016#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1018async fn handle_incoming_request<Context>(
1019 ctx: &mut Context,
1020 state: &mut State,
1021 req: std::result::Result<VersionedCollationRequest, incoming::Error>,
1022) -> Result<()> {
1023 let req = req?;
1024 let relay_parent = req.relay_parent();
1025 let peer_id = req.peer_id();
1026 let para_id = req.para_id();
1027
1028 match state.collating_on {
1029 Some(our_para_id) if our_para_id == para_id => {
1030 let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
1031 Some(per_relay_parent) => per_relay_parent,
1032 None => {
1033 gum::debug!(
1034 target: LOG_TARGET,
1035 relay_parent = %relay_parent,
1036 "received a `RequestCollation` for a relay parent out of our view",
1037 );
1038
1039 return Ok(())
1040 },
1041 };
1042
1043 let collation_with_core = match &req {
1044 VersionedCollationRequest::V2(req) =>
1045 per_relay_parent.collations.get_mut(&req.payload.candidate_hash),
1046 };
1047 let (receipt, pov, parent_head_data) =
1048 if let Some(collation_with_core) = collation_with_core {
1049 let collation = collation_with_core.collation_mut();
1050 collation.status.advance_to_requested();
1051 (
1052 collation.receipt.clone(),
1053 collation.pov.clone(),
1054 collation.parent_head_data.clone(),
1055 )
1056 } else {
1057 gum::warn!(
1058 target: LOG_TARGET,
1059 relay_parent = %relay_parent,
1060 "received a `RequestCollation` for a relay parent we don't have collation stored.",
1061 );
1062
1063 return Ok(())
1064 };
1065
1066 state.metrics.on_collation_sent_requested();
1067
1068 let waiting = state.waiting_collation_fetches.entry(relay_parent).or_default();
1069 let candidate_hash = receipt.hash();
1070
1071 if !waiting.waiting_peers.insert((peer_id, candidate_hash)) {
1072 gum::debug!(
1073 target: LOG_TARGET,
1074 "Dropping incoming request as peer has a request in flight already."
1075 );
1076 modify_reputation(
1077 &mut state.reputation,
1078 ctx.sender(),
1079 peer_id,
1080 COST_APPARENT_FLOOD.into(),
1081 )
1082 .await;
1083 return Ok(())
1084 }
1085
1086 if waiting.collation_fetch_active {
1087 waiting.req_queue.push_back(req);
1088 } else {
1089 waiting.collation_fetch_active = true;
1090 let _ = state.metrics.time_collation_distribution("send");
1092
1093 send_collation(state, req, receipt, pov, parent_head_data).await;
1094 }
1095 },
1096 Some(our_para_id) => {
1097 gum::warn!(
1098 target: LOG_TARGET,
1099 for_para_id = %para_id,
1100 our_para_id = %our_para_id,
1101 "received a `CollationFetchingRequest` for unexpected para_id",
1102 );
1103 },
1104 None => {
1105 gum::warn!(
1106 target: LOG_TARGET,
1107 for_para_id = %para_id,
1108 "received a `RequestCollation` while not collating on any para",
1109 );
1110 },
1111 }
1112 Ok(())
1113}
1114
1115#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1118async fn handle_peer_view_change<Context>(
1119 ctx: &mut Context,
1120 state: &mut State,
1121 peer_id: PeerId,
1122 view: View,
1123) {
1124 let Some(PeerData { view: current, unknown_heads }) = state.peer_data.get_mut(&peer_id) else {
1125 return
1126 };
1127
1128 let added: Vec<Hash> = view.difference(&*current).cloned().collect();
1129
1130 *current = view;
1131
1132 for added in added.into_iter() {
1133 let block_hashes = match state.per_relay_parent.contains_key(&added) {
1134 true => state
1135 .implicit_view
1136 .as_ref()
1137 .and_then(|implicit_view| {
1138 implicit_view.known_allowed_relay_parents_under(&added, state.collating_on)
1139 })
1140 .unwrap_or_default(),
1141 false => {
1142 gum::trace!(
1143 target: LOG_TARGET,
1144 ?peer_id,
1145 new_leaf = ?added,
1146 "New leaf in peer's view is unknown",
1147 );
1148
1149 unknown_heads.insert(added, ());
1150
1151 continue
1152 },
1153 };
1154
1155 for block_hash in block_hashes {
1156 let Some(per_relay_parent) = state.per_relay_parent.get_mut(block_hash) else {
1157 continue
1158 };
1159
1160 advertise_collation(
1161 ctx,
1162 *block_hash,
1163 per_relay_parent,
1164 &peer_id,
1165 &state.peer_ids,
1166 &mut state.advertisement_timeouts,
1167 &state.metrics,
1168 )
1169 .await;
1170 }
1171 }
1172}
1173
1174#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1176async fn handle_network_msg<Context>(
1177 ctx: &mut Context,
1178 runtime: &mut RuntimeInfo,
1179 state: &mut State,
1180 bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
1181) -> Result<()> {
1182 use NetworkBridgeEvent::*;
1183
1184 match bridge_message {
1185 PeerConnected(peer_id, observed_role, protocol_version, maybe_authority) => {
1186 gum::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, ?maybe_authority, "Peer connected");
1189
1190 let version: CollationVersion = match protocol_version.try_into() {
1191 Ok(version) => version,
1192 Err(err) => {
1193 gum::error!(
1195 target: LOG_TARGET,
1196 ?peer_id,
1197 ?observed_role,
1198 ?err,
1199 "Unsupported protocol version"
1200 );
1201 return Ok(())
1202 },
1203 };
1204 if version == CollationVersion::V1 {
1205 gum::warn!(
1206 target: LOG_TARGET,
1207 ?peer_id,
1208 ?observed_role,
1209 "Unsupported protocol version v1"
1210 );
1211
1212 ctx.send_message(NetworkBridgeTxMessage::DisconnectPeers(
1214 vec![peer_id],
1215 PeerSet::Collation,
1216 ))
1217 .await;
1218 return Ok(())
1219 }
1220
1221 state.peer_data.entry(peer_id).or_insert_with(|| PeerData {
1222 view: View::default(),
1223 unknown_heads: LruMap::new(ByLength::new(10)),
1226 });
1227
1228 if let Some(authority_ids) = maybe_authority {
1229 gum::trace!(
1230 target: LOG_TARGET,
1231 ?authority_ids,
1232 ?peer_id,
1233 "Connected to requested validator"
1234 );
1235 state.peer_ids.insert(peer_id, authority_ids);
1236
1237 declare(ctx, state, &peer_id).await;
1238 }
1239 },
1240 PeerViewChange(peer_id, view) => {
1241 gum::trace!(target: LOG_TARGET, ?peer_id, ?view, "Peer view change");
1242 handle_peer_view_change(ctx, state, peer_id, view).await;
1243 },
1244 PeerDisconnected(peer_id) => {
1245 gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
1246 state.peer_data.remove(&peer_id);
1247 state.peer_ids.remove(&peer_id);
1248 },
1249 OurViewChange(view) => {
1250 gum::trace!(target: LOG_TARGET, ?view, "Own view change");
1251 handle_our_view_change(ctx, state, view, runtime).await?;
1252 },
1253 PeerMessage(remote, msg) => {
1254 handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
1255 },
1256 UpdatedAuthorityIds(peer_id, authority_ids) => {
1257 gum::trace!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Updated authority ids");
1258 if state.peer_data.contains_key(&peer_id) {
1259 if state.peer_ids.insert(peer_id, authority_ids).is_none() {
1260 declare(ctx, state, &peer_id).await;
1261 }
1262 }
1263 },
1264 NewGossipTopology { .. } => {
1265 },
1267 }
1268
1269 Ok(())
1270}
1271
1272#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
1274async fn process_block_events<Context>(
1275 ctx: &mut Context,
1276 collation_tracker: &mut CollationTracker,
1277 leaf: Hash,
1278 maybe_block_number: Option<BlockNumber>,
1279 para_id: ParaId,
1280 metrics: &Metrics,
1281) {
1282 if let Ok(events) = get_candidate_events(ctx.sender(), leaf).await {
1283 let Some(block_number) = maybe_block_number else {
1284 gum::debug!(
1287 target: crate::LOG_TARGET_STATS,
1288 relay_block = ?leaf,
1289 ?para_id,
1290 "Failed to get relay chain block number",
1291 );
1292 return
1293 };
1294
1295 for ev in events {
1296 match ev {
1297 CandidateEvent::CandidateIncluded(receipt, _, _, _) => {
1298 if receipt.descriptor.para_id() != para_id {
1299 continue
1300 }
1301 collation_tracker.collation_included(block_number, leaf, receipt, metrics);
1302 },
1303 CandidateEvent::CandidateBacked(receipt, _, _, _) => {
1304 if receipt.descriptor.para_id() != para_id {
1305 continue
1306 }
1307
1308 let Some(block_number) = maybe_block_number else { continue };
1309 let Some(stats) =
1310 collation_tracker.collation_backed(block_number, leaf, receipt, metrics)
1311 else {
1312 continue
1313 };
1314
1315 collation_tracker.track(stats);
1317 },
1318 _ => {
1319 },
1321 }
1322 }
1323 }
1324}
1325
1326#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
1328async fn handle_our_view_change<Context>(
1329 ctx: &mut Context,
1330 state: &mut State,
1331 view: OurView,
1332 runtime_info: &mut RuntimeInfo,
1333) -> Result<()> {
1334 let Some(implicit_view) = &mut state.implicit_view else { return Ok(()) };
1335 let Some(para_id) = state.collating_on else { return Ok(()) };
1336
1337 let removed: Vec<_> = implicit_view.leaves().filter(|h| !view.contains(h)).copied().collect();
1338 let added: Vec<_> = view.iter().filter(|h| !implicit_view.contains_leaf(h)).collect();
1339
1340 for leaf in added {
1341 let session_index = runtime_info.get_session_index_for_child(ctx.sender(), *leaf).await?;
1342
1343 let claim_queue = fetch_claim_queue(ctx.sender(), *leaf).await?;
1344
1345 implicit_view
1346 .activate_leaf(ctx.sender(), *leaf)
1347 .await
1348 .map_err(Error::ImplicitViewFetchError)?;
1349
1350 let block_number = implicit_view.block_number(leaf);
1351 state
1352 .per_relay_parent
1353 .insert(*leaf, PerRelayParent::new(para_id, claim_queue, block_number, session_index));
1354
1355 process_block_events(
1356 ctx,
1357 &mut state.collation_tracker,
1358 *leaf,
1359 block_number,
1360 para_id,
1361 &state.metrics,
1362 )
1363 .await;
1364 let allowed_ancestry = implicit_view
1365 .known_allowed_relay_parents_under(leaf, state.collating_on)
1366 .unwrap_or_default();
1367
1368 let peers = state
1371 .peer_data
1372 .iter_mut()
1373 .filter_map(|(id, data)| data.unknown_heads.remove(leaf).map(|_| id))
1374 .collect::<Vec<_>>();
1375
1376 for block_hash in allowed_ancestry {
1377 let block_number = implicit_view.block_number(block_hash);
1378
1379 let per_relay_parent = match state.per_relay_parent.entry(*block_hash) {
1380 Entry::Vacant(entry) => {
1381 let claim_queue = match fetch_claim_queue(ctx.sender(), *block_hash).await {
1382 Ok(cq) => cq,
1383 Err(error) => {
1384 gum::debug!(
1385 target: LOG_TARGET,
1386 ?block_hash,
1387 ?error,
1388 "Failed to fetch claim queue while iterating allowed ancestry",
1389 );
1390 continue
1391 },
1392 };
1393 let session_index =
1394 match runtime_info.get_session_index_for_child(ctx.sender(), *leaf).await {
1395 Ok(si) => si,
1396 Err(error) => {
1397 gum::debug!(
1398 target: LOG_TARGET,
1399 ?block_hash,
1400 ?error,
1401 "Failed to fetch session index while iterating allowed ancestry",
1402 );
1403 continue
1404 },
1405 };
1406
1407 entry.insert(PerRelayParent::new(
1408 para_id,
1409 claim_queue,
1410 block_number,
1411 session_index,
1412 ))
1413 },
1414 Entry::Occupied(entry) => entry.into_mut(),
1415 };
1416
1417 for peer_id in &peers {
1419 advertise_collation(
1420 ctx,
1421 *block_hash,
1422 per_relay_parent,
1423 &peer_id,
1424 &state.peer_ids,
1425 &mut state.advertisement_timeouts,
1426 &state.metrics,
1427 )
1428 .await;
1429 }
1430 }
1431 }
1432
1433 let highest_session_index = state.per_relay_parent.values().map(|pr| pr.session_index).max();
1434
1435 for leaf in removed {
1436 let maybe_block_number = implicit_view.block_number(&leaf);
1440 let pruned = implicit_view.deactivate_leaf(leaf);
1441
1442 for removed in &pruned {
1443 gum::debug!(
1444 target: LOG_TARGET,
1445 relay_parent = ?removed,
1446 "Removing relay parent because our view changed.",
1447 );
1448
1449 if let Some(block_number) = maybe_block_number {
1450 let expired_collations = state.collation_tracker.drain_expired(block_number);
1451 process_expired_collations(expired_collations, *removed, para_id, &state.metrics);
1452 }
1453
1454 let collations = state
1456 .per_relay_parent
1457 .remove(removed)
1458 .map(|per_relay_parent| per_relay_parent.collations)
1459 .unwrap_or_default();
1460
1461 for collation_with_core in collations.into_values() {
1462 let collation = collation_with_core.collation();
1463 let candidate_hash = collation.receipt.hash();
1464
1465 state.collation_result_senders.remove(&candidate_hash);
1466 state.validator_groups_buf.remove_candidate(&candidate_hash);
1467
1468 process_out_of_view_collation(
1469 &mut state.collation_tracker,
1470 collation_with_core,
1471 highest_session_index,
1472 );
1473 }
1474
1475 state.waiting_collation_fetches.remove(removed);
1476 }
1477 }
1478 Ok(())
1479}
1480
1481fn process_out_of_view_collation(
1482 collation_tracker: &mut CollationTracker,
1483 mut collation_with_core: CollationData,
1484 highest_session_index: Option<SessionIndex>,
1485) {
1486 let is_same_session =
1487 highest_session_index.map_or(true, |hs| hs == collation_with_core.session_index);
1488 let collation = collation_with_core.collation_mut();
1489 let candidate_hash = collation.receipt.hash();
1490
1491 match collation.status {
1492 CollationStatus::Created =>
1493 if is_same_session {
1494 gum::warn!(
1495 target: LOG_TARGET,
1496 ?candidate_hash,
1497 pov_hash = ?collation.pov.hash(),
1498 "Collation wasn't advertised to any validator.",
1499 )
1500 } else {
1501 gum::debug!(
1502 target: LOG_TARGET,
1503 ?candidate_hash,
1504 pov_hash = ?collation.pov.hash(),
1505 "Collation wasn't advertised because it was built on a relay chain block that is now part of an old session.",
1506 )
1507 },
1508 CollationStatus::Advertised => gum::debug!(
1509 target: LOG_TARGET,
1510 ?candidate_hash,
1511 pov_hash = ?collation.pov.hash(),
1512 "Collation was advertised but not requested by any validator.",
1513 ),
1514 CollationStatus::Requested => {
1515 gum::debug!(
1516 target: LOG_TARGET,
1517 ?candidate_hash,
1518 pov_hash = ?collation.pov.hash(),
1519 "Collation was requested.",
1520 );
1521 },
1522 }
1523
1524 let collation_status = collation.status.clone();
1525 let Some(mut stats) = collation_with_core.take_stats() else { return };
1526
1527 stats.set_pre_backing_status(collation_status);
1532 collation_tracker.track(stats);
1533}
1534
1535fn process_expired_collations(
1536 expired_collations: Vec<CollationStats>,
1537 removed: Hash,
1538 para_id: ParaId,
1539 metrics: &Metrics,
1540) {
1541 for expired_collation in expired_collations {
1542 let collation_state = if expired_collation.fetch_latency().is_none() {
1543 expired_collation.pre_backing_status().label()
1546 } else if expired_collation.backed().is_none() {
1547 "fetched"
1548 } else if expired_collation.included().is_none() {
1549 "backed"
1550 } else {
1551 "none"
1552 };
1553
1554 let age = expired_collation.expired().unwrap_or_default();
1555 gum::debug!(
1556 target: crate::LOG_TARGET_STATS,
1557 ?age,
1558 ?collation_state,
1559 relay_parent = ?removed,
1560 ?para_id,
1561 head = ?expired_collation.head(),
1562 "Collation expired",
1563 );
1564
1565 metrics.on_collation_expired(age as f64, collation_state);
1566 }
1567}
1568
1569#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
1571pub(crate) async fn run<Context>(
1572 ctx: Context,
1573 local_peer_id: PeerId,
1574 collator_pair: CollatorPair,
1575 req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
1576 metrics: Metrics,
1577) -> std::result::Result<(), FatalError> {
1578 run_inner(
1579 ctx,
1580 local_peer_id,
1581 collator_pair,
1582 req_v2_receiver,
1583 metrics,
1584 ReputationAggregator::default(),
1585 REPUTATION_CHANGE_INTERVAL,
1586 )
1587 .await
1588}
1589
/// Main loop of the collator side, parameterized over the reputation aggregator and the
/// interval at which aggregated reputation changes are sent out.
///
/// Every iteration waits for whichever of these events is ready first:
/// - the reputation delay elapsed,
/// - a message or signal arrived from the overseer,
/// - an active collation fetch finished (or timed out),
/// - an advertisement timeout fired,
/// - the reconnect timeout fired,
/// - a V2 collation fetching request came in.
///
/// Returns `Ok(())` when the `Conclude` signal is received. Returns a [`FatalError`] if
/// receiving from the overseer fails, or if `log_error` propagates an error raised while
/// processing a message or an incoming request.
#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
async fn run_inner<Context>(
	mut ctx: Context,
	local_peer_id: PeerId,
	collator_pair: CollatorPair,
	mut req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
	metrics: Metrics,
	reputation: ReputationAggregator,
	reputation_interval: Duration,
) -> std::result::Result<(), FatalError> {
	use OverseerSignal::*;

	// The delay is re-armed through this closure each time it fires.
	let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
	let mut reputation_delay = new_reputation_delay();

	let mut state = State::new(local_peer_id, collator_pair, metrics.clone(), reputation);
	let mut runtime = RuntimeInfo::new(None);

	loop {
		// A fresh receive future is created for every iteration of the loop.
		let reputation_changes = || vec![COST_INVALID_REQUEST];
		let recv_req_v2 = req_v2_receiver.recv(reputation_changes).fuse();
		pin_mut!(recv_req_v2);

		let mut reconnect_timeout = &mut state.reconnect_timeout;
		select! {
			// Send out the reputation changes collected so far and re-arm the delay.
			_ = reputation_delay => {
				state.reputation.send(ctx.sender()).await;
				reputation_delay = new_reputation_delay();
			},
			msg = ctx.recv().fuse() => match msg.map_err(FatalError::SubsystemReceive)? {
				FromOrchestra::Communication { msg } => {
					log_error(
						process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
						"Failed to process message"
					)?;
				},
				FromOrchestra::Signal(ActiveLeaves(update)) => {
					// A newly activated leaf restarts the reconnect timeout.
					if update.activated.is_some() {
						*reconnect_timeout = futures_timer::Delay::new(RECONNECT_AFTER_LEAF_TIMEOUT).fuse();
					}
				}
				FromOrchestra::Signal(BlockFinalized(..)) => {}
				FromOrchestra::Signal(Conclude) => return Ok(()),
			},
			// A collation fetch for `relay_parent` finished: update the bookkeeping and pick
			// the next queued request for that relay parent, if there is one.
			CollationSendResult { relay_parent, candidate_hash, peer_id, timed_out } =
				state.active_collation_fetches.select_next_some() => {

				let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
					if timed_out {
						gum::debug!(
							target: LOG_TARGET_STATS,
							?relay_parent,
							?peer_id,
							?candidate_hash,
							"Sending collation to validator timed out, carrying on with next validator."
						);
					} else {
						// No timeout: reset the interest of every authority id known for this
						// peer and stop treating the peer as waiting for this candidate.
						for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
							state.validator_groups_buf.reset_validator_interest(candidate_hash, authority_id);
						}
						waiting.waiting_peers.remove(&(peer_id, candidate_hash));

						if let Some(per_relay_parent) = state.per_relay_parent.get_mut(&relay_parent) {
							if let Some(collation_with_core) = per_relay_parent.collations.get_mut(&candidate_hash) {
								// The stats are taken out of the collation; if present they are
								// updated below and handed to the collation tracker.
								let maybe_stats = collation_with_core.take_stats();
								let our_para_id = collation_with_core.collation().receipt.descriptor.para_id();

								if let Some(mut stats) = maybe_stats {
									// Record the current time as the moment the collation was fetched.
									stats.set_fetched_at(std::time::Instant::now());
									gum::debug!(
										target: LOG_TARGET_STATS,
										para_head = ?stats.head(),
										%our_para_id,
										"Collation fetch latency is {}ms",
										stats.fetch_latency().unwrap_or_default().as_millis(),
									);

									// The status is expected to be `Requested` here; this is only
									// checked in debug builds.
									stats.set_pre_backing_status(collation_with_core.collation().status.clone());
									debug_assert_eq!(collation_with_core.collation().status, CollationStatus::Requested);

									// Swap the fetch latency metric for a backing latency one.
									stats.take_fetch_latency_metric();
									stats.set_backed_latency_metric(metrics.time_collation_backing_latency());

									state.collation_tracker.track(stats);
								}
							}
						}
					}

					if let Some(next) = waiting.req_queue.pop_front() {
						next
					} else {
						// Nothing queued: mark the fetch slot of this relay parent as free.
						waiting.collation_fetch_active = false;
						continue
					}
				} else {
					// Nothing is tracked for this relay parent, so there is no request to serve.
					continue
				};

				let next_collation_with_core = {
					let per_relay_parent = match state.per_relay_parent.get(&relay_parent) {
						Some(per_relay_parent) => per_relay_parent,
						None => continue,
					};

					per_relay_parent.collations.get(&next.candidate_hash())
				};

				// Serve the next queued request if its collation is still known.
				if let Some(collation_with_core) = next_collation_with_core {
					let collation = collation_with_core.collation();
					let receipt = collation.receipt.clone();
					let pov = collation.pov.clone();
					let parent_head_data = collation.parent_head_data.clone();

					send_collation(&mut state, next, receipt, pov, parent_head_data).await;
				}
			},
			// An advertisement timed out: reset the interest of every authority id known
			// for the peer in this candidate.
			(candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => {
				for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
					state
						.validator_groups_buf
						.reset_validator_interest(candidate_hash, &authority_id);
				}
			}
			// The reconnect timeout fired: issue a connection request to the validators.
			_ = reconnect_timeout => {
				let cores_assigned = has_assigned_cores(&state.implicit_view, &state.per_relay_parent);
				connect_to_validators(&mut ctx, cores_assigned, &state.validator_groups_buf).await;

				gum::trace!(
					target: LOG_TARGET,
					timeout = ?RECONNECT_AFTER_LEAF_TIMEOUT,
					"Peer-set updated due to a timeout"
				);
			},
			// An incoming V2 collation fetching request (or a receive error) arrived.
			in_req = recv_req_v2 => {
				let request = in_req.map(VersionedCollationRequest::from);

				log_error(
					handle_incoming_request(&mut ctx, &mut state, request).await,
					"Handling incoming collation fetch request V2"
				)?;
			}
		}
	}
}