1use futures::{
18 channel::oneshot, future::BoxFuture, select, stream::FuturesUnordered, FutureExt, StreamExt,
19};
20use futures_timer::Delay;
21use std::{
22 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
23 future::Future,
24 time::{Duration, Instant},
25};
26use tokio_util::sync::CancellationToken;
27
28use sp_keystore::KeystorePtr;
29
30use polkadot_node_network_protocol::{
31 self as net_protocol,
32 peer_set::{CollationVersion, PeerSet},
33 request_response::{
34 outgoing::{Recipient, RequestError},
35 v1 as request_v1, v2 as request_v2, OutgoingRequest, Requests,
36 },
37 v1 as protocol_v1, v2 as protocol_v2, CollationProtocols, OurView, PeerId,
38 UnifiedReputationChange as Rep, View,
39};
40use polkadot_node_primitives::{SignedFullStatement, Statement};
41use polkadot_node_subsystem::{
42 messages::{
43 CanSecondRequest, CandidateBackingMessage, CollatorProtocolMessage, IfDisconnected,
44 NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData, ProspectiveParachainsMessage,
45 ProspectiveValidationDataRequest,
46 },
47 overseer, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal,
48};
49use polkadot_node_subsystem_util::{
50 backing_implicit_view::View as ImplicitView,
51 reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
52 request_claim_queue, request_node_features, request_session_index_for_child,
53};
54use polkadot_primitives::{
55 node_features, CandidateDescriptorV2, CandidateDescriptorVersion, CandidateHash, CollatorId,
56 CoreIndex, Hash, HeadData, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData,
57 SessionIndex,
58};
59
60use super::{modify_reputation, tick_stream, LOG_TARGET};
61
62mod claim_queue_state;
63mod collation;
64mod error;
65mod metrics;
66
67use claim_queue_state::ClaimQueueState;
68use collation::{
69 fetched_collation_sanity_check, BlockedCollationId, CollationEvent, CollationFetchError,
70 CollationFetchRequest, CollationStatus, Collations, FetchedCollation, PendingCollation,
71 PendingCollationFetch, ProspectiveCandidate,
72};
73use error::{Error, FetchError, Result, SecondingError};
74
75#[cfg(test)]
76mod tests;
77
78pub use metrics::Metrics;
79
/// Minor cost charged to a peer that sent a message we did not expect from it.
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
/// Minor cost for a message that was corrupt.
const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt");
/// Minor cost for a network error.
const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
/// Malicious-level cost for a network message whose signature does not verify (e.g. a `Declare`
/// with a bad signature).
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Invalid network message signature");
/// Malicious-level cost applied when a collator is reported as bad.
const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem");
/// Malicious-level cost for a collator advertising a collation for a para that is not assigned
/// to us at the advertised relay parent.
const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for the wrong para");
/// Malicious-level cost for a V1 peer advertising on a relay parent that is not one of our
/// active leaves.
const COST_PROTOCOL_MISUSE: Rep =
    Rep::Malicious("A collator advertising a collation for an async backing relay parent using V1");
/// Minor cost for a collator that declared itself for a para we are not currently assigned to.
const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected");
/// Minor benefit granted when a collator's collation was noted as good.
const BENEFIT_NOTIFY_GOOD: Rep =
    Rep::BenefitMinor("A collator was noted good by another subsystem");
93
/// Delay awaited by the per-fetch timeout future that `fetch_collation` queues right after
/// issuing a collation request (production value).
#[cfg(not(test))]
const MAX_UNSHARED_DOWNLOAD_TIME: Duration = Duration::from_millis(400);

/// Activity polling period (production value).
// NOTE(review): the consumer of this constant is outside the visible part of the file —
// presumably the period of a periodic activity check; confirm at the use site.
#[cfg(not(test))]
const ACTIVITY_POLL: Duration = Duration::from_secs(1);

/// Shortened fetch-timeout delay used when compiling tests.
#[cfg(test)]
const MAX_UNSHARED_DOWNLOAD_TIME: Duration = Duration::from_millis(100);

/// Shortened activity polling period used when compiling tests.
#[cfg(test)]
const ACTIVITY_POLL: Duration = Duration::from_millis(10);
117
/// State kept for a peer that has declared itself as a collator.
#[derive(Debug)]
struct CollatingPeerState {
    /// Collator identity the peer declared.
    collator_id: CollatorId,
    /// Parachain the peer declared it collates for.
    para_id: ParaId,
    /// Advertisements received from this peer, keyed by relay parent. The set holds the
    /// advertised candidate hashes; it stays empty for advertisements made without a candidate
    /// hash (the V1 path of `PeerData::insert_advertisement`).
    advertisements: HashMap<Hash, HashSet<CandidateHash>>,
    /// Refreshed when the peer declares and every time an advertisement is accepted; compared
    /// against the eviction policy in `PeerData::is_inactive`.
    last_active: Instant,
}
130
/// Whether a peer has declared itself as a collator yet.
#[derive(Debug)]
enum PeerState {
    /// The peer has not declared. The instant is what `PeerData::is_inactive` measures the
    /// "undeclared" eviction timeout from.
    // NOTE(review): presumably the connection time — the construction site is not visible here.
    Connected(Instant),
    /// The peer declared itself as a collator; see [`CollatingPeerState`].
    Collating(CollatingPeerState),
}
138
/// Reasons for which `PeerData::insert_advertisement` rejects an advertisement.
#[derive(Debug)]
enum InsertAdvertisementError {
    /// The same advertisement was already recorded for this peer.
    Duplicate,
    /// The relay parent is not allowed under any of our active leaves.
    OutOfOurView,
    /// The peer has not declared itself as a collator.
    UndeclaredCollator,
    /// The peer already has more advertisements on the relay parent than there are assignments
    /// for it.
    PeerLimitReached,
}
150
/// Everything tracked about a single connected peer.
#[derive(Debug)]
struct PeerData {
    /// Latest view reported by the peer.
    view: View,
    /// Whether the peer has declared itself as a collator, plus the associated state.
    state: PeerState,
    /// Collation protocol version used with the peer.
    version: CollationVersion,
}
157
158impl PeerData {
159 fn update_view(
162 &mut self,
163 implicit_view: &ImplicitView,
164 active_leaves: &HashSet<Hash>,
165 new_view: View,
166 ) {
167 let old_view = std::mem::replace(&mut self.view, new_view);
168 if let PeerState::Collating(ref mut peer_state) = self.state {
169 for removed in old_view.difference(&self.view) {
170 let keep = is_relay_parent_in_implicit_view(
172 removed,
173 implicit_view,
174 active_leaves,
175 peer_state.para_id,
176 );
177
178 if !keep {
179 peer_state.advertisements.remove(&removed);
180 }
181 }
182 }
183 }
184
185 fn prune_old_advertisements(
187 &mut self,
188 implicit_view: &ImplicitView,
189 active_leaves: &HashSet<Hash>,
190 ) {
191 if let PeerState::Collating(ref mut peer_state) = self.state {
192 peer_state.advertisements.retain(|hash, _| {
193 is_relay_parent_in_implicit_view(
198 hash,
199 implicit_view,
200 active_leaves,
201 peer_state.para_id,
202 )
203 });
204 }
205 }
206
207 fn insert_advertisement(
209 &mut self,
210 on_relay_parent: Hash,
211 candidate_hash: Option<CandidateHash>,
212 implicit_view: &ImplicitView,
213 active_leaves: &HashSet<Hash>,
214 per_relay_parent: &PerRelayParent,
215 ) -> std::result::Result<(CollatorId, ParaId), InsertAdvertisementError> {
216 match self.state {
217 PeerState::Connected(_) => Err(InsertAdvertisementError::UndeclaredCollator),
218 PeerState::Collating(ref mut state) => {
219 if !is_relay_parent_in_implicit_view(
220 &on_relay_parent,
221 implicit_view,
222 active_leaves,
223 state.para_id,
224 ) {
225 return Err(InsertAdvertisementError::OutOfOurView)
226 }
227
228 if let Some(candidate_hash) = candidate_hash {
229 if state
230 .advertisements
231 .get(&on_relay_parent)
232 .map_or(false, |candidates| candidates.contains(&candidate_hash))
233 {
234 return Err(InsertAdvertisementError::Duplicate)
235 }
236
237 let candidates = state.advertisements.entry(on_relay_parent).or_default();
238
239 if candidates.len() > per_relay_parent.assignment.current.len() {
242 return Err(InsertAdvertisementError::PeerLimitReached)
243 }
244
245 candidates.insert(candidate_hash);
246 } else {
247 if self.version != CollationVersion::V1 {
248 gum::error!(
249 target: LOG_TARGET,
250 "Programming error, `candidate_hash` can not be `None` \
251 for non `V1` networking.",
252 );
253 }
254
255 if state.advertisements.contains_key(&on_relay_parent) {
256 return Err(InsertAdvertisementError::Duplicate)
257 }
258
259 state
260 .advertisements
261 .insert(on_relay_parent, HashSet::from_iter(candidate_hash));
262 };
263
264 state.last_active = Instant::now();
265 Ok((state.collator_id.clone(), state.para_id))
266 },
267 }
268 }
269
270 fn is_collating(&self) -> bool {
272 match self.state {
273 PeerState::Connected(_) => false,
274 PeerState::Collating(_) => true,
275 }
276 }
277
278 fn set_collating(&mut self, collator_id: CollatorId, para_id: ParaId) {
283 self.state = PeerState::Collating(CollatingPeerState {
284 collator_id,
285 para_id,
286 advertisements: HashMap::new(),
287 last_active: Instant::now(),
288 });
289 }
290
291 fn collator_id(&self) -> Option<&CollatorId> {
292 match self.state {
293 PeerState::Connected(_) => None,
294 PeerState::Collating(ref state) => Some(&state.collator_id),
295 }
296 }
297
298 fn collating_para(&self) -> Option<ParaId> {
299 match self.state {
300 PeerState::Connected(_) => None,
301 PeerState::Collating(ref state) => Some(state.para_id),
302 }
303 }
304
305 fn has_advertised(
307 &self,
308 relay_parent: &Hash,
309 maybe_candidate_hash: Option<CandidateHash>,
310 ) -> bool {
311 let collating_state = match self.state {
312 PeerState::Connected(_) => return false,
313 PeerState::Collating(ref state) => state,
314 };
315
316 if let Some(ref candidate_hash) = maybe_candidate_hash {
317 collating_state
318 .advertisements
319 .get(relay_parent)
320 .map_or(false, |candidates| candidates.contains(candidate_hash))
321 } else {
322 collating_state.advertisements.contains_key(relay_parent)
323 }
324 }
325
326 fn is_inactive(&self, policy: &crate::CollatorEvictionPolicy) -> bool {
328 match self.state {
329 PeerState::Connected(connected_at) => connected_at.elapsed() >= policy.undeclared,
330 PeerState::Collating(ref state) =>
331 state.last_active.elapsed() >= policy.inactive_collator,
332 }
333 }
334}
335
/// Parachain assignments of our validator group at a relay parent.
#[derive(Debug)]
struct GroupAssignments {
    /// Paras taken, in order, from the claim queue entry of our core
    /// (see `construct_per_relay_parent`).
    current: Vec<ParaId>,
}
341
/// State tracked for every relay parent we know about (active leaves and their allowed
/// ancestry).
struct PerRelayParent {
    /// Paras assigned to our group at this relay parent.
    assignment: GroupAssignments,
    /// Collation fetching/seconding bookkeeping for this relay parent.
    collations: Collations,
    /// Value of the `CandidateReceiptV2` node feature, as read when the leaf was activated.
    v2_receipts: bool,
    /// Core our validator group is assigned to at this relay parent.
    current_core: CoreIndex,
    /// Session index requested for the child of the activated leaf.
    session_index: SessionIndex,
}
349
/// All state of the validator side of the collator protocol.
#[derive(Default)]
struct State {
    /// Implicit view; used to decide which relay parents are allowed under our active leaves.
    implicit_view: ImplicitView,

    /// Active leaves for which we hold per-relay-parent state.
    active_leaves: HashSet<Hash>,

    /// Per-relay-parent state, for active leaves and for their allowed ancestors.
    per_relay_parent: HashMap<Hash, PerRelayParent>,

    /// Data for every known peer.
    peer_data: HashMap<PeerId, PeerData>,

    /// For each para, the number of tracked relay parents whose assignment includes it.
    /// Incremented in `construct_per_relay_parent`, decremented in `remove_outgoing`; an entry
    /// is removed once it reaches zero.
    current_assignments: HashMap<ParaId, usize>,

    /// Collation fetch requests currently in flight.
    collation_requests: FuturesUnordered<CollationFetchRequest>,

    /// Cancellation token for every in-flight fetch, keyed by the pending collation.
    collation_requests_cancel_handles: HashMap<PendingCollation, CancellationToken>,

    /// Metrics handle.
    metrics: Metrics,

    /// One future per issued fetch; each resolves after `MAX_UNSHARED_DOWNLOAD_TIME` and yields
    /// the collator, the candidate hash (if any) and the relay parent of that fetch.
    collation_fetch_timeouts:
        FuturesUnordered<BoxFuture<'static, (CollatorId, Option<CandidateHash>, Hash)>>,

    /// Fetched collations; counted as "waiting for validation" by
    /// `seconded_and_pending_for_para`.
    fetched_candidates: HashMap<FetchedCollation, CollationEvent>,

    /// Fetched collations that cannot be seconded yet, keyed by para id and parent head data
    /// hash. Entries are drained by `second_unblocked_collations`.
    blocked_from_seconding: HashMap<BlockedCollationId, Vec<PendingCollationFetch>>,

    /// Aggregator through which peer reputation changes are reported.
    reputation: ReputationAggregator,
}
408
409impl State {
410 fn seconded_and_pending_for_para(&self, relay_parent: &Hash, para_id: &ParaId) -> usize {
416 let seconded = self
417 .per_relay_parent
418 .get(relay_parent)
419 .map_or(0, |per_relay_parent| per_relay_parent.collations.seconded_for_para(para_id));
420
421 let pending_fetch = self.per_relay_parent.get(relay_parent).map_or(0, |rp_state| {
422 match rp_state.collations.status {
423 CollationStatus::Fetching(pending_para_id) if pending_para_id == *para_id => 1,
424 _ => 0,
425 }
426 });
427
428 let waiting_for_validation = self
429 .fetched_candidates
430 .keys()
431 .filter(|fc| fc.relay_parent == *relay_parent && fc.para_id == *para_id)
432 .count();
433
434 let blocked_from_seconding =
435 self.blocked_from_seconding.values().fold(0, |acc, blocked_collations| {
436 acc + blocked_collations
437 .iter()
438 .filter(|pc| {
439 pc.candidate_receipt.descriptor.para_id() == *para_id &&
440 pc.candidate_receipt.descriptor.relay_parent() == *relay_parent
441 })
442 .count()
443 });
444
445 gum::trace!(
446 target: LOG_TARGET,
447 ?relay_parent,
448 ?para_id,
449 seconded,
450 pending_fetch,
451 waiting_for_validation,
452 blocked_from_seconding,
453 "Seconded and pending collations for para",
454 );
455
456 seconded + pending_fetch + waiting_for_validation + blocked_from_seconding
457 }
458}
459
460fn is_relay_parent_in_implicit_view(
461 relay_parent: &Hash,
462 implicit_view: &ImplicitView,
463 active_leaves: &HashSet<Hash>,
464 para_id: ParaId,
465) -> bool {
466 active_leaves.iter().any(|hash| {
467 implicit_view
468 .known_allowed_relay_parents_under(hash, Some(para_id))
469 .unwrap_or_default()
470 .contains(relay_parent)
471 })
472}
473
474async fn construct_per_relay_parent<Sender>(
475 sender: &mut Sender,
476 current_assignments: &mut HashMap<ParaId, usize>,
477 keystore: &KeystorePtr,
478 relay_parent: Hash,
479 v2_receipts: bool,
480 session_index: SessionIndex,
481) -> Result<Option<PerRelayParent>>
482where
483 Sender: CollatorProtocolSenderTrait,
484{
485 let validators = polkadot_node_subsystem_util::request_validators(relay_parent, sender)
486 .await
487 .await
488 .map_err(Error::CancelledActiveValidators)??;
489
490 let (groups, rotation_info) =
491 polkadot_node_subsystem_util::request_validator_groups(relay_parent, sender)
492 .await
493 .await
494 .map_err(Error::CancelledValidatorGroups)??;
495
496 let core_now = if let Some(group) =
497 polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore).and_then(
498 |(_, index)| polkadot_node_subsystem_util::find_validator_group(&groups, index),
499 ) {
500 rotation_info.core_for_group(group, groups.len())
501 } else {
502 gum::trace!(target: LOG_TARGET, ?relay_parent, "Not a validator");
503 return Ok(None)
504 };
505
506 let mut claim_queue = request_claim_queue(relay_parent, sender)
507 .await
508 .await
509 .map_err(Error::CancelledClaimQueue)??;
510
511 let assigned_paras = claim_queue.remove(&core_now).unwrap_or_else(|| VecDeque::new());
512
513 for para_id in assigned_paras.iter() {
514 let entry = current_assignments.entry(*para_id).or_default();
515 *entry += 1;
516 if *entry == 1 {
517 gum::debug!(
518 target: LOG_TARGET,
519 ?relay_parent,
520 para_id = ?para_id,
521 "Assigned to a parachain",
522 );
523 }
524 }
525
526 let assignment = GroupAssignments { current: assigned_paras.into_iter().collect() };
527 let collations = Collations::new(&assignment.current);
528
529 Ok(Some(PerRelayParent {
530 assignment,
531 collations,
532 v2_receipts,
533 session_index,
534 current_core: core_now,
535 }))
536}
537
538fn remove_outgoing(
539 current_assignments: &mut HashMap<ParaId, usize>,
540 per_relay_parent: PerRelayParent,
541) {
542 let GroupAssignments { current, .. } = per_relay_parent.assignment;
543
544 for cur in current {
545 if let Entry::Occupied(mut occupied) = current_assignments.entry(cur) {
546 *occupied.get_mut() -= 1;
547 if *occupied.get() == 0 {
548 occupied.remove_entry();
549 gum::debug!(
550 target: LOG_TARGET,
551 para_id = ?cur,
552 "Unassigned from a parachain",
553 );
554 }
555 }
556 }
557}
558
559fn collator_peer_id(
562 peer_data: &HashMap<PeerId, PeerData>,
563 collator_id: &CollatorId,
564) -> Option<PeerId> {
565 peer_data
566 .iter()
567 .find_map(|(peer, data)| data.collator_id().filter(|c| c == &collator_id).map(|_| *peer))
568}
569
570async fn disconnect_peer(sender: &mut impl overseer::CollatorProtocolSenderTrait, peer_id: PeerId) {
571 sender
572 .send_message(NetworkBridgeTxMessage::DisconnectPeers(vec![peer_id], PeerSet::Collation))
573 .await
574}
575
576async fn fetch_collation(
578 sender: &mut impl overseer::CollatorProtocolSenderTrait,
579 state: &mut State,
580 pc: PendingCollation,
581 id: CollatorId,
582) -> std::result::Result<(), FetchError> {
583 let PendingCollation { relay_parent, peer_id, prospective_candidate, .. } = pc;
584 let candidate_hash = prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
585
586 let peer_data = state.peer_data.get(&peer_id).ok_or(FetchError::UnknownPeer)?;
587
588 if peer_data.has_advertised(&relay_parent, candidate_hash) {
589 request_collation(sender, state, pc, id.clone(), peer_data.version).await?;
590 let timeout = |collator_id, candidate_hash, relay_parent| async move {
591 Delay::new(MAX_UNSHARED_DOWNLOAD_TIME).await;
592 (collator_id, candidate_hash, relay_parent)
593 };
594 state
595 .collation_fetch_timeouts
596 .push(timeout(id.clone(), candidate_hash, relay_parent).boxed());
597
598 Ok(())
599 } else {
600 Err(FetchError::NotAdvertised)
601 }
602}
603
604async fn report_collator(
606 reputation: &mut ReputationAggregator,
607 sender: &mut impl overseer::CollatorProtocolSenderTrait,
608 peer_data: &HashMap<PeerId, PeerData>,
609 id: CollatorId,
610) {
611 if let Some(peer_id) = collator_peer_id(peer_data, &id) {
612 modify_reputation(reputation, sender, peer_id, COST_REPORT_BAD).await;
613 }
614}
615
616async fn note_good_collation(
618 reputation: &mut ReputationAggregator,
619 sender: &mut impl overseer::CollatorProtocolSenderTrait,
620 peer_data: &HashMap<PeerId, PeerData>,
621 id: CollatorId,
622) {
623 if let Some(peer_id) = collator_peer_id(peer_data, &id) {
624 modify_reputation(reputation, sender, peer_id, BENEFIT_NOTIFY_GOOD).await;
625 }
626}
627
628async fn notify_collation_seconded(
630 sender: &mut impl overseer::CollatorProtocolSenderTrait,
631 peer_id: PeerId,
632 version: CollationVersion,
633 relay_parent: Hash,
634 statement: SignedFullStatement,
635) {
636 let statement = statement.into();
637 let wire_message = match version {
638 CollationVersion::V1 =>
639 CollationProtocols::V1(protocol_v1::CollationProtocol::CollatorProtocol(
640 protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement),
641 )),
642 CollationVersion::V2 =>
643 CollationProtocols::V2(protocol_v2::CollationProtocol::CollatorProtocol(
644 protocol_v2::CollatorProtocolMessage::CollationSeconded(relay_parent, statement),
645 )),
646 };
647 sender
648 .send_message(NetworkBridgeTxMessage::SendCollationMessage(vec![peer_id], wire_message))
649 .await;
650}
651
652fn handle_peer_view_change(state: &mut State, peer_id: PeerId, view: View) {
656 let peer_data = match state.peer_data.get_mut(&peer_id) {
657 Some(peer_data) => peer_data,
658 None => return,
659 };
660
661 peer_data.update_view(&state.implicit_view, &state.active_leaves, view);
662 state.collation_requests_cancel_handles.retain(|pc, handle| {
663 let keep = pc.peer_id != peer_id || peer_data.has_advertised(&pc.relay_parent, None);
664 if !keep {
665 handle.cancel();
666 }
667 keep
668 });
669}
670
671async fn request_collation(
677 sender: &mut impl overseer::CollatorProtocolSenderTrait,
678 state: &mut State,
679 pending_collation: PendingCollation,
680 collator_id: CollatorId,
681 peer_protocol_version: CollationVersion,
682) -> std::result::Result<(), FetchError> {
683 if state.collation_requests_cancel_handles.contains_key(&pending_collation) {
684 return Err(FetchError::AlreadyRequested)
685 }
686
687 let PendingCollation { relay_parent, para_id, peer_id, prospective_candidate, .. } =
688 pending_collation;
689 let per_relay_parent = state
690 .per_relay_parent
691 .get_mut(&relay_parent)
692 .ok_or(FetchError::RelayParentOutOfView)?;
693
694 let (requests, response_recv) = match (peer_protocol_version, prospective_candidate) {
695 (CollationVersion::V1, _) => {
696 let (req, response_recv) = OutgoingRequest::new(
697 Recipient::Peer(peer_id),
698 request_v1::CollationFetchingRequest { relay_parent, para_id },
699 );
700 let requests = Requests::CollationFetchingV1(req);
701 (requests, response_recv.boxed())
702 },
703 (CollationVersion::V2, Some(ProspectiveCandidate { candidate_hash, .. })) => {
704 let (req, response_recv) = OutgoingRequest::new(
705 Recipient::Peer(peer_id),
706 request_v2::CollationFetchingRequest { relay_parent, para_id, candidate_hash },
707 );
708 let requests = Requests::CollationFetchingV2(req);
709 (requests, response_recv.boxed())
710 },
711 _ => return Err(FetchError::ProtocolMismatch),
712 };
713
714 let cancellation_token = CancellationToken::new();
715 let collation_request = CollationFetchRequest {
716 pending_collation,
717 collator_id: collator_id.clone(),
718 collator_protocol_version: peer_protocol_version,
719 from_collator: response_recv,
720 cancellation_token: cancellation_token.clone(),
721 _lifetime_timer: state.metrics.time_collation_request_duration(),
722 };
723
724 state.collation_requests.push(collation_request);
725 state
726 .collation_requests_cancel_handles
727 .insert(pending_collation, cancellation_token);
728
729 gum::debug!(
730 target: LOG_TARGET,
731 peer_id = %peer_id,
732 %para_id,
733 ?relay_parent,
734 "Requesting collation",
735 );
736
737 let maybe_candidate_hash =
738 prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
739 per_relay_parent.collations.status = CollationStatus::Fetching(para_id);
740 per_relay_parent
741 .collations
742 .fetching_from
743 .replace((collator_id, maybe_candidate_hash));
744
745 sender
746 .send_message(NetworkBridgeTxMessage::SendRequests(
747 vec![requests],
748 IfDisconnected::ImmediateError,
749 ))
750 .await;
751 Ok(())
752}
753
754#[overseer::contextbounds(CollatorProtocol, prefix = overseer)]
756async fn process_incoming_peer_message<Context>(
757 ctx: &mut Context,
758 state: &mut State,
759 origin: PeerId,
760 msg: CollationProtocols<
761 protocol_v1::CollatorProtocolMessage,
762 protocol_v2::CollatorProtocolMessage,
763 >,
764) {
765 use protocol_v1::CollatorProtocolMessage as V1;
766 use protocol_v2::CollatorProtocolMessage as V2;
767 use sp_runtime::traits::AppVerify;
768
769 match msg {
770 CollationProtocols::V1(V1::Declare(collator_id, para_id, signature)) |
771 CollationProtocols::V2(V2::Declare(collator_id, para_id, signature)) => {
772 if collator_peer_id(&state.peer_data, &collator_id).is_some() {
773 modify_reputation(
774 &mut state.reputation,
775 ctx.sender(),
776 origin,
777 COST_UNEXPECTED_MESSAGE,
778 )
779 .await;
780 return
781 }
782
783 let peer_data = match state.peer_data.get_mut(&origin) {
784 Some(p) => p,
785 None => {
786 gum::debug!(
787 target: LOG_TARGET,
788 peer_id = ?origin,
789 ?para_id,
790 "Unknown peer",
791 );
792 modify_reputation(
793 &mut state.reputation,
794 ctx.sender(),
795 origin,
796 COST_UNEXPECTED_MESSAGE,
797 )
798 .await;
799 return
800 },
801 };
802
803 if peer_data.is_collating() {
804 gum::debug!(
805 target: LOG_TARGET,
806 peer_id = ?origin,
807 ?para_id,
808 "Peer is already in the collating state",
809 );
810 modify_reputation(
811 &mut state.reputation,
812 ctx.sender(),
813 origin,
814 COST_UNEXPECTED_MESSAGE,
815 )
816 .await;
817 return
818 }
819
820 if !signature.verify(&*protocol_v1::declare_signature_payload(&origin), &collator_id) {
821 gum::debug!(
822 target: LOG_TARGET,
823 peer_id = ?origin,
824 ?para_id,
825 "Signature verification failure",
826 );
827 modify_reputation(
828 &mut state.reputation,
829 ctx.sender(),
830 origin,
831 COST_INVALID_SIGNATURE,
832 )
833 .await;
834 return
835 }
836
837 if state.current_assignments.contains_key(¶_id) {
838 gum::debug!(
839 target: LOG_TARGET,
840 peer_id = ?origin,
841 ?collator_id,
842 ?para_id,
843 "Declared as collator for current para",
844 );
845
846 peer_data.set_collating(collator_id, para_id);
847 } else {
848 gum::debug!(
849 target: LOG_TARGET,
850 peer_id = ?origin,
851 ?collator_id,
852 ?para_id,
853 "Declared as collator for unneeded para. Current assignments: {:?}",
854 &state.current_assignments
855 );
856
857 modify_reputation(
858 &mut state.reputation,
859 ctx.sender(),
860 origin,
861 COST_UNNEEDED_COLLATOR,
862 )
863 .await;
864 gum::trace!(target: LOG_TARGET, "Disconnecting unneeded collator");
865 disconnect_peer(ctx.sender(), origin).await;
866 }
867 },
868 CollationProtocols::V1(V1::AdvertiseCollation(relay_parent)) =>
869 if let Err(err) =
870 handle_advertisement(ctx.sender(), state, relay_parent, origin, None).await
871 {
872 gum::debug!(
873 target: LOG_TARGET,
874 peer_id = ?origin,
875 ?relay_parent,
876 error = ?err,
877 "Rejected v1 advertisement",
878 );
879
880 if let Some(rep) = err.reputation_changes() {
881 modify_reputation(&mut state.reputation, ctx.sender(), origin, rep).await;
882 }
883 },
884 CollationProtocols::V2(V2::AdvertiseCollation {
885 relay_parent,
886 candidate_hash,
887 parent_head_data_hash,
888 }) => {
889 if let Err(err) = handle_advertisement(
890 ctx.sender(),
891 state,
892 relay_parent,
893 origin,
894 Some((candidate_hash, parent_head_data_hash)),
895 )
896 .await
897 {
898 gum::debug!(
899 target: LOG_TARGET,
900 peer_id = ?origin,
901 ?relay_parent,
902 ?candidate_hash,
903 error = ?err,
904 "Rejected v2 advertisement",
905 );
906
907 if let Some(rep) = err.reputation_changes() {
908 modify_reputation(&mut state.reputation, ctx.sender(), origin, rep).await;
909 }
910 }
911 },
912 CollationProtocols::V1(V1::CollationSeconded(..)) |
913 CollationProtocols::V2(V2::CollationSeconded(..)) => {
914 gum::warn!(
915 target: LOG_TARGET,
916 peer_id = ?origin,
917 "Unexpected `CollationSeconded` message, decreasing reputation",
918 );
919
920 modify_reputation(&mut state.reputation, ctx.sender(), origin, COST_UNEXPECTED_MESSAGE)
921 .await;
922 },
923 }
924}
925
/// Reasons for which `handle_advertisement` rejects an advertisement.
#[derive(Debug)]
enum AdvertisementError {
    /// There is no per-relay-parent state for the advertised relay parent.
    RelayParentUnknown,
    /// The advertising peer is not known.
    UnknownPeer,
    /// The peer has not declared itself as a collator.
    UndeclaredCollator,
    /// The peer collates for a para that is not assigned to us at the relay parent.
    InvalidAssignment,
    /// No free claim is left for the para on any path through the relay parent.
    SecondedLimitReached,
    /// A V1 peer advertised on a relay parent that is not an active leaf.
    ProtocolMisuse,
    /// The peer-level bookkeeping rejected the advertisement.
    // The payload is only surfaced through the derived `Debug` output (rejections are logged
    // with `error = ?err`), hence the lint allowance.
    #[allow(dead_code)]
    Invalid(InsertAdvertisementError),
    /// Candidate backing answered that the candidate cannot be seconded.
    BlockedByBacking,
}
947
948impl AdvertisementError {
949 fn reputation_changes(&self) -> Option<Rep> {
950 use AdvertisementError::*;
951 match self {
952 InvalidAssignment => Some(COST_WRONG_PARA),
953 ProtocolMisuse => Some(COST_PROTOCOL_MISUSE),
954 RelayParentUnknown | UndeclaredCollator | Invalid(_) => Some(COST_UNEXPECTED_MESSAGE),
955 UnknownPeer | SecondedLimitReached | BlockedByBacking => None,
956 }
957 }
958}
959
960async fn can_second<Sender>(
962 sender: &mut Sender,
963 candidate_para_id: ParaId,
964 candidate_relay_parent: Hash,
965 candidate_hash: CandidateHash,
966 parent_head_data_hash: Hash,
967) -> bool
968where
969 Sender: CollatorProtocolSenderTrait,
970{
971 let request = CanSecondRequest {
972 candidate_para_id,
973 candidate_relay_parent,
974 candidate_hash,
975 parent_head_data_hash,
976 };
977 let (tx, rx) = oneshot::channel();
978 sender.send_message(CandidateBackingMessage::CanSecond(request, tx)).await;
979
980 rx.await.unwrap_or_else(|err| {
981 gum::warn!(
982 target: LOG_TARGET,
983 ?err,
984 ?candidate_relay_parent,
985 ?candidate_para_id,
986 ?candidate_hash,
987 "CanSecond-request responder was dropped",
988 );
989
990 false
991 })
992}
993
994#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
996async fn second_unblocked_collations<Context>(
997 ctx: &mut Context,
998 state: &mut State,
999 para_id: ParaId,
1000 head_data: HeadData,
1001 head_data_hash: Hash,
1002) {
1003 if let Some(unblocked_collations) = state
1004 .blocked_from_seconding
1005 .remove(&BlockedCollationId { para_id, parent_head_data_hash: head_data_hash })
1006 {
1007 if !unblocked_collations.is_empty() {
1008 gum::debug!(
1009 target: LOG_TARGET,
1010 "Candidate outputting head data with hash {} unblocked {} collations for seconding.",
1011 head_data_hash,
1012 unblocked_collations.len()
1013 );
1014 }
1015
1016 for mut unblocked_collation in unblocked_collations {
1017 unblocked_collation.maybe_parent_head_data = Some(head_data.clone());
1018 let peer_id = unblocked_collation.collation_event.pending_collation.peer_id;
1019 let relay_parent = unblocked_collation.candidate_receipt.descriptor.relay_parent();
1020
1021 if let Err(err) = kick_off_seconding(ctx, state, unblocked_collation).await {
1022 gum::warn!(
1023 target: LOG_TARGET,
1024 ?relay_parent,
1025 ?para_id,
1026 ?peer_id,
1027 error = %err,
1028 "Seconding aborted due to an error",
1029 );
1030
1031 if err.is_malicious() {
1032 modify_reputation(
1034 &mut state.reputation,
1035 ctx.sender(),
1036 peer_id,
1037 COST_REPORT_BAD,
1038 )
1039 .await;
1040 }
1041 }
1042 }
1043 }
1044}
1045
1046fn ensure_seconding_limit_is_respected(
1047 relay_parent: &Hash,
1048 para_id: ParaId,
1049 state: &State,
1050) -> std::result::Result<(), AdvertisementError> {
1051 let paths = state.implicit_view.paths_via_relay_parent(relay_parent);
1052
1053 gum::trace!(
1054 target: LOG_TARGET,
1055 ?relay_parent,
1056 ?para_id,
1057 ?paths,
1058 "Checking seconding limit",
1059 );
1060
1061 let mut has_claim_at_some_path = false;
1062 for path in paths {
1063 let mut cq_state = ClaimQueueState::new();
1064 for ancestor in &path {
1065 let seconded_and_pending = state.seconded_and_pending_for_para(&ancestor, ¶_id);
1066 cq_state.add_leaf(
1067 &ancestor,
1068 &state
1069 .per_relay_parent
1070 .get(ancestor)
1071 .ok_or(AdvertisementError::RelayParentUnknown)?
1072 .assignment
1073 .current,
1074 );
1075 for _ in 0..seconded_and_pending {
1076 cq_state.claim_at(ancestor, ¶_id);
1077 }
1078 }
1079
1080 if cq_state.can_claim_at(relay_parent, ¶_id) {
1081 gum::trace!(
1082 target: LOG_TARGET,
1083 ?relay_parent,
1084 ?para_id,
1085 ?path,
1086 "Seconding limit respected at path",
1087 );
1088 has_claim_at_some_path = true;
1089 break
1090 }
1091 }
1092
1093 if has_claim_at_some_path {
1096 Ok(())
1097 } else {
1098 Err(AdvertisementError::SecondedLimitReached)
1099 }
1100}
1101
1102async fn handle_advertisement<Sender>(
1103 sender: &mut Sender,
1104 state: &mut State,
1105 relay_parent: Hash,
1106 peer_id: PeerId,
1107 prospective_candidate: Option<(CandidateHash, Hash)>,
1108) -> std::result::Result<(), AdvertisementError>
1109where
1110 Sender: CollatorProtocolSenderTrait,
1111{
1112 let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?;
1113
1114 if peer_data.version == CollationVersion::V1 && !state.active_leaves.contains(&relay_parent) {
1115 return Err(AdvertisementError::ProtocolMisuse)
1116 }
1117
1118 let per_relay_parent = state
1119 .per_relay_parent
1120 .get(&relay_parent)
1121 .ok_or(AdvertisementError::RelayParentUnknown)?;
1122
1123 let assignment = &per_relay_parent.assignment;
1124
1125 let collator_para_id =
1126 peer_data.collating_para().ok_or(AdvertisementError::UndeclaredCollator)?;
1127
1128 if !assignment.current.contains(&collator_para_id) {
1130 return Err(AdvertisementError::InvalidAssignment)
1131 }
1132
1133 let candidate_hash = prospective_candidate.map(|(hash, ..)| hash);
1135 let (collator_id, para_id) = peer_data
1136 .insert_advertisement(
1137 relay_parent,
1138 candidate_hash,
1139 &state.implicit_view,
1140 &state.active_leaves,
1141 &per_relay_parent,
1142 )
1143 .map_err(AdvertisementError::Invalid)?;
1144
1145 ensure_seconding_limit_is_respected(&relay_parent, para_id, state)?;
1146
1147 if let Some((candidate_hash, parent_head_data_hash)) = prospective_candidate {
1148 let can_second = can_second(
1152 sender,
1153 collator_para_id,
1154 relay_parent,
1155 candidate_hash,
1156 parent_head_data_hash,
1157 )
1158 .await;
1159
1160 if !can_second {
1161 return Err(AdvertisementError::BlockedByBacking)
1162 }
1163 }
1164
1165 let result = enqueue_collation(
1166 sender,
1167 state,
1168 relay_parent,
1169 para_id,
1170 peer_id,
1171 collator_id,
1172 prospective_candidate,
1173 )
1174 .await;
1175
1176 if let Err(fetch_error) = result {
1177 gum::debug!(
1178 target: LOG_TARGET,
1179 relay_parent = ?relay_parent,
1180 para_id = ?para_id,
1181 peer_id = ?peer_id,
1182 error = %fetch_error,
1183 "Failed to request advertised collation",
1184 );
1185 }
1186
1187 Ok(())
1188}
1189
1190async fn enqueue_collation<Sender>(
1193 sender: &mut Sender,
1194 state: &mut State,
1195 relay_parent: Hash,
1196 para_id: ParaId,
1197 peer_id: PeerId,
1198 collator_id: CollatorId,
1199 prospective_candidate: Option<(CandidateHash, Hash)>,
1200) -> std::result::Result<(), FetchError>
1201where
1202 Sender: CollatorProtocolSenderTrait,
1203{
1204 gum::debug!(
1205 target: LOG_TARGET,
1206 peer_id = ?peer_id,
1207 %para_id,
1208 ?relay_parent,
1209 "Received advertise collation",
1210 );
1211 let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
1212 Some(rp_state) => rp_state,
1213 None => {
1214 gum::trace!(
1216 target: LOG_TARGET,
1217 peer_id = ?peer_id,
1218 %para_id,
1219 ?relay_parent,
1220 ?prospective_candidate,
1221 "Candidate relay parent went out of view for valid advertisement",
1222 );
1223 return Ok(())
1224 },
1225 };
1226 let prospective_candidate =
1227 prospective_candidate.map(|(candidate_hash, parent_head_data_hash)| ProspectiveCandidate {
1228 candidate_hash,
1229 parent_head_data_hash,
1230 });
1231
1232 let collations = &mut per_relay_parent.collations;
1233 let pending_collation =
1234 PendingCollation::new(relay_parent, para_id, &peer_id, prospective_candidate);
1235
1236 match collations.status {
1237 CollationStatus::Fetching(_) | CollationStatus::WaitingOnValidation => {
1238 gum::trace!(
1239 target: LOG_TARGET,
1240 peer_id = ?peer_id,
1241 %para_id,
1242 ?relay_parent,
1243 "Added collation to the pending list"
1244 );
1245 collations.add_to_waiting_queue((pending_collation, collator_id));
1246 },
1247 CollationStatus::Waiting => {
1248 fetch_collation(sender, state, pending_collation, collator_id).await?;
1251 },
1252 }
1253
1254 Ok(())
1255}
1256
1257async fn handle_our_view_change<Sender>(
1259 sender: &mut Sender,
1260 state: &mut State,
1261 keystore: &KeystorePtr,
1262 view: OurView,
1263) -> Result<()>
1264where
1265 Sender: CollatorProtocolSenderTrait,
1266{
1267 let current_leaves = state.active_leaves.clone();
1268
1269 let removed = current_leaves.iter().filter(|h| !view.contains(h));
1270 let added = view.iter().filter(|h| !current_leaves.contains(h));
1271
1272 for leaf in added {
1273 let session_index = request_session_index_for_child(*leaf, sender)
1274 .await
1275 .await
1276 .map_err(Error::CancelledSessionIndex)??;
1277
1278 let v2_receipts = request_node_features(*leaf, session_index, sender)
1279 .await
1280 .await
1281 .map_err(Error::CancelledNodeFeatures)??
1282 .get(node_features::FeatureIndex::CandidateReceiptV2 as usize)
1283 .map(|b| *b)
1284 .unwrap_or(false);
1285
1286 let Some(per_relay_parent) = construct_per_relay_parent(
1287 sender,
1288 &mut state.current_assignments,
1289 keystore,
1290 *leaf,
1291 v2_receipts,
1292 session_index,
1293 )
1294 .await?
1295 else {
1296 continue
1297 };
1298
1299 state.active_leaves.insert(*leaf);
1300 state.per_relay_parent.insert(*leaf, per_relay_parent);
1301
1302 state
1303 .implicit_view
1304 .activate_leaf(sender, *leaf)
1305 .await
1306 .map_err(Error::ImplicitViewFetchError)?;
1307
1308 let allowed_ancestry = state
1310 .implicit_view
1311 .known_allowed_relay_parents_under(leaf, None)
1312 .unwrap_or_default();
1313 for block_hash in allowed_ancestry {
1314 if let Entry::Vacant(entry) = state.per_relay_parent.entry(*block_hash) {
1315 if let Some(per_relay_parent) = construct_per_relay_parent(
1318 sender,
1319 &mut state.current_assignments,
1320 keystore,
1321 *block_hash,
1322 v2_receipts,
1323 session_index,
1324 )
1325 .await?
1326 {
1327 entry.insert(per_relay_parent);
1328 }
1329 }
1330 }
1331 }
1332
1333 for removed in removed {
1334 gum::trace!(
1335 target: LOG_TARGET,
1336 ?view,
1337 ?removed,
1338 "handle_our_view_change - removed",
1339 );
1340
1341 state.active_leaves.remove(removed);
1342 let pruned = state.implicit_view.deactivate_leaf(*removed);
1346
1347 for removed in pruned {
1348 if let Some(per_relay_parent) = state.per_relay_parent.remove(&removed) {
1349 remove_outgoing(&mut state.current_assignments, per_relay_parent);
1350 }
1351
1352 state.collation_requests_cancel_handles.retain(|pc, handle| {
1353 let keep = pc.relay_parent != removed;
1354 if !keep {
1355 handle.cancel();
1356 }
1357 keep
1358 });
1359 state.fetched_candidates.retain(|k, _| k.relay_parent != removed);
1360 }
1361 }
1362
1363 state.blocked_from_seconding.retain(|_, collations| {
1365 collations.retain(|collation| {
1366 state
1367 .per_relay_parent
1368 .contains_key(&collation.candidate_receipt.descriptor.relay_parent())
1369 });
1370
1371 !collations.is_empty()
1372 });
1373
1374 for (peer_id, peer_data) in state.peer_data.iter_mut() {
1375 peer_data.prune_old_advertisements(&state.implicit_view, &state.active_leaves);
1376
1377 if let Some(para_id) = peer_data.collating_para() {
1382 if !state.current_assignments.contains_key(¶_id) {
1383 gum::trace!(
1384 target: LOG_TARGET,
1385 ?peer_id,
1386 ?para_id,
1387 "Disconnecting peer on view change (not current parachain id)"
1388 );
1389 disconnect_peer(sender, *peer_id).await;
1390 }
1391 }
1392 }
1393
1394 Ok(())
1395}
1396
1397#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1399async fn handle_network_msg<Context>(
1400 ctx: &mut Context,
1401 state: &mut State,
1402 keystore: &KeystorePtr,
1403 bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
1404) -> Result<()> {
1405 use NetworkBridgeEvent::*;
1406
1407 match bridge_message {
1408 PeerConnected(peer_id, observed_role, protocol_version, _) => {
1409 let version = match protocol_version.try_into() {
1410 Ok(version) => version,
1411 Err(err) => {
1412 gum::error!(
1414 target: LOG_TARGET,
1415 ?peer_id,
1416 ?observed_role,
1417 ?err,
1418 "Unsupported protocol version"
1419 );
1420 return Ok(())
1421 },
1422 };
1423 state.peer_data.entry(peer_id).or_insert_with(|| PeerData {
1424 view: View::default(),
1425 state: PeerState::Connected(Instant::now()),
1426 version,
1427 });
1428 state.metrics.note_collator_peer_count(state.peer_data.len());
1429 },
1430 PeerDisconnected(peer_id) => {
1431 state.peer_data.remove(&peer_id);
1432 state.metrics.note_collator_peer_count(state.peer_data.len());
1433 },
1434 NewGossipTopology { .. } => {
1435 },
1437 PeerViewChange(peer_id, view) => {
1438 handle_peer_view_change(state, peer_id, view);
1439 },
1440 OurViewChange(view) => {
1441 handle_our_view_change(ctx.sender(), state, keystore, view).await?;
1442 },
1443 PeerMessage(remote, msg) => {
1444 process_incoming_peer_message(ctx, state, remote, msg).await;
1445 },
1446 UpdatedAuthorityIds { .. } => {
1447 },
1449 }
1450
1451 Ok(())
1452}
1453
1454#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1456async fn process_msg<Context>(
1457 ctx: &mut Context,
1458 keystore: &KeystorePtr,
1459 msg: CollatorProtocolMessage,
1460 state: &mut State,
1461) {
1462 use CollatorProtocolMessage::*;
1463
1464 let _timer = state.metrics.time_process_msg();
1465
1466 match msg {
1467 CollateOn(id) => {
1468 gum::warn!(
1469 target: LOG_TARGET,
1470 para_id = %id,
1471 "CollateOn message is not expected on the validator side of the protocol",
1472 );
1473 },
1474 DistributeCollation { .. } => {
1475 gum::warn!(
1476 target: LOG_TARGET,
1477 "DistributeCollation message is not expected on the validator side of the protocol",
1478 );
1479 },
1480 NetworkBridgeUpdate(event) => {
1481 if let Err(e) = handle_network_msg(ctx, state, keystore, event).await {
1482 gum::warn!(
1483 target: LOG_TARGET,
1484 err = ?e,
1485 "Failed to handle incoming network message",
1486 );
1487 }
1488 },
1489 Seconded(parent, stmt) => {
1490 let receipt = match stmt.payload() {
1491 Statement::Seconded(receipt) => receipt,
1492 Statement::Valid(_) => {
1493 gum::warn!(
1494 target: LOG_TARGET,
1495 ?stmt,
1496 relay_parent = %parent,
1497 "Seconded message received with a `Valid` statement",
1498 );
1499 return
1500 },
1501 };
1502 let output_head_data = receipt.commitments.head_data.clone();
1503 let output_head_data_hash = receipt.descriptor.para_head();
1504 let fetched_collation = FetchedCollation::from(&receipt.to_plain());
1505 if let Some(CollationEvent { collator_id, pending_collation, .. }) =
1506 state.fetched_candidates.remove(&fetched_collation)
1507 {
1508 let PendingCollation {
1509 relay_parent, peer_id, prospective_candidate, para_id, ..
1510 } = pending_collation;
1511 note_good_collation(
1512 &mut state.reputation,
1513 ctx.sender(),
1514 &state.peer_data,
1515 collator_id.clone(),
1516 )
1517 .await;
1518 if let Some(peer_data) = state.peer_data.get(&peer_id) {
1519 notify_collation_seconded(
1520 ctx.sender(),
1521 peer_id,
1522 peer_data.version,
1523 relay_parent,
1524 stmt,
1525 )
1526 .await;
1527 }
1528
1529 if let Some(rp_state) = state.per_relay_parent.get_mut(&parent) {
1530 rp_state.collations.note_seconded(para_id);
1531 }
1532
1533 second_unblocked_collations(
1535 ctx,
1536 state,
1537 fetched_collation.para_id,
1538 output_head_data,
1539 output_head_data_hash,
1540 )
1541 .await;
1542
1543 let maybe_candidate_hash =
1545 prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
1546 dequeue_next_collation_and_fetch(
1547 ctx,
1548 state,
1549 parent,
1550 (collator_id, maybe_candidate_hash),
1551 )
1552 .await;
1553 } else {
1554 gum::debug!(
1555 target: LOG_TARGET,
1556 relay_parent = ?parent,
1557 "Collation has been seconded, but the relay parent is deactivated",
1558 );
1559 }
1560 },
1561 Invalid(parent, candidate_receipt) => {
1562 state.blocked_from_seconding.remove(&BlockedCollationId {
1564 para_id: candidate_receipt.descriptor.para_id(),
1565 parent_head_data_hash: candidate_receipt.descriptor.para_head(),
1566 });
1567
1568 let fetched_collation = FetchedCollation::from(&candidate_receipt);
1569 let candidate_hash = fetched_collation.candidate_hash;
1570 let id = match state.fetched_candidates.entry(fetched_collation) {
1571 Entry::Occupied(entry)
1572 if entry.get().pending_collation.commitments_hash ==
1573 Some(candidate_receipt.commitments_hash) =>
1574 entry.remove().collator_id,
1575 Entry::Occupied(_) => {
1576 gum::error!(
1577 target: LOG_TARGET,
1578 relay_parent = ?parent,
1579 candidate = ?candidate_receipt.hash(),
1580 "Reported invalid candidate for unknown `pending_candidate`!",
1581 );
1582 return
1583 },
1584 Entry::Vacant(_) => return,
1585 };
1586
1587 report_collator(&mut state.reputation, ctx.sender(), &state.peer_data, id.clone())
1588 .await;
1589
1590 dequeue_next_collation_and_fetch(ctx, state, parent, (id, Some(candidate_hash))).await;
1591 },
1592 }
1593}
1594
1595#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1597pub(crate) async fn run<Context>(
1598 ctx: Context,
1599 keystore: KeystorePtr,
1600 eviction_policy: crate::CollatorEvictionPolicy,
1601 metrics: Metrics,
1602) -> std::result::Result<(), std::convert::Infallible> {
1603 run_inner(
1604 ctx,
1605 keystore,
1606 eviction_policy,
1607 metrics,
1608 ReputationAggregator::default(),
1609 REPUTATION_CHANGE_INTERVAL,
1610 )
1611 .await
1612}
1613
1614#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1615async fn run_inner<Context>(
1616 mut ctx: Context,
1617 keystore: KeystorePtr,
1618 eviction_policy: crate::CollatorEvictionPolicy,
1619 metrics: Metrics,
1620 reputation: ReputationAggregator,
1621 reputation_interval: Duration,
1622) -> std::result::Result<(), std::convert::Infallible> {
1623 let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
1624 let mut reputation_delay = new_reputation_delay();
1625
1626 let mut state = State { metrics, reputation, ..Default::default() };
1627
1628 let next_inactivity_stream = tick_stream(ACTIVITY_POLL);
1629 futures::pin_mut!(next_inactivity_stream);
1630
1631 let mut network_error_freq = gum::Freq::new();
1632 let mut canceled_freq = gum::Freq::new();
1633
1634 loop {
1635 select! {
1636 _ = reputation_delay => {
1637 state.reputation.send(ctx.sender()).await;
1638 reputation_delay = new_reputation_delay();
1639 },
1640 res = ctx.recv().fuse() => {
1641 match res {
1642 Ok(FromOrchestra::Communication { msg }) => {
1643 gum::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
1644 process_msg(
1645 &mut ctx,
1646 &keystore,
1647 msg,
1648 &mut state,
1649 ).await;
1650 }
1651 Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => break,
1652 Ok(FromOrchestra::Signal(_)) => continue,
1653 }
1654 },
1655 _ = next_inactivity_stream.next() => {
1656 disconnect_inactive_peers(ctx.sender(), &eviction_policy, &state.peer_data).await;
1657 },
1658 resp = state.collation_requests.select_next_some() => {
1659 let relay_parent = resp.0.pending_collation.relay_parent;
1660 let res = match handle_collation_fetch_response(
1661 &mut state,
1662 resp,
1663 &mut network_error_freq,
1664 &mut canceled_freq,
1665 ).await {
1666 Err(Some((peer_id, rep))) => {
1667 modify_reputation(&mut state.reputation, ctx.sender(), peer_id, rep).await;
1668 state.per_relay_parent.get_mut(&relay_parent).map(|rp| {
1670 rp.collations.status.back_to_waiting();
1671 });
1672 continue
1673 },
1674 Err(None) => {
1675 state.per_relay_parent.get_mut(&relay_parent).map(|rp| {
1677 rp.collations.status.back_to_waiting();
1678 });
1679 continue
1680 },
1681 Ok(res) => res
1682 };
1683
1684 let CollationEvent {collator_id, pending_collation, .. } = res.collation_event.clone();
1685
1686 match kick_off_seconding(&mut ctx, &mut state, res).await {
1687 Err(err) => {
1688 gum::warn!(
1689 target: LOG_TARGET,
1690 relay_parent = ?pending_collation.relay_parent,
1691 para_id = ?pending_collation.para_id,
1692 peer_id = ?pending_collation.peer_id,
1693 error = %err,
1694 "Seconding aborted due to an error",
1695 );
1696
1697 if err.is_malicious() {
1698 modify_reputation(&mut state.reputation, ctx.sender(), pending_collation.peer_id, COST_REPORT_BAD).await;
1700 }
1701 let maybe_candidate_hash =
1702 pending_collation.prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
1703 dequeue_next_collation_and_fetch(
1704 &mut ctx,
1705 &mut state,
1706 pending_collation.relay_parent,
1707 (collator_id, maybe_candidate_hash),
1708 )
1709 .await;
1710 },
1711 Ok(false) => {
1712 let maybe_candidate_hash =
1714 pending_collation.prospective_candidate.as_ref().map(ProspectiveCandidate::candidate_hash);
1715 dequeue_next_collation_and_fetch(
1716 &mut ctx,
1717 &mut state,
1718 pending_collation.relay_parent,
1719 (collator_id, maybe_candidate_hash),
1720 )
1721 .await;
1722 }
1723 Ok(true) => {}
1724 }
1725 },
1726 res = state.collation_fetch_timeouts.select_next_some() => {
1727 let (collator_id, maybe_candidate_hash, relay_parent) = res;
1728 gum::debug!(
1729 target: LOG_TARGET,
1730 ?relay_parent,
1731 ?collator_id,
1732 "Timeout hit - already seconded?"
1733 );
1734 dequeue_next_collation_and_fetch(
1735 &mut ctx,
1736 &mut state,
1737 relay_parent,
1738 (collator_id, maybe_candidate_hash),
1739 )
1740 .await;
1741 }
1742 }
1743 }
1744
1745 Ok(())
1746}
1747
1748#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1750async fn dequeue_next_collation_and_fetch<Context>(
1751 ctx: &mut Context,
1752 state: &mut State,
1753 relay_parent: Hash,
1754 previous_fetch: (CollatorId, Option<CandidateHash>),
1756) {
1757 while let Some((next, id)) = get_next_collation_to_fetch(&previous_fetch, relay_parent, state) {
1758 gum::debug!(
1759 target: LOG_TARGET,
1760 ?relay_parent,
1761 ?id,
1762 "Successfully dequeued next advertisement - fetching ..."
1763 );
1764 if let Err(err) = fetch_collation(ctx.sender(), state, next, id).await {
1765 gum::debug!(
1766 target: LOG_TARGET,
1767 relay_parent = ?next.relay_parent,
1768 para_id = ?next.para_id,
1769 peer_id = ?next.peer_id,
1770 error = %err,
1771 "Failed to request a collation, dequeueing next one",
1772 );
1773 } else {
1774 break
1775 }
1776 }
1777}
1778
1779async fn request_persisted_validation_data<Sender>(
1780 sender: &mut Sender,
1781 relay_parent: Hash,
1782 para_id: ParaId,
1783) -> std::result::Result<Option<PersistedValidationData>, SecondingError>
1784where
1785 Sender: CollatorProtocolSenderTrait,
1786{
1787 polkadot_node_subsystem_util::request_persisted_validation_data(
1789 relay_parent,
1790 para_id,
1791 OccupiedCoreAssumption::Free,
1792 sender,
1793 )
1794 .await
1795 .await
1796 .map_err(SecondingError::CancelledRuntimePersistedValidationData)?
1797 .map_err(SecondingError::RuntimeApi)
1798}
1799
1800async fn request_prospective_validation_data<Sender>(
1801 sender: &mut Sender,
1802 candidate_relay_parent: Hash,
1803 parent_head_data_hash: Hash,
1804 para_id: ParaId,
1805 maybe_parent_head_data: Option<HeadData>,
1806) -> std::result::Result<Option<PersistedValidationData>, SecondingError>
1807where
1808 Sender: CollatorProtocolSenderTrait,
1809{
1810 let (tx, rx) = oneshot::channel();
1811
1812 let parent_head_data = if let Some(head_data) = maybe_parent_head_data {
1813 ParentHeadData::WithData { head_data, hash: parent_head_data_hash }
1814 } else {
1815 ParentHeadData::OnlyHash(parent_head_data_hash)
1816 };
1817
1818 let request =
1819 ProspectiveValidationDataRequest { para_id, candidate_relay_parent, parent_head_data };
1820
1821 sender
1822 .send_message(ProspectiveParachainsMessage::GetProspectiveValidationData(request, tx))
1823 .await;
1824
1825 rx.await.map_err(SecondingError::CancelledProspectiveValidationData)
1826}
1827
1828#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
1831async fn kick_off_seconding<Context>(
1832 ctx: &mut Context,
1833 state: &mut State,
1834 PendingCollationFetch { mut collation_event, candidate_receipt, pov, maybe_parent_head_data }: PendingCollationFetch,
1835) -> std::result::Result<bool, SecondingError> {
1836 let pending_collation = collation_event.pending_collation;
1837 let relay_parent = pending_collation.relay_parent;
1838
1839 let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
1840 Some(state) => state,
1841 None => {
1842 gum::trace!(
1844 target: LOG_TARGET,
1845 relay_parent = ?relay_parent,
1846 "Fetched collation for a parent out of view",
1847 );
1848 return Ok(false)
1849 },
1850 };
1851
1852 descriptor_version_sanity_check(candidate_receipt.descriptor(), per_relay_parent)?;
1854
1855 let collations = &mut per_relay_parent.collations;
1856
1857 let fetched_collation = FetchedCollation::from(&candidate_receipt);
1858 if let Entry::Vacant(entry) = state.fetched_candidates.entry(fetched_collation) {
1859 collation_event.pending_collation.commitments_hash =
1860 Some(candidate_receipt.commitments_hash);
1861
1862 let (maybe_pvd, maybe_parent_head, maybe_parent_head_hash) = match (
1863 collation_event.collator_protocol_version,
1864 collation_event.pending_collation.prospective_candidate,
1865 ) {
1866 (CollationVersion::V2, Some(ProspectiveCandidate { parent_head_data_hash, .. })) => {
1867 let pvd = request_prospective_validation_data(
1868 ctx.sender(),
1869 relay_parent,
1870 parent_head_data_hash,
1871 pending_collation.para_id,
1872 maybe_parent_head_data.clone(),
1873 )
1874 .await?;
1875
1876 (pvd, maybe_parent_head_data, Some(parent_head_data_hash))
1877 },
1878 (CollationVersion::V1, _) => {
1879 let pvd = request_persisted_validation_data(
1880 ctx.sender(),
1881 candidate_receipt.descriptor().relay_parent(),
1882 candidate_receipt.descriptor().para_id(),
1883 )
1884 .await?;
1885 (
1886 Some(pvd.ok_or(SecondingError::PersistedValidationDataNotFound)?),
1887 maybe_parent_head_data,
1888 None,
1889 )
1890 },
1891 _ => {
1892 return Ok(false)
1894 },
1895 };
1896
1897 let pvd = match (maybe_pvd, maybe_parent_head.clone(), maybe_parent_head_hash) {
1898 (Some(pvd), _, _) => pvd,
1899 (None, None, Some(parent_head_data_hash)) => {
1900 let blocked_collation = PendingCollationFetch {
1904 collation_event,
1905 candidate_receipt,
1906 pov,
1907 maybe_parent_head_data: None,
1908 };
1909 gum::debug!(
1910 target: LOG_TARGET,
1911 candidate_hash = ?blocked_collation.candidate_receipt.hash(),
1912 relay_parent = ?blocked_collation.candidate_receipt.descriptor.relay_parent(),
1913 "Collation having parent head data hash {} is blocked from seconding. Waiting on its parent to be validated.",
1914 parent_head_data_hash
1915 );
1916 state
1917 .blocked_from_seconding
1918 .entry(BlockedCollationId {
1919 para_id: blocked_collation.candidate_receipt.descriptor.para_id(),
1920 parent_head_data_hash,
1921 })
1922 .or_insert_with(Vec::new)
1923 .push(blocked_collation);
1924
1925 return Ok(false)
1926 },
1927 (None, _, _) => {
1928 return Err(SecondingError::PersistedValidationDataNotFound)
1931 },
1932 };
1933
1934 fetched_collation_sanity_check(
1935 &collation_event.pending_collation,
1936 &candidate_receipt,
1937 &pvd,
1938 maybe_parent_head.and_then(|head| maybe_parent_head_hash.map(|hash| (head, hash))),
1939 )?;
1940
1941 ctx.send_message(CandidateBackingMessage::Second(
1942 relay_parent,
1943 candidate_receipt,
1944 pvd,
1945 pov,
1946 ))
1947 .await;
1948 collations.status = CollationStatus::WaitingOnValidation;
1951
1952 entry.insert(collation_event);
1953 Ok(true)
1954 } else {
1955 Err(SecondingError::Duplicate)
1956 }
1957}
1958
1959async fn disconnect_inactive_peers(
1963 sender: &mut impl overseer::CollatorProtocolSenderTrait,
1964 eviction_policy: &crate::CollatorEvictionPolicy,
1965 peers: &HashMap<PeerId, PeerData>,
1966) {
1967 for (peer, peer_data) in peers {
1968 if peer_data.is_inactive(&eviction_policy) {
1969 gum::trace!(target: LOG_TARGET, ?peer, "Disconnecting inactive peer");
1970 disconnect_peer(sender, *peer).await;
1971 }
1972 }
1973}
1974
/// Interprets the outcome of a finished collation fetch request.
///
/// Returns:
/// - `Ok(PendingCollationFetch)` when a collation for the expected para was received;
/// - `Err(Some((peer_id, rep)))` when the fetch failed and the returned reputation change
///   should be applied to `peer_id` by the caller;
/// - `Err(None)` when the fetch failed without any reputation consequence (cancelled from our
///   side, timed out, or a `RequestError::Canceled`).
///
/// `network_error_freq` and `canceled_freq` rate-limit the corresponding warnings.
async fn handle_collation_fetch_response(
	state: &mut State,
	response: <CollationFetchRequest as Future>::Output,
	network_error_freq: &mut gum::Freq,
	canceled_freq: &mut gum::Freq,
) -> std::result::Result<PendingCollationFetch, Option<(PeerId, Rep)>> {
	let (CollationEvent { collator_id, collator_protocol_version, pending_collation }, response) =
		response;
	// The request has finished, so its cancel handle is no longer needed.
	state.collation_requests_cancel_handles.remove(&pending_collation);

	let response = match response {
		// Returns before the timer below is started and before `on_request` is reported.
		Err(CollationFetchError::Cancelled) => {
			gum::debug!(
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				"Request was cancelled from the validator side"
			);
			return Err(None)
		},
		Err(CollationFetchError::Request(req_error)) => Err(req_error),
		Ok(resp) => Ok(resp),
	};

	let _timer = state.metrics.time_handle_collation_request_result();

	// Reported to the metrics at the end; only the success arms flip it to `Ok(())`.
	let mut metrics_result = Err(());

	// NOTE: arm order matters - the guarded arms must be tried before the plain ones.
	let result = match response {
		Err(RequestError::InvalidResponse(err)) => {
			gum::warn!(
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				err = ?err,
				"Collator provided response that could not be decoded"
			);
			Err(Some((pending_collation.peer_id, COST_CORRUPTED_MESSAGE)))
		},
		// Timeouts carry no reputation change.
		Err(err) if err.is_timed_out() => {
			gum::debug!(
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				"Request timed out"
			);
			Err(None)
		},
		Err(RequestError::NetworkError(err)) => {
			gum::warn_if_frequent!(
				freq: network_error_freq,
				max_rate: gum::Times::PerHour(100),
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				err = ?err,
				"Fetching collation failed due to network error"
			);
			Err(Some((pending_collation.peer_id, COST_NETWORK_ERROR)))
		},
		// Per the log message this arm is not expected to be reached: cancellation should
		// already have matched the `is_timed_out` guard above.
		Err(RequestError::Canceled(err)) => {
			gum::warn_if_frequent!(
				freq: canceled_freq,
				max_rate: gum::Times::PerHour(100),
				target: LOG_TARGET,
				hash = ?pending_collation.relay_parent,
				para_id = ?pending_collation.para_id,
				peer_id = ?pending_collation.peer_id,
				err = ?err,
				"Canceled should be handled by `is_timed_out` above - this is a bug!"
			);
			Err(None)
		},
		// Any response variant whose receipt is for a different para than the one requested.
		Ok(
			request_v1::CollationFetchingResponse::Collation(receipt, _) |
			request_v2::CollationFetchingResponse::Collation(receipt, _) |
			request_v1::CollationFetchingResponse::CollationWithParentHeadData { receipt, .. } |
			request_v2::CollationFetchingResponse::CollationWithParentHeadData { receipt, .. },
		) if receipt.descriptor().para_id() != pending_collation.para_id => {
			gum::debug!(
				target: LOG_TARGET,
				expected_para_id = ?pending_collation.para_id,
				got_para_id = ?receipt.descriptor().para_id(),
				peer_id = ?pending_collation.peer_id,
				"Got wrong para ID for requested collation."
			);

			Err(Some((pending_collation.peer_id, COST_WRONG_PARA)))
		},
		// Collation without parent head data.
		Ok(request_v1::CollationFetchingResponse::Collation(candidate_receipt, pov)) => {
			gum::debug!(
				target: LOG_TARGET,
				para_id = %pending_collation.para_id,
				hash = ?pending_collation.relay_parent,
				candidate_hash = ?candidate_receipt.hash(),
				"Received collation",
			);

			metrics_result = Ok(());
			Ok(PendingCollationFetch {
				collation_event: CollationEvent {
					collator_id,
					pending_collation,
					collator_protocol_version,
				},
				candidate_receipt,
				pov,
				maybe_parent_head_data: None,
			})
		},
		// Collation accompanied by its parent head data, which is passed along.
		Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData {
			receipt,
			pov,
			parent_head_data,
		}) => {
			gum::debug!(
				target: LOG_TARGET,
				para_id = %pending_collation.para_id,
				hash = ?pending_collation.relay_parent,
				candidate_hash = ?receipt.hash(),
				"Received collation (v3)",
			);

			metrics_result = Ok(());
			Ok(PendingCollationFetch {
				collation_event: CollationEvent {
					collator_id,
					pending_collation,
					collator_protocol_version,
				},
				candidate_receipt: receipt,
				pov,
				maybe_parent_head_data: Some(parent_head_data),
			})
		},
	};
	state.metrics.on_request(metrics_result);
	result
}
2126
2127fn unfulfilled_claim_queue_entries(relay_parent: &Hash, state: &State) -> Result<Vec<ParaId>> {
2131 let relay_parent_state = state
2132 .per_relay_parent
2133 .get(relay_parent)
2134 .ok_or(Error::RelayParentStateNotFound)?;
2135 let scheduled_paras = relay_parent_state.assignment.current.iter().collect::<HashSet<_>>();
2136 let paths = state.implicit_view.paths_via_relay_parent(relay_parent);
2137
2138 let mut claim_queue_states = Vec::new();
2139 for path in paths {
2140 let mut cq_state = ClaimQueueState::new();
2141 for ancestor in &path {
2142 cq_state.add_leaf(
2143 &ancestor,
2144 &state
2145 .per_relay_parent
2146 .get(&ancestor)
2147 .ok_or(Error::RelayParentStateNotFound)?
2148 .assignment
2149 .current,
2150 );
2151
2152 for para_id in &scheduled_paras {
2153 let seconded_and_pending = state.seconded_and_pending_for_para(&ancestor, ¶_id);
2154 for _ in 0..seconded_and_pending {
2155 cq_state.claim_at(&ancestor, ¶_id);
2156 }
2157 }
2158 }
2159 claim_queue_states.push(cq_state);
2160 }
2161
2162 let unfulfilled_entries = claim_queue_states
2168 .iter_mut()
2169 .map(|cq| cq.unclaimed_at(relay_parent))
2170 .max_by(|a, b| a.len().cmp(&b.len()))
2171 .unwrap_or_default();
2172
2173 Ok(unfulfilled_entries)
2174}
2175
2176fn get_next_collation_to_fetch(
2179 finished_one: &(CollatorId, Option<CandidateHash>),
2180 relay_parent: Hash,
2181 state: &mut State,
2182) -> Option<(PendingCollation, CollatorId)> {
2183 let unfulfilled_entries = match unfulfilled_claim_queue_entries(&relay_parent, &state) {
2184 Ok(entries) => entries,
2185 Err(err) => {
2186 gum::error!(
2187 target: LOG_TARGET,
2188 ?relay_parent,
2189 ?err,
2190 "Failed to get unfulfilled claim queue entries"
2191 );
2192 return None
2193 },
2194 };
2195 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2196 Some(rp_state) => rp_state,
2197 None => {
2198 gum::error!(
2199 target: LOG_TARGET,
2200 ?relay_parent,
2201 "Failed to get relay parent state"
2202 );
2203 return None
2204 },
2205 };
2206
2207 if let Some((collator_id, maybe_candidate_hash)) = rp_state.collations.fetching_from.as_ref() {
2210 if collator_id != &finished_one.0 &&
2212 maybe_candidate_hash.map_or(true, |hash| Some(&hash) != finished_one.1.as_ref())
2213 {
2214 gum::trace!(
2215 target: LOG_TARGET,
2216 waiting_collation = ?rp_state.collations.fetching_from,
2217 ?finished_one,
2218 "Not proceeding to the next collation - has already been done."
2219 );
2220 return None
2221 }
2222 }
2223 rp_state.collations.status.back_to_waiting();
2224 rp_state.collations.pick_a_collation_to_fetch(unfulfilled_entries)
2225}
2226
2227fn descriptor_version_sanity_check(
2229 descriptor: &CandidateDescriptorV2,
2230 per_relay_parent: &PerRelayParent,
2231) -> std::result::Result<(), SecondingError> {
2232 match descriptor.version() {
2233 CandidateDescriptorVersion::V1 => Ok(()),
2234 CandidateDescriptorVersion::V2 if per_relay_parent.v2_receipts => {
2235 if let Some(core_index) = descriptor.core_index() {
2236 if core_index != per_relay_parent.current_core {
2237 return Err(SecondingError::InvalidCoreIndex(
2238 core_index.0,
2239 per_relay_parent.current_core.0,
2240 ))
2241 }
2242 }
2243
2244 if let Some(session_index) = descriptor.session_index() {
2245 if session_index != per_relay_parent.session_index {
2246 return Err(SecondingError::InvalidSessionIndex(
2247 session_index,
2248 per_relay_parent.session_index,
2249 ))
2250 }
2251 }
2252
2253 Ok(())
2254 },
2255 descriptor_version => Err(SecondingError::InvalidReceiptVersion(descriptor_version)),
2256 }
2257}