1use super::*;
20
21use always_assert::never;
22use bytes::Bytes;
23use codec::{Decode, DecodeAll};
24use net_protocol::filter_by_peer_version;
25use parking_lot::Mutex;
26
27use sc_network::{
28 service::traits::{NotificationEvent, ValidationResult},
29 MessageSink, NotificationService,
30};
31use sp_consensus::SyncOracle;
32
33use polkadot_node_network_protocol::{
34 self as net_protocol,
35 grid_topology::{SessionGridTopology, TopologyPeerInfo},
36 peer_set::{
37 CollationVersion, PeerSet, PeerSetProtocolNames, PerPeerSet, ProtocolVersion,
38 ValidationVersion,
39 },
40 v1 as protocol_v1, v2 as protocol_v2, v3 as protocol_v3, ObservedRole, OurView, PeerId,
41 UnifiedReputationChange as Rep, View,
42};
43
44use polkadot_node_subsystem::{
45 errors::SubsystemError,
46 messages::{
47 network_bridge_event::NewGossipTopology, ApprovalVotingParallelMessage,
48 BitfieldDistributionMessage, CollatorProtocolMessage, GossipSupportMessage,
49 NetworkBridgeEvent, NetworkBridgeRxMessage, StatementDistributionMessage,
50 },
51 overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
52};
53
54use polkadot_primitives::{AuthorityDiscoveryId, BlockNumber, Hash, ValidatorIndex};
55
56use std::{
57 collections::{hash_map, HashMap},
58 iter::ExactSizeIterator,
59 u32,
60};
61
62use super::validator_discovery;
63
64use crate::network::{
68 send_collation_message_v1, send_collation_message_v2, send_validation_message_v3, Network,
69};
70use crate::{network::get_peer_id_by_authority_id, WireMessage};
71
72use super::metrics::Metrics;
73
74#[cfg(test)]
75mod tests;
76
/// Log target attached to every `gum` log statement emitted from this module.
const LOG_TARGET: &str = "parachain::network-bridge-rx";
79
/// Receiving side of the network bridge subsystem.
///
/// Holds every part the receiving half needs. The value is built by [`NetworkBridgeRx::new`] and
/// destructured in `run_network_in` once the subsystem is started.
pub struct NetworkBridgeRx<N, AD> {
    /// Handle to the underlying network service (type parameter `N`).
    network_service: N,
    /// Authority discovery service (type parameter `AD`), used to map peers to authority ids
    /// and back.
    authority_discovery_service: AD,
    /// Oracle queried through `is_major_syncing` to decide when view updates may start.
    sync_oracle: Box<dyn SyncOracle + Send>,
    /// State shared between the network-event task and the overseer-signal loop
    /// (per-peer-set peer maps and our local view).
    shared: Shared,
    /// Metrics handle cloned into the network-event task.
    metrics: Metrics,
    /// Used to resolve a negotiated fallback protocol name into a peer set and version.
    peerset_protocol_names: PeerSetProtocolNames,
    /// Notification service of the validation peer set.
    validation_service: Box<dyn NotificationService>,
    /// Notification service of the collation peer set.
    collation_service: Box<dyn NotificationService>,
    /// Per-peer message sinks keyed by `(PeerSet, PeerId)`; an entry is inserted when a
    /// notification stream opens and removed when it closes.
    notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
}
93
94impl<N, AD> NetworkBridgeRx<N, AD> {
95 pub fn new(
101 network_service: N,
102 authority_discovery_service: AD,
103 sync_oracle: Box<dyn SyncOracle + Send>,
104 metrics: Metrics,
105 peerset_protocol_names: PeerSetProtocolNames,
106 mut notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
107 notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
108 ) -> Self {
109 let shared = Shared::default();
110
111 let validation_service = notification_services
112 .remove(&PeerSet::Validation)
113 .expect("validation protocol was enabled so `NotificationService` must exist; qed");
114 let collation_service = notification_services
115 .remove(&PeerSet::Collation)
116 .expect("collation protocol was enabled so `NotificationService` must exist; qed");
117
118 Self {
119 network_service,
120 authority_discovery_service,
121 sync_oracle,
122 shared,
123 metrics,
124 peerset_protocol_names,
125 validation_service,
126 collation_service,
127 notification_sinks,
128 }
129 }
130}
131
132#[overseer::subsystem(NetworkBridgeRx, error = SubsystemError, prefix = self::overseer)]
133impl<Net, AD, Context> NetworkBridgeRx<Net, AD>
134where
135 Net: Network + Sync,
136 AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
137{
138 fn start(self, ctx: Context) -> SpawnedSubsystem {
139 let future = run_network_in(self, ctx)
142 .map_err(|e| SubsystemError::with_origin("network-bridge", e))
143 .boxed();
144 SpawnedSubsystem { name: "network-bridge-rx-subsystem", future }
145 }
146}
147
148async fn handle_validation_message<AD>(
150 event: NotificationEvent,
151 network_service: &mut impl Network,
152 sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
153 authority_discovery_service: &mut AD,
154 metrics: &Metrics,
155 shared: &Shared,
156 peerset_protocol_names: &PeerSetProtocolNames,
157 notification_service: &mut Box<dyn NotificationService>,
158 notification_sinks: &mut Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
159) where
160 AD: validator_discovery::AuthorityDiscovery + Send,
161{
162 match event {
163 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
164 let result = network_service
166 .peer_role(peer, handshake)
167 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
168 let _ = result_tx.send(result);
169 },
170 NotificationEvent::NotificationStreamOpened {
171 peer,
172 handshake,
173 negotiated_fallback,
174 ..
175 } => {
176 let role = match network_service.peer_role(peer, handshake) {
177 Some(role) => ObservedRole::from(role),
178 None => {
179 gum::debug!(
180 target: LOG_TARGET,
181 ?peer,
182 "Failed to determine peer role for validation protocol",
183 );
184 return
185 },
186 };
187
188 let (peer_set, version) = {
189 let (peer_set, version) =
190 (PeerSet::Validation, PeerSet::Validation.get_main_version());
191
192 if let Some(fallback) = negotiated_fallback {
193 match peerset_protocol_names.try_get_protocol(&fallback) {
194 None => {
195 gum::debug!(
196 target: LOG_TARGET,
197 fallback = &*fallback,
198 ?peer,
199 peerset = ?peer_set,
200 "Unknown fallback",
201 );
202
203 return
204 },
205 Some((p2, v2)) => {
206 if p2 != peer_set {
207 gum::debug!(
208 target: LOG_TARGET,
209 fallback = &*fallback,
210 fallback_peerset = ?p2,
211 peerset = ?peer_set,
212 "Fallback mismatched peer-set",
213 );
214
215 return
216 }
217
218 (p2, v2)
219 },
220 }
221 } else {
222 (peer_set, version)
223 }
224 };
225 match notification_service.message_sink(&peer) {
228 Some(sink) => {
229 notification_sinks.lock().insert((peer_set, peer), sink);
230 },
231 None => {
232 gum::warn!(
233 target: LOG_TARGET,
234 peerset = ?peer_set,
235 version = %version,
236 ?peer,
237 ?role,
238 "Message sink not available for peer",
239 );
240 return
241 },
242 }
243
244 gum::debug!(
245 target: LOG_TARGET,
246 action = "PeerConnected",
247 peer_set = ?peer_set,
248 version = %version,
249 peer = ?peer,
250 role = ?role
251 );
252
253 let local_view = {
254 let mut shared = shared.0.lock();
255 let peer_map = &mut shared.validation_peers;
256
257 match peer_map.entry(peer) {
258 hash_map::Entry::Occupied(_) => return,
259 hash_map::Entry::Vacant(vacant) => {
260 vacant.insert(PeerData { view: View::default(), version });
261 },
262 }
263
264 metrics.on_peer_connected(peer_set, version);
265
266 shared.local_view.clone().unwrap_or(View::default())
267 };
268
269 let maybe_authority =
270 authority_discovery_service.get_authority_ids_by_peer_id(peer).await;
271
272 dispatch_validation_events_to_all(
273 vec![
274 NetworkBridgeEvent::PeerConnected(peer, role, version, maybe_authority),
275 NetworkBridgeEvent::PeerViewChange(peer, View::default()),
276 ],
277 sender,
278 &metrics,
279 )
280 .await;
281
282 match ValidationVersion::try_from(version)
283 .expect("try_get_protocol has already checked version is known; qed")
284 {
285 ValidationVersion::V3 => send_validation_message_v3(
286 vec![peer],
287 WireMessage::<protocol_v3::ValidationProtocol>::ViewUpdate(local_view),
288 metrics,
289 notification_sinks,
290 ),
291 }
292 },
293 NotificationEvent::NotificationStreamClosed { peer } => {
294 let (peer_set, version) = (PeerSet::Validation, PeerSet::Validation.get_main_version());
295
296 gum::debug!(
297 target: LOG_TARGET,
298 action = "PeerDisconnected",
299 ?peer_set,
300 ?peer
301 );
302
303 let was_connected = {
304 let mut shared = shared.0.lock();
305 let peer_map = &mut shared.validation_peers;
306
307 let w = peer_map.remove(&peer).is_some();
308
309 metrics.on_peer_disconnected(peer_set, version);
310 w
311 };
312
313 notification_sinks.lock().remove(&(peer_set, peer));
314
315 if was_connected && version == peer_set.get_main_version() {
316 dispatch_validation_event_to_all(
317 NetworkBridgeEvent::PeerDisconnected(peer),
318 sender,
319 &metrics,
320 )
321 .await;
322 }
323 },
324 NotificationEvent::NotificationReceived { peer, notification } => {
325 let expected_versions = {
326 let mut versions = PerPeerSet::<Option<ProtocolVersion>>::default();
327 let shared = shared.0.lock();
328
329 if let Some(peer_data) = shared.validation_peers.get(&peer) {
330 versions[PeerSet::Validation] = Some(peer_data.version);
331 }
332
333 versions
334 };
335
336 gum::trace!(
337 target: LOG_TARGET,
338 action = "PeerMessage",
339 peerset = ?PeerSet::Validation,
340 ?peer,
341 );
342
343 let (events, reports) =
344 if expected_versions[PeerSet::Validation] == Some(ValidationVersion::V3.into()) {
345 handle_peer_messages::<protocol_v3::ValidationProtocol, _>(
346 peer,
347 PeerSet::Validation,
348 &mut shared.0.lock().validation_peers,
349 vec![notification.into()],
350 metrics,
351 )
352 } else {
353 gum::warn!(
354 target: LOG_TARGET,
355 version = ?expected_versions[PeerSet::Validation],
356 "Major logic bug. Peer somehow has unsupported validation protocol version."
357 );
358
359 never!("Only version 3 is supported; peer set connection checked above; qed");
360
361 (Vec::new(), vec![UNCONNECTED_PEERSET_COST])
364 };
365
366 for report in reports {
367 network_service.report_peer(peer, report.into());
368 }
369
370 dispatch_validation_events_to_all(events, sender, &metrics).await;
371 },
372 }
373}
374
375async fn handle_collation_message<AD>(
377 event: NotificationEvent,
378 network_service: &mut impl Network,
379 sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
380 authority_discovery_service: &mut AD,
381 metrics: &Metrics,
382 shared: &Shared,
383 peerset_protocol_names: &PeerSetProtocolNames,
384 notification_service: &mut Box<dyn NotificationService>,
385 notification_sinks: &mut Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
386) where
387 AD: validator_discovery::AuthorityDiscovery + Send,
388{
389 match event {
390 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
391 let result = network_service
393 .peer_role(peer, handshake)
394 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
395 let _ = result_tx.send(result);
396 },
397 NotificationEvent::NotificationStreamOpened {
398 peer,
399 handshake,
400 negotiated_fallback,
401 ..
402 } => {
403 let role = match network_service.peer_role(peer, handshake) {
404 Some(role) => ObservedRole::from(role),
405 None => {
406 gum::debug!(
407 target: LOG_TARGET,
408 ?peer,
409 "Failed to determine peer role for validation protocol",
410 );
411 return
412 },
413 };
414
415 let (peer_set, version) = {
416 let (peer_set, version) =
417 (PeerSet::Collation, PeerSet::Collation.get_main_version());
418
419 if let Some(fallback) = negotiated_fallback {
420 match peerset_protocol_names.try_get_protocol(&fallback) {
421 None => {
422 gum::debug!(
423 target: LOG_TARGET,
424 fallback = &*fallback,
425 ?peer,
426 ?peer_set,
427 "Unknown fallback",
428 );
429
430 return
431 },
432 Some((p2, v2)) => {
433 if p2 != peer_set {
434 gum::debug!(
435 target: LOG_TARGET,
436 fallback = &*fallback,
437 fallback_peerset = ?p2,
438 peerset = ?peer_set,
439 "Fallback mismatched peer-set",
440 );
441
442 return
443 }
444
445 (p2, v2)
446 },
447 }
448 } else {
449 (peer_set, version)
450 }
451 };
452
453 match notification_service.message_sink(&peer) {
456 Some(sink) => {
457 notification_sinks.lock().insert((peer_set, peer), sink);
458 },
459 None => {
460 gum::warn!(
461 target: LOG_TARGET,
462 peer_set = ?peer_set,
463 version = %version,
464 peer = ?peer,
465 role = ?role,
466 "Message sink not available for peer",
467 );
468 return
469 },
470 }
471
472 gum::debug!(
473 target: LOG_TARGET,
474 action = "PeerConnected",
475 peer_set = ?peer_set,
476 version = %version,
477 peer = ?peer,
478 role = ?role
479 );
480
481 let local_view = {
482 let mut shared = shared.0.lock();
483 let peer_map = &mut shared.collation_peers;
484
485 match peer_map.entry(peer) {
486 hash_map::Entry::Occupied(_) => return,
487 hash_map::Entry::Vacant(vacant) => {
488 vacant.insert(PeerData { view: View::default(), version });
489 },
490 }
491
492 metrics.on_peer_connected(peer_set, version);
493
494 shared.local_view.clone().unwrap_or(View::default())
495 };
496
497 let maybe_authority =
498 authority_discovery_service.get_authority_ids_by_peer_id(peer).await;
499
500 dispatch_collation_events_to_all(
501 vec![
502 NetworkBridgeEvent::PeerConnected(peer, role, version, maybe_authority),
503 NetworkBridgeEvent::PeerViewChange(peer, View::default()),
504 ],
505 sender,
506 )
507 .await;
508
509 match CollationVersion::try_from(version)
510 .expect("try_get_protocol has already checked version is known; qed")
511 {
512 CollationVersion::V1 => send_collation_message_v1(
513 vec![peer],
514 WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(local_view),
515 metrics,
516 notification_sinks,
517 ),
518 CollationVersion::V2 => send_collation_message_v2(
519 vec![peer],
520 WireMessage::<protocol_v2::CollationProtocol>::ViewUpdate(local_view),
521 metrics,
522 notification_sinks,
523 ),
524 }
525 },
526 NotificationEvent::NotificationStreamClosed { peer } => {
527 let (peer_set, version) = (PeerSet::Collation, PeerSet::Collation.get_main_version());
528
529 gum::debug!(
530 target: LOG_TARGET,
531 action = "PeerDisconnected",
532 ?peer_set,
533 ?peer
534 );
535
536 let was_connected = {
537 let mut shared = shared.0.lock();
538 let peer_map = &mut shared.collation_peers;
539
540 let w = peer_map.remove(&peer).is_some();
541
542 metrics.on_peer_disconnected(peer_set, version);
543
544 w
545 };
546
547 notification_sinks.lock().remove(&(peer_set, peer));
548
549 if was_connected && version == peer_set.get_main_version() {
550 dispatch_collation_event_to_all(NetworkBridgeEvent::PeerDisconnected(peer), sender)
551 .await;
552 }
553 },
554 NotificationEvent::NotificationReceived { peer, notification } => {
555 let expected_versions = {
556 let mut versions = PerPeerSet::<Option<ProtocolVersion>>::default();
557 let shared = shared.0.lock();
558
559 if let Some(peer_data) = shared.collation_peers.get(&peer) {
560 versions[PeerSet::Collation] = Some(peer_data.version);
561 }
562
563 versions
564 };
565
566 gum::trace!(
567 target: LOG_TARGET,
568 action = "PeerMessage",
569 peerset = ?PeerSet::Collation,
570 ?peer,
571 );
572
573 let (events, reports) =
574 if expected_versions[PeerSet::Collation] == Some(CollationVersion::V1.into()) {
575 handle_peer_messages::<protocol_v1::CollationProtocol, _>(
576 peer,
577 PeerSet::Collation,
578 &mut shared.0.lock().collation_peers,
579 vec![notification.into()],
580 metrics,
581 )
582 } else if expected_versions[PeerSet::Collation] == Some(CollationVersion::V2.into())
583 {
584 handle_peer_messages::<protocol_v2::CollationProtocol, _>(
585 peer,
586 PeerSet::Collation,
587 &mut shared.0.lock().collation_peers,
588 vec![notification.into()],
589 metrics,
590 )
591 } else {
592 gum::warn!(
593 target: LOG_TARGET,
594 version = ?expected_versions[PeerSet::Collation],
595 "Major logic bug. Peer somehow has unsupported collation protocol version."
596 );
597
598 never!("Only versions 1 and 2 are supported; peer set connection checked above; qed");
599
600 (Vec::new(), vec![UNCONNECTED_PEERSET_COST])
603 };
604
605 for report in reports {
606 network_service.report_peer(peer, report.into());
607 }
608
609 dispatch_collation_events_to_all(events, sender).await;
610 },
611 }
612}
613
614async fn handle_network_messages<AD>(
615 mut sender: impl overseer::NetworkBridgeRxSenderTrait,
616 mut network_service: impl Network,
617 mut authority_discovery_service: AD,
618 metrics: Metrics,
619 shared: Shared,
620 peerset_protocol_names: PeerSetProtocolNames,
621 mut validation_service: Box<dyn NotificationService>,
622 mut collation_service: Box<dyn NotificationService>,
623 mut notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
624) -> Result<(), Error>
625where
626 AD: validator_discovery::AuthorityDiscovery + Send,
627{
628 loop {
629 futures::select! {
630 event = validation_service.next_event().fuse() => match event {
631 Some(event) => handle_validation_message(
632 event,
633 &mut network_service,
634 &mut sender,
635 &mut authority_discovery_service,
636 &metrics,
637 &shared,
638 &peerset_protocol_names,
639 &mut validation_service,
640 &mut notification_sinks,
641 ).await,
642 None => return Err(Error::EventStreamConcluded),
643 },
644 event = collation_service.next_event().fuse() => match event {
645 Some(event) => handle_collation_message(
646 event,
647 &mut network_service,
648 &mut sender,
649 &mut authority_discovery_service,
650 &metrics,
651 &shared,
652 &peerset_protocol_names,
653 &mut collation_service,
654 &mut notification_sinks,
655 ).await,
656 None => return Err(Error::EventStreamConcluded),
657 }
658 }
659 }
660}
661
662async fn flesh_out_topology_peers<AD, N>(ads: &mut AD, neighbors: N) -> Vec<TopologyPeerInfo>
663where
664 AD: validator_discovery::AuthorityDiscovery,
665 N: IntoIterator<Item = (AuthorityDiscoveryId, ValidatorIndex)>,
666 N::IntoIter: std::iter::ExactSizeIterator,
667{
668 let neighbors = neighbors.into_iter();
669 let mut peers = Vec::with_capacity(neighbors.len());
670 for (discovery_id, validator_index) in neighbors {
671 let addr = get_peer_id_by_authority_id(ads, discovery_id.clone()).await;
672 if addr.is_none() {
673 gum::debug!(
675 target: LOG_TARGET,
676 ?validator_index,
677 "Could not determine peer_id for validator, let the team know in \n
678 https://github.com/paritytech/polkadot-sdk/issues/2138"
679 )
680 }
681 peers.push(TopologyPeerInfo {
682 peer_ids: addr.into_iter().collect(),
683 validator_index,
684 discovery_id,
685 });
686 }
687
688 peers
689}
690
691#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
692async fn run_incoming_orchestra_signals<Context, AD>(
693 mut ctx: Context,
694 mut authority_discovery_service: AD,
695 shared: Shared,
696 sync_oracle: Box<dyn SyncOracle + Send>,
697 metrics: Metrics,
698 notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
699) -> Result<(), Error>
700where
701 AD: validator_discovery::AuthorityDiscovery + Clone,
702{
703 let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
705 let mut finalized_number = 0;
706 let mut newest_session = u32::MIN;
707 let mut mode = Mode::Syncing(sync_oracle);
708 loop {
709 match ctx.recv().fuse().await? {
710 FromOrchestra::Communication {
711 msg:
712 NetworkBridgeRxMessage::NewGossipTopology {
713 session,
714 local_index,
715 canonical_shuffling,
716 shuffled_indices,
717 },
718 } => {
719 gum::debug!(
720 target: LOG_TARGET,
721 action = "NewGossipTopology",
722 ?session,
723 ?local_index,
724 "Gossip topology has changed",
725 );
726
727 let topology_peers =
728 flesh_out_topology_peers(&mut authority_discovery_service, canonical_shuffling)
729 .await;
730
731 if session >= newest_session {
732 dispatch_validation_event_to_all_unbounded(
733 NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
734 session,
735 topology: SessionGridTopology::new(shuffled_indices, topology_peers),
736 local_index,
737 }),
738 ctx.sender(),
739 );
740 } else {
741 dispatch_validation_event_to_approval_unbounded(
742 &NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
743 session,
744 topology: SessionGridTopology::new(shuffled_indices, topology_peers),
745 local_index,
746 }),
747 ctx.sender(),
748 );
749 }
750
751 newest_session = newest_session.max(session);
752 },
753 FromOrchestra::Communication {
754 msg: NetworkBridgeRxMessage::UpdatedAuthorityIds { peer_id, authority_ids },
755 } => {
756 gum::debug!(
757 target: LOG_TARGET,
758 action = "UpdatedAuthorityIds",
759 ?peer_id,
760 ?authority_ids,
761 "`AuthorityDiscoveryId`s have changed",
762 );
763 dispatch_collation_event_to_all_unbounded(
766 NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids.clone()),
767 ctx.sender(),
768 );
769 dispatch_validation_event_to_all_unbounded(
770 NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids),
771 ctx.sender(),
772 );
773 },
774 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
775 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => {
776 let ActiveLeavesUpdate { activated, deactivated } = active_leaves;
777 gum::trace!(
778 target: LOG_TARGET,
779 action = "ActiveLeaves",
780 has_activated = activated.is_some(),
781 num_deactivated = %deactivated.len(),
782 );
783
784 if let Some(activated) = activated {
785 let pos = live_heads
786 .binary_search_by(|probe| probe.number.cmp(&activated.number).reverse())
787 .unwrap_or_else(|i| i);
788
789 live_heads.insert(pos, activated);
790 }
791 live_heads.retain(|h| !deactivated.contains(&h.hash));
792
793 {
796 let is_done_syncing = match mode {
797 Mode::Active => true,
798 Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(),
799 };
800
801 if is_done_syncing {
802 mode = Mode::Active;
803
804 update_our_view(
805 &mut ctx,
806 &live_heads,
807 &shared,
808 finalized_number,
809 &metrics,
810 ¬ification_sinks,
811 );
812 note_peers_count(&metrics, &shared);
813 }
814 }
815 },
816 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
817 gum::trace!(target: LOG_TARGET, action = "BlockFinalized");
818
819 debug_assert!(finalized_number < number);
820
821 finalized_number = number;
825 },
826 }
827 }
828}
829
830#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
842async fn run_network_in<N, AD, Context>(
843 bridge: NetworkBridgeRx<N, AD>,
844 mut ctx: Context,
845) -> Result<(), Error>
846where
847 N: Network,
848 AD: validator_discovery::AuthorityDiscovery + Clone,
849{
850 let NetworkBridgeRx {
851 network_service,
852 authority_discovery_service,
853 metrics,
854 sync_oracle,
855 shared,
856 peerset_protocol_names,
857 validation_service,
858 collation_service,
859 notification_sinks,
860 } = bridge;
861
862 let (task, network_event_handler) = handle_network_messages(
863 ctx.sender().clone(),
864 network_service.clone(),
865 authority_discovery_service.clone(),
866 metrics.clone(),
867 shared.clone(),
868 peerset_protocol_names.clone(),
869 validation_service,
870 collation_service,
871 notification_sinks.clone(),
872 )
873 .remote_handle();
874
875 ctx.spawn_blocking("network-bridge-in-network-worker", Box::pin(task))?;
876 futures::pin_mut!(network_event_handler);
877
878 let orchestra_signal_handler = run_incoming_orchestra_signals(
879 ctx,
880 authority_discovery_service,
881 shared,
882 sync_oracle,
883 metrics,
884 notification_sinks,
885 );
886
887 futures::pin_mut!(orchestra_signal_handler);
888
889 futures::future::select(orchestra_signal_handler, network_event_handler)
890 .await
891 .factor_first()
892 .0?;
893 Ok(())
894}
895
896fn construct_view(
897 live_heads: impl DoubleEndedIterator<Item = Hash>,
898 finalized_number: BlockNumber,
899) -> View {
900 View::new(live_heads.take(MAX_VIEW_HEADS), finalized_number)
901}
902
903#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
904fn update_our_view<Context>(
905 ctx: &mut Context,
906 live_heads: &[ActivatedLeaf],
907 shared: &Shared,
908 finalized_number: BlockNumber,
909 metrics: &Metrics,
910 notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
911) {
912 let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);
913
914 let (validation_peers, collation_peers) = {
915 let mut shared = shared.0.lock();
916
917 match shared.local_view {
923 Some(ref v) if v.check_heads_eq(&new_view) => return,
924 None if live_heads.is_empty() => {
925 shared.local_view = Some(new_view);
926 return
927 },
928 _ => {
929 shared.local_view = Some(new_view.clone());
930 },
931 }
932
933 (
934 shared
935 .validation_peers
936 .iter()
937 .map(|(peer_id, data)| (*peer_id, data.version))
938 .collect::<Vec<_>>(),
939 shared
940 .collation_peers
941 .iter()
942 .map(|(peer_id, data)| (*peer_id, data.version))
943 .collect::<Vec<_>>(),
944 )
945 };
946
947 let our_view = OurView::new(
948 live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| a.hash),
949 finalized_number,
950 );
951
952 gum::debug!(
953 target: LOG_TARGET,
954 live_head_count = ?live_heads.len(),
955 "Our view updated, current view: {:?}",
956 our_view,
957 );
958
959 dispatch_validation_event_to_all_unbounded(
960 NetworkBridgeEvent::OurViewChange(our_view.clone()),
961 ctx.sender(),
962 );
963
964 dispatch_collation_event_to_all_unbounded(
965 NetworkBridgeEvent::OurViewChange(our_view),
966 ctx.sender(),
967 );
968
969 let v1_collation_peers = filter_by_peer_version(&collation_peers, CollationVersion::V1.into());
970
971 let v2_collation_peers = filter_by_peer_version(&collation_peers, CollationVersion::V2.into());
972
973 let v3_validation_peers =
974 filter_by_peer_version(&validation_peers, ValidationVersion::V3.into());
975
976 send_collation_message_v1(
977 v1_collation_peers,
978 WireMessage::ViewUpdate(new_view.clone()),
979 metrics,
980 notification_sinks,
981 );
982
983 send_collation_message_v2(
984 v2_collation_peers,
985 WireMessage::ViewUpdate(new_view.clone()),
986 metrics,
987 notification_sinks,
988 );
989
990 send_validation_message_v3(
991 v3_validation_peers,
992 WireMessage::ViewUpdate(new_view.clone()),
993 metrics,
994 notification_sinks,
995 );
996}
997
998fn handle_peer_messages<RawMessage: Decode, OutMessage: From<RawMessage>>(
1001 peer: PeerId,
1002 peer_set: PeerSet,
1003 peers: &mut HashMap<PeerId, PeerData>,
1004 messages: Vec<Bytes>,
1005 metrics: &Metrics,
1006) -> (Vec<NetworkBridgeEvent<OutMessage>>, Vec<Rep>) {
1007 let peer_data = match peers.get_mut(&peer) {
1008 None => return (Vec::new(), vec![UNCONNECTED_PEERSET_COST]),
1009 Some(d) => d,
1010 };
1011
1012 let mut outgoing_events = Vec::with_capacity(messages.len());
1013 let mut reports = Vec::new();
1014
1015 for message in messages {
1016 metrics.on_notification_received(peer_set, peer_data.version, message.len());
1017 let message = match WireMessage::<RawMessage>::decode_all(&mut message.as_ref()) {
1018 Err(_) => {
1019 reports.push(MALFORMED_MESSAGE_COST);
1020 continue
1021 },
1022 Ok(m) => m,
1023 };
1024
1025 outgoing_events.push(match message {
1026 WireMessage::ViewUpdate(new_view) => {
1027 if new_view.len() > MAX_VIEW_HEADS ||
1028 new_view.finalized_number < peer_data.view.finalized_number
1029 {
1030 reports.push(MALFORMED_VIEW_COST);
1031 continue
1032 } else if new_view.is_empty() {
1033 reports.push(EMPTY_VIEW_COST);
1034 continue
1035 } else if new_view == peer_data.view {
1036 continue
1037 } else {
1038 peer_data.view = new_view;
1039
1040 NetworkBridgeEvent::PeerViewChange(peer, peer_data.view.clone())
1041 }
1042 },
1043 WireMessage::ProtocolMessage(message) =>
1044 NetworkBridgeEvent::PeerMessage(peer, message.into()),
1045 })
1046 }
1047
1048 (outgoing_events, reports)
1049}
1050
1051async fn dispatch_validation_event_to_all(
1052 event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
1053 ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
1054 metrics: &Metrics,
1055) {
1056 dispatch_validation_events_to_all(std::iter::once(event), ctx, metrics).await
1057}
1058
1059async fn dispatch_collation_event_to_all(
1060 event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
1061 ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
1062) {
1063 dispatch_collation_events_to_all(std::iter::once(event), ctx).await
1064}
1065
1066fn dispatch_validation_event_to_approval_unbounded(
1067 event: &NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
1068 sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
1069) {
1070 event
1071 .focus()
1072 .ok()
1073 .map(ApprovalVotingParallelMessage::from)
1074 .and_then(|msg| Some(sender.send_unbounded_message(msg)));
1075}
1076
1077fn dispatch_validation_event_to_all_unbounded(
1078 event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
1079 sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
1080) {
1081 event
1082 .focus()
1083 .ok()
1084 .map(StatementDistributionMessage::from)
1085 .and_then(|msg| Some(sender.send_unbounded_message(msg)));
1086 event
1087 .focus()
1088 .ok()
1089 .map(BitfieldDistributionMessage::from)
1090 .and_then(|msg| Some(sender.send_unbounded_message(msg)));
1091
1092 dispatch_validation_event_to_approval_unbounded(&event, sender);
1093
1094 event
1095 .focus()
1096 .ok()
1097 .map(GossipSupportMessage::from)
1098 .and_then(|msg| Some(sender.send_unbounded_message(msg)));
1099}
1100
1101fn dispatch_collation_event_to_all_unbounded(
1102 event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
1103 sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
1104) {
1105 if let Ok(msg) = event.focus() {
1106 sender.send_unbounded_message(CollatorProtocolMessage::NetworkBridgeUpdate(msg))
1107 }
1108}
1109
1110async fn dispatch_validation_events_to_all<I>(
1111 events: I,
1112 sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
1113 _metrics: &Metrics,
1114) where
1115 I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
1116 I::IntoIter: Send,
1117{
1118 macro_rules! send_message {
1119 ($event:expr, $message:ident) => {
1120 if let Ok(event) = $event.focus() {
1121 let has_high_priority = matches!(
1122 event,
1123 NetworkBridgeEvent::PeerConnected(..) |
1127 NetworkBridgeEvent::PeerDisconnected(..) |
1128 NetworkBridgeEvent::PeerViewChange(..)
1129 );
1130 let message = $message::from(event);
1131 if has_high_priority {
1132 sender.send_message_with_priority::<overseer::HighPriority>(message).await;
1133 } else {
1134 sender.send_message(message).await;
1135 }
1136 }
1137 };
1138 }
1139
1140 for event in events {
1141 send_message!(event, StatementDistributionMessage);
1142 send_message!(event, BitfieldDistributionMessage);
1143 send_message!(event, ApprovalVotingParallelMessage);
1144 send_message!(event, GossipSupportMessage);
1145 }
1146}
1147
1148async fn dispatch_collation_events_to_all<I>(
1149 events: I,
1150 ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
1151) where
1152 I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>>,
1153 I::IntoIter: Send,
1154{
1155 let messages_for = |event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>| {
1156 event.focus().ok().map(|m| CollatorProtocolMessage::NetworkBridgeUpdate(m))
1157 };
1158
1159 ctx.send_messages(events.into_iter().flat_map(messages_for)).await
1160}