referrerpolicy=no-referrer-when-downgrade

polkadot_network_bridge/rx/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Network Bridge Subsystem - handles _incoming_ messages from the network, forwarded to the
18//! relevant subsystems.
19use super::*;
20
21use always_assert::never;
22use bytes::Bytes;
23use codec::{Decode, DecodeAll};
24use net_protocol::filter_by_peer_version;
25use parking_lot::Mutex;
26
27use sc_network::{
28	service::traits::{NotificationEvent, ValidationResult},
29	MessageSink, NotificationService,
30};
31use sp_consensus::SyncOracle;
32
33use polkadot_node_network_protocol::{
34	self as net_protocol,
35	grid_topology::{SessionGridTopology, TopologyPeerInfo},
36	peer_set::{
37		CollationVersion, PeerSet, PeerSetProtocolNames, PerPeerSet, ProtocolVersion,
38		ValidationVersion,
39	},
40	v1 as protocol_v1, v2 as protocol_v2, v3 as protocol_v3, ObservedRole, OurView, PeerId,
41	UnifiedReputationChange as Rep, View,
42};
43
44use polkadot_node_subsystem::{
45	errors::SubsystemError,
46	messages::{
47		network_bridge_event::NewGossipTopology, ApprovalVotingParallelMessage,
48		BitfieldDistributionMessage, CollatorProtocolMessage, GossipSupportMessage,
49		NetworkBridgeEvent, NetworkBridgeRxMessage, StatementDistributionMessage,
50	},
51	overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
52};
53
54use polkadot_primitives::{AuthorityDiscoveryId, BlockNumber, Hash, ValidatorIndex};
55
56use std::{
57	collections::{hash_map, HashMap},
58	iter::ExactSizeIterator,
59	u32,
60};
61
62use super::validator_discovery;
63
64/// Actual interfacing to the network based on the `Network` trait.
65///
66/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
67use crate::network::{
68	send_collation_message_v1, send_collation_message_v2, send_validation_message_v3, Network,
69};
70use crate::{network::get_peer_id_by_authority_id, WireMessage};
71
72use super::metrics::Metrics;
73
74#[cfg(test)]
75mod tests;
76
/// Log target used by all tracing in the receiving side of the network bridge.
// `&'static` is implied for `const` string slices; spelling it out trips
// clippy's `redundant_static_lifetimes` lint.
const LOG_TARGET: &str = "parachain::network-bridge-rx";
79
/// The network bridge subsystem - network receiving side.
pub struct NetworkBridgeRx<N, AD> {
	/// `Network` trait implementing type.
	network_service: N,
	/// Authority discovery service, used to resolve a connecting `PeerId` to its
	/// `AuthorityDiscoveryId`s when dispatching `PeerConnected` events.
	authority_discovery_service: AD,
	/// Sync oracle; view updates are withheld while the node is still major-syncing.
	sync_oracle: Box<dyn SyncOracle + Send>,
	/// State shared across the bridge tasks: per-peer-set peer maps and our local view.
	shared: Shared,
	/// Prometheus metrics for the bridge.
	metrics: Metrics,
	/// Mapping between peer-sets and their on-the-wire protocol names/versions.
	peerset_protocol_names: PeerSetProtocolNames,
	/// Notification service handling the validation peer-set protocol.
	validation_service: Box<dyn NotificationService>,
	/// Notification service handling the collation peer-set protocol.
	collation_service: Box<dyn NotificationService>,
	/// Per-(peer-set, peer) message sinks, shared with `NetworkBridgeTx` so both
	/// halves of the bridge can send notifications to connected peers.
	notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
}
93
94impl<N, AD> NetworkBridgeRx<N, AD> {
95	/// Create a new network bridge subsystem with underlying network service and authority
96	/// discovery service.
97	///
98	/// This assumes that the network service has had the notifications protocol for the network
99	/// bridge already registered. See [`peer_sets_info`].
100	pub fn new(
101		network_service: N,
102		authority_discovery_service: AD,
103		sync_oracle: Box<dyn SyncOracle + Send>,
104		metrics: Metrics,
105		peerset_protocol_names: PeerSetProtocolNames,
106		mut notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
107		notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
108	) -> Self {
109		let shared = Shared::default();
110
111		let validation_service = notification_services
112			.remove(&PeerSet::Validation)
113			.expect("validation protocol was enabled so `NotificationService` must exist; qed");
114		let collation_service = notification_services
115			.remove(&PeerSet::Collation)
116			.expect("collation protocol was enabled so `NotificationService` must exist; qed");
117
118		Self {
119			network_service,
120			authority_discovery_service,
121			sync_oracle,
122			shared,
123			metrics,
124			peerset_protocol_names,
125			validation_service,
126			collation_service,
127			notification_sinks,
128		}
129	}
130}
131
#[overseer::subsystem(NetworkBridgeRx, error = SubsystemError, prefix = self::overseer)]
impl<Net, AD, Context> NetworkBridgeRx<Net, AD>
where
	Net: Network + Sync,
	AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
{
	/// Hand the subsystem's main future (`run_network_in`) to the overseer.
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		// Swallow error because failure is fatal to the node and we log with more precision
		// within `run_network`.
		let future = run_network_in(self, ctx)
			.map_err(|e| SubsystemError::with_origin("network-bridge", e))
			.boxed();
		SpawnedSubsystem { name: "network-bridge-rx-subsystem", future }
	}
}
147
/// Handle notification event received over the validation protocol.
///
/// Maintains `shared.validation_peers` and the shared `notification_sinks` on stream
/// open/close, forwards decoded peer messages to the interested subsystems, and applies
/// reputation changes via `network_service` for malformed or unexpected traffic.
async fn handle_validation_message<AD>(
	event: NotificationEvent,
	network_service: &mut impl Network,
	sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
	authority_discovery_service: &mut AD,
	metrics: &Metrics,
	shared: &Shared,
	peerset_protocol_names: &PeerSetProtocolNames,
	notification_service: &mut Box<dyn NotificationService>,
	notification_sinks: &mut Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
) where
	AD: validator_discovery::AuthorityDiscovery + Send,
{
	match event {
		NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
			// only accept peers whose role can be determined
			let result = network_service
				.peer_role(peer, handshake)
				.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
			let _ = result_tx.send(result);
		},
		NotificationEvent::NotificationStreamOpened {
			peer,
			handshake,
			negotiated_fallback,
			..
		} => {
			let role = match network_service.peer_role(peer, handshake) {
				Some(role) => ObservedRole::from(role),
				None => {
					gum::debug!(
						target: LOG_TARGET,
						?peer,
						"Failed to determine peer role for validation protocol",
					);
					return
				},
			};

			// Determine the effective (peer-set, version): start from the main validation
			// version and downgrade per the negotiated fallback protocol, if any.
			let (peer_set, version) = {
				let (peer_set, version) =
					(PeerSet::Validation, PeerSet::Validation.get_main_version());

				if let Some(fallback) = negotiated_fallback {
					match peerset_protocol_names.try_get_protocol(&fallback) {
						None => {
							gum::debug!(
								target: LOG_TARGET,
								fallback = &*fallback,
								?peer,
								peerset = ?peer_set,
								"Unknown fallback",
							);

							return
						},
						Some((p2, v2)) => {
							// A fallback may only change the version, never the peer-set.
							if p2 != peer_set {
								gum::debug!(
									target: LOG_TARGET,
									fallback = &*fallback,
									fallback_peerset = ?p2,
									peerset = ?peer_set,
									"Fallback mismatched peer-set",
								);

								return
							}

							(p2, v2)
						},
					}
				} else {
					(peer_set, version)
				}
			};
			// store the notification sink to `notification_sinks` so both `NetworkBridgeRx`
			// and `NetworkBridgeTx` can send messages to the peer.
			match notification_service.message_sink(&peer) {
				Some(sink) => {
					notification_sinks.lock().insert((peer_set, peer), sink);
				},
				None => {
					gum::warn!(
						target: LOG_TARGET,
						peerset = ?peer_set,
						version = %version,
						?peer,
						?role,
						"Message sink not available for peer",
					);
					return
				},
			}

			gum::debug!(
				target: LOG_TARGET,
				action = "PeerConnected",
				peer_set = ?peer_set,
				version = %version,
				peer = ?peer,
				role = ?role
			);

			// Register the peer in the shared map and snapshot our current view, all
			// under one lock acquisition. NOTE: the lock must not be held across `.await`.
			let local_view = {
				let mut shared = shared.0.lock();
				let peer_map = &mut shared.validation_peers;

				match peer_map.entry(peer) {
					// Already tracked on this peer-set: nothing more to do.
					hash_map::Entry::Occupied(_) => return,
					hash_map::Entry::Vacant(vacant) => {
						vacant.insert(PeerData { view: View::default(), version });
					},
				}

				metrics.on_peer_connected(peer_set, version);

				shared.local_view.clone().unwrap_or(View::default())
			};

			let maybe_authority =
				authority_discovery_service.get_authority_ids_by_peer_id(peer).await;

			dispatch_validation_events_to_all(
				vec![
					NetworkBridgeEvent::PeerConnected(peer, role, version, maybe_authority),
					NetworkBridgeEvent::PeerViewChange(peer, View::default()),
				],
				sender,
				&metrics,
			)
			.await;

			// Send the new peer our current view, encoded for the negotiated version.
			match ValidationVersion::try_from(version)
				.expect("try_get_protocol has already checked version is known; qed")
			{
				ValidationVersion::V3 => send_validation_message_v3(
					vec![peer],
					WireMessage::<protocol_v3::ValidationProtocol>::ViewUpdate(local_view),
					metrics,
					notification_sinks,
				),
			}
		},
		NotificationEvent::NotificationStreamClosed { peer } => {
			let (peer_set, version) = (PeerSet::Validation, PeerSet::Validation.get_main_version());

			gum::debug!(
				target: LOG_TARGET,
				action = "PeerDisconnected",
				?peer_set,
				?peer
			);

			// Remove the peer from the shared map; remember whether it was actually tracked.
			let was_connected = {
				let mut shared = shared.0.lock();
				let peer_map = &mut shared.validation_peers;

				let w = peer_map.remove(&peer).is_some();

				metrics.on_peer_disconnected(peer_set, version);
				w
			};

			// Drop the shared sink so neither bridge half keeps sending to this peer.
			notification_sinks.lock().remove(&(peer_set, peer));

			if was_connected && version == peer_set.get_main_version() {
				dispatch_validation_event_to_all(
					NetworkBridgeEvent::PeerDisconnected(peer),
					sender,
					&metrics,
				)
				.await;
			}
		},
		NotificationEvent::NotificationReceived { peer, notification } => {
			// Look up which protocol version we negotiated with this peer, if connected.
			let expected_versions = {
				let mut versions = PerPeerSet::<Option<ProtocolVersion>>::default();
				let shared = shared.0.lock();

				if let Some(peer_data) = shared.validation_peers.get(&peer) {
					versions[PeerSet::Validation] = Some(peer_data.version);
				}

				versions
			};

			gum::trace!(
				target: LOG_TARGET,
				action = "PeerMessage",
				peerset = ?PeerSet::Validation,
				?peer,
			);

			let (events, reports) =
				if expected_versions[PeerSet::Validation] == Some(ValidationVersion::V3.into()) {
					handle_peer_messages::<protocol_v3::ValidationProtocol, _>(
						peer,
						PeerSet::Validation,
						&mut shared.0.lock().validation_peers,
						vec![notification.into()],
						metrics,
					)
				} else {
					gum::warn!(
						target: LOG_TARGET,
						version = ?expected_versions[PeerSet::Validation],
						"Major logic bug. Peer somehow has unsupported validation protocol version."
					);

					never!("Only version 3 is supported; peer set connection checked above; qed");

					// If a peer somehow triggers this, we'll disconnect them
					// eventually.
					(Vec::new(), vec![UNCONNECTED_PEERSET_COST])
				};

			// Apply accumulated reputation changes before dispatching the events.
			for report in reports {
				network_service.report_peer(peer, report.into());
			}

			dispatch_validation_events_to_all(events, sender, &metrics).await;
		},
	}
}
374
375/// Handle notification event received over the collation protocol.
376async fn handle_collation_message<AD>(
377	event: NotificationEvent,
378	network_service: &mut impl Network,
379	sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
380	authority_discovery_service: &mut AD,
381	metrics: &Metrics,
382	shared: &Shared,
383	peerset_protocol_names: &PeerSetProtocolNames,
384	notification_service: &mut Box<dyn NotificationService>,
385	notification_sinks: &mut Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
386) where
387	AD: validator_discovery::AuthorityDiscovery + Send,
388{
389	match event {
390		NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
391			// only accept peers whose role can be determined
392			let result = network_service
393				.peer_role(peer, handshake)
394				.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
395			let _ = result_tx.send(result);
396		},
397		NotificationEvent::NotificationStreamOpened {
398			peer,
399			handshake,
400			negotiated_fallback,
401			..
402		} => {
403			let role = match network_service.peer_role(peer, handshake) {
404				Some(role) => ObservedRole::from(role),
405				None => {
406					gum::debug!(
407						target: LOG_TARGET,
408						?peer,
409						"Failed to determine peer role for validation protocol",
410					);
411					return
412				},
413			};
414
415			let (peer_set, version) = {
416				let (peer_set, version) =
417					(PeerSet::Collation, PeerSet::Collation.get_main_version());
418
419				if let Some(fallback) = negotiated_fallback {
420					match peerset_protocol_names.try_get_protocol(&fallback) {
421						None => {
422							gum::debug!(
423								target: LOG_TARGET,
424								fallback = &*fallback,
425								?peer,
426								?peer_set,
427								"Unknown fallback",
428							);
429
430							return
431						},
432						Some((p2, v2)) => {
433							if p2 != peer_set {
434								gum::debug!(
435									target: LOG_TARGET,
436									fallback = &*fallback,
437									fallback_peerset = ?p2,
438									peerset = ?peer_set,
439									"Fallback mismatched peer-set",
440								);
441
442								return
443							}
444
445							(p2, v2)
446						},
447					}
448				} else {
449					(peer_set, version)
450				}
451			};
452
453			// store the notification sink to `notification_sinks` so both `NetworkBridgeRx`
454			// and `NetworkBridgeTx` can send messages to the peer.
455			match notification_service.message_sink(&peer) {
456				Some(sink) => {
457					notification_sinks.lock().insert((peer_set, peer), sink);
458				},
459				None => {
460					gum::warn!(
461						target: LOG_TARGET,
462						peer_set = ?peer_set,
463						version = %version,
464						peer = ?peer,
465						role = ?role,
466						"Message sink not available for peer",
467					);
468					return
469				},
470			}
471
472			gum::debug!(
473				target: LOG_TARGET,
474				action = "PeerConnected",
475				peer_set = ?peer_set,
476				version = %version,
477				peer = ?peer,
478				role = ?role
479			);
480
481			let local_view = {
482				let mut shared = shared.0.lock();
483				let peer_map = &mut shared.collation_peers;
484
485				match peer_map.entry(peer) {
486					hash_map::Entry::Occupied(_) => return,
487					hash_map::Entry::Vacant(vacant) => {
488						vacant.insert(PeerData { view: View::default(), version });
489					},
490				}
491
492				metrics.on_peer_connected(peer_set, version);
493
494				shared.local_view.clone().unwrap_or(View::default())
495			};
496
497			let maybe_authority =
498				authority_discovery_service.get_authority_ids_by_peer_id(peer).await;
499
500			dispatch_collation_events_to_all(
501				vec![
502					NetworkBridgeEvent::PeerConnected(peer, role, version, maybe_authority),
503					NetworkBridgeEvent::PeerViewChange(peer, View::default()),
504				],
505				sender,
506			)
507			.await;
508
509			match CollationVersion::try_from(version)
510				.expect("try_get_protocol has already checked version is known; qed")
511			{
512				CollationVersion::V1 => send_collation_message_v1(
513					vec![peer],
514					WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(local_view),
515					metrics,
516					notification_sinks,
517				),
518				CollationVersion::V2 => send_collation_message_v2(
519					vec![peer],
520					WireMessage::<protocol_v2::CollationProtocol>::ViewUpdate(local_view),
521					metrics,
522					notification_sinks,
523				),
524			}
525		},
526		NotificationEvent::NotificationStreamClosed { peer } => {
527			let (peer_set, version) = (PeerSet::Collation, PeerSet::Collation.get_main_version());
528
529			gum::debug!(
530				target: LOG_TARGET,
531				action = "PeerDisconnected",
532				?peer_set,
533				?peer
534			);
535
536			let was_connected = {
537				let mut shared = shared.0.lock();
538				let peer_map = &mut shared.collation_peers;
539
540				let w = peer_map.remove(&peer).is_some();
541
542				metrics.on_peer_disconnected(peer_set, version);
543
544				w
545			};
546
547			notification_sinks.lock().remove(&(peer_set, peer));
548
549			if was_connected && version == peer_set.get_main_version() {
550				dispatch_collation_event_to_all(NetworkBridgeEvent::PeerDisconnected(peer), sender)
551					.await;
552			}
553		},
554		NotificationEvent::NotificationReceived { peer, notification } => {
555			let expected_versions = {
556				let mut versions = PerPeerSet::<Option<ProtocolVersion>>::default();
557				let shared = shared.0.lock();
558
559				if let Some(peer_data) = shared.collation_peers.get(&peer) {
560					versions[PeerSet::Collation] = Some(peer_data.version);
561				}
562
563				versions
564			};
565
566			gum::trace!(
567				target: LOG_TARGET,
568				action = "PeerMessage",
569				peerset = ?PeerSet::Collation,
570				?peer,
571			);
572
573			let (events, reports) =
574				if expected_versions[PeerSet::Collation] == Some(CollationVersion::V1.into()) {
575					handle_peer_messages::<protocol_v1::CollationProtocol, _>(
576						peer,
577						PeerSet::Collation,
578						&mut shared.0.lock().collation_peers,
579						vec![notification.into()],
580						metrics,
581					)
582				} else if expected_versions[PeerSet::Collation] == Some(CollationVersion::V2.into())
583				{
584					handle_peer_messages::<protocol_v2::CollationProtocol, _>(
585						peer,
586						PeerSet::Collation,
587						&mut shared.0.lock().collation_peers,
588						vec![notification.into()],
589						metrics,
590					)
591				} else {
592					gum::warn!(
593						target: LOG_TARGET,
594						version = ?expected_versions[PeerSet::Collation],
595						"Major logic bug. Peer somehow has unsupported collation protocol version."
596					);
597
598					never!("Only versions 1 and 2 are supported; peer set connection checked above; qed");
599
600					// If a peer somehow triggers this, we'll disconnect them
601					// eventually.
602					(Vec::new(), vec![UNCONNECTED_PEERSET_COST])
603				};
604
605			for report in reports {
606				network_service.report_peer(peer, report.into());
607			}
608
609			dispatch_collation_events_to_all(events, sender).await;
610		},
611	}
612}
613
/// Multiplex notification events from the validation and collation services and route
/// each to its dedicated handler.
///
/// Runs until either notification event stream concludes, which is treated as fatal
/// (`Error::EventStreamConcluded`).
async fn handle_network_messages<AD>(
	mut sender: impl overseer::NetworkBridgeRxSenderTrait,
	mut network_service: impl Network,
	mut authority_discovery_service: AD,
	metrics: Metrics,
	shared: Shared,
	peerset_protocol_names: PeerSetProtocolNames,
	mut validation_service: Box<dyn NotificationService>,
	mut collation_service: Box<dyn NotificationService>,
	mut notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
) -> Result<(), Error>
where
	AD: validator_discovery::AuthorityDiscovery + Send,
{
	loop {
		// Poll both services concurrently; whichever yields first is handled to
		// completion before the next iteration.
		futures::select! {
			event = validation_service.next_event().fuse() => match event {
				Some(event) => handle_validation_message(
					event,
					&mut network_service,
					&mut sender,
					&mut authority_discovery_service,
					&metrics,
					&shared,
					&peerset_protocol_names,
					&mut validation_service,
					&mut notification_sinks,
				).await,
				None => return Err(Error::EventStreamConcluded),
			},
			event = collation_service.next_event().fuse() => match event {
				Some(event) => handle_collation_message(
					event,
					&mut network_service,
					&mut sender,
					&mut authority_discovery_service,
					&metrics,
					&shared,
					&peerset_protocol_names,
					&mut collation_service,
					&mut notification_sinks,
				).await,
				None => return Err(Error::EventStreamConcluded),
			}
		}
	}
}
661
662async fn flesh_out_topology_peers<AD, N>(ads: &mut AD, neighbors: N) -> Vec<TopologyPeerInfo>
663where
664	AD: validator_discovery::AuthorityDiscovery,
665	N: IntoIterator<Item = (AuthorityDiscoveryId, ValidatorIndex)>,
666	N::IntoIter: std::iter::ExactSizeIterator,
667{
668	let neighbors = neighbors.into_iter();
669	let mut peers = Vec::with_capacity(neighbors.len());
670	for (discovery_id, validator_index) in neighbors {
671		let addr = get_peer_id_by_authority_id(ads, discovery_id.clone()).await;
672		if addr.is_none() {
673			// See on why is not good in https://github.com/paritytech/polkadot-sdk/issues/2138
674			gum::debug!(
675				target: LOG_TARGET,
676				?validator_index,
677				"Could not determine peer_id for validator, let the team know in \n
678				https://github.com/paritytech/polkadot-sdk/issues/2138"
679			)
680		}
681		peers.push(TopologyPeerInfo {
682			peer_ids: addr.into_iter().collect(),
683			validator_index,
684			discovery_id,
685		});
686	}
687
688	peers
689}
690
#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
/// Process overseer signals and `NetworkBridgeRxMessage`s: gossip topology updates,
/// authority-id updates, active-leaves tracking (driving our view updates) and
/// block finalization. Returns `Ok(())` on `Conclude`.
async fn run_incoming_orchestra_signals<Context, AD>(
	mut ctx: Context,
	mut authority_discovery_service: AD,
	shared: Shared,
	sync_oracle: Box<dyn SyncOracle + Send>,
	metrics: Metrics,
	notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
) -> Result<(), Error>
where
	AD: validator_discovery::AuthorityDiscovery + Clone,
{
	// This is kept sorted, descending, by block number.
	let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
	let mut finalized_number = 0;
	// Highest session for which a topology has been broadcast to all subsystems so far.
	let mut newest_session = u32::MIN;
	// Start in syncing mode; flips to `Active` once the sync oracle reports we caught up.
	let mut mode = Mode::Syncing(sync_oracle);
	loop {
		match ctx.recv().fuse().await? {
			FromOrchestra::Communication {
				msg:
					NetworkBridgeRxMessage::NewGossipTopology {
						session,
						local_index,
						canonical_shuffling,
						shuffled_indices,
					},
			} => {
				gum::debug!(
					target: LOG_TARGET,
					action = "NewGossipTopology",
					?session,
					?local_index,
					"Gossip topology has changed",
				);

				// Resolve the shuffled authorities to their known peer ids.
				let topology_peers =
					flesh_out_topology_peers(&mut authority_discovery_service, canonical_shuffling)
						.await;

				if session >= newest_session {
					// Current or newer session: inform every interested subsystem.
					dispatch_validation_event_to_all_unbounded(
						NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
							session,
							topology: SessionGridTopology::new(shuffled_indices, topology_peers),
							local_index,
						}),
						ctx.sender(),
					);
				} else {
					// Topology for an older session: only approval voting still needs it.
					dispatch_validation_event_to_approval_unbounded(
						&NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
							session,
							topology: SessionGridTopology::new(shuffled_indices, topology_peers),
							local_index,
						}),
						ctx.sender(),
					);
				}

				newest_session = newest_session.max(session);
			},
			FromOrchestra::Communication {
				msg: NetworkBridgeRxMessage::UpdatedAuthorityIds { peer_id, authority_ids },
			} => {
				gum::debug!(
					target: LOG_TARGET,
					action = "UpdatedAuthorityIds",
					?peer_id,
					?authority_ids,
					"`AuthorityDiscoveryId`s have changed",
				);
				// using unbounded send to avoid cycles
				// the messages are sent only once per session up to one per peer
				dispatch_collation_event_to_all_unbounded(
					NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids.clone()),
					ctx.sender(),
				);
				dispatch_validation_event_to_all_unbounded(
					NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids),
					ctx.sender(),
				);
			},
			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => {
				let ActiveLeavesUpdate { activated, deactivated } = active_leaves;
				gum::trace!(
					target: LOG_TARGET,
					action = "ActiveLeaves",
					has_activated = activated.is_some(),
					num_deactivated = %deactivated.len(),
				);

				if let Some(activated) = activated {
					// Insert keeping `live_heads` sorted descending by block number.
					let pos = live_heads
						.binary_search_by(|probe| probe.number.cmp(&activated.number).reverse())
						.unwrap_or_else(|i| i);

					live_heads.insert(pos, activated);
				}
				live_heads.retain(|h| !deactivated.contains(&h.hash));

				// if we're done syncing, set the mode to `Mode::Active`.
				// Otherwise, we don't need to send view updates.
				{
					let is_done_syncing = match mode {
						Mode::Active => true,
						Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(),
					};

					if is_done_syncing {
						mode = Mode::Active;

						update_our_view(
							&mut ctx,
							&live_heads,
							&shared,
							finalized_number,
							&metrics,
							&notification_sinks,
						);
						note_peers_count(&metrics, &shared);
					}
				}
			},
			FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
				gum::trace!(target: LOG_TARGET, action = "BlockFinalized");

				debug_assert!(finalized_number < number);

				// we don't send the view updates here, but delay them until the next `ActiveLeaves`
				// otherwise it might break assumptions of some of the subsystems
				// that we never send the same `ActiveLeavesUpdate`
				finalized_number = number;
			},
		}
	}
}
829
/// Main driver, processing network events and overseer signals.
///
/// Spawns the network-event loop (`handle_network_messages`) on a blocking task and runs
/// the orchestra-signal loop in place; terminates when either side finishes.
///
/// THIS IS A HACK. We need to ensure we never hold the mutex across an `.await` boundary
/// and `parking_lot` currently does not provide `Send`, which helps us enforce that.
/// If this breaks, we need to find another way to protect ourselves.
///
/// ```compile_fail
/// #use parking_lot::MutexGuard;
/// #fn is_send<T: Send>();
/// #is_send::<parking_lot::MutexGuard<'static, ()>();
/// ```
#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
async fn run_network_in<N, AD, Context>(
	bridge: NetworkBridgeRx<N, AD>,
	mut ctx: Context,
) -> Result<(), Error>
where
	N: Network,
	AD: validator_discovery::AuthorityDiscovery + Clone,
{
	let NetworkBridgeRx {
		network_service,
		authority_discovery_service,
		metrics,
		sync_oracle,
		shared,
		peerset_protocol_names,
		validation_service,
		collation_service,
		notification_sinks,
	} = bridge;

	// `remote_handle` lets us spawn the worker while retaining a handle to observe
	// its completion (and to drop/cancel it if this function returns first).
	let (task, network_event_handler) = handle_network_messages(
		ctx.sender().clone(),
		network_service.clone(),
		authority_discovery_service.clone(),
		metrics.clone(),
		shared.clone(),
		peerset_protocol_names.clone(),
		validation_service,
		collation_service,
		notification_sinks.clone(),
	)
	.remote_handle();

	ctx.spawn_blocking("network-bridge-in-network-worker", Box::pin(task))?;
	futures::pin_mut!(network_event_handler);

	let orchestra_signal_handler = run_incoming_orchestra_signals(
		ctx,
		authority_discovery_service,
		shared,
		sync_oracle,
		metrics,
		notification_sinks,
	);

	futures::pin_mut!(orchestra_signal_handler);

	// Whichever loop finishes first decides the outcome; propagate its error, if any.
	futures::future::select(orchestra_signal_handler, network_event_handler)
		.await
		.factor_first()
		.0?;
	Ok(())
}
895
896fn construct_view(
897	live_heads: impl DoubleEndedIterator<Item = Hash>,
898	finalized_number: BlockNumber,
899) -> View {
900	View::new(live_heads.take(MAX_VIEW_HEADS), finalized_number)
901}
902
#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
/// Recompute our local view from `live_heads`/`finalized_number`; if the heads changed,
/// publish `OurViewChange` to subsystems and send `ViewUpdate` wire messages to every
/// connected validation and collation peer (per negotiated protocol version).
fn update_our_view<Context>(
	ctx: &mut Context,
	live_heads: &[ActivatedLeaf],
	shared: &Shared,
	finalized_number: BlockNumber,
	metrics: &Metrics,
	notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
) {
	let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);

	// Update `shared.local_view` and snapshot peer lists in one lock scope;
	// the lock must be released before any sends happen below.
	let (validation_peers, collation_peers) = {
		let mut shared = shared.0.lock();

		// We only want to send a view update when the heads changed.
		// A change in finalized block number only is _not_ sufficient.
		//
		// If this is the first view update since becoming active, but our view is empty,
		// there is no need to send anything.
		match shared.local_view {
			Some(ref v) if v.check_heads_eq(&new_view) => return,
			None if live_heads.is_empty() => {
				shared.local_view = Some(new_view);
				return
			},
			_ => {
				shared.local_view = Some(new_view.clone());
			},
		}

		(
			shared
				.validation_peers
				.iter()
				.map(|(peer_id, data)| (*peer_id, data.version))
				.collect::<Vec<_>>(),
			shared
				.collation_peers
				.iter()
				.map(|(peer_id, data)| (*peer_id, data.version))
				.collect::<Vec<_>>(),
		)
	};

	// The richer `OurView` (with leaf data) goes to local subsystems only.
	let our_view = OurView::new(
		live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| a.hash),
		finalized_number,
	);

	gum::debug!(
		target: LOG_TARGET,
		live_head_count = ?live_heads.len(),
		"Our view updated, current view: {:?}",
		our_view,
	);

	dispatch_validation_event_to_all_unbounded(
		NetworkBridgeEvent::OurViewChange(our_view.clone()),
		ctx.sender(),
	);

	dispatch_collation_event_to_all_unbounded(
		NetworkBridgeEvent::OurViewChange(our_view),
		ctx.sender(),
	);

	// Partition connected peers by negotiated protocol version for the wire sends.
	let v1_collation_peers = filter_by_peer_version(&collation_peers, CollationVersion::V1.into());

	let v2_collation_peers = filter_by_peer_version(&collation_peers, CollationVersion::V2.into());

	let v3_validation_peers =
		filter_by_peer_version(&validation_peers, ValidationVersion::V3.into());

	send_collation_message_v1(
		v1_collation_peers,
		WireMessage::ViewUpdate(new_view.clone()),
		metrics,
		notification_sinks,
	);

	send_collation_message_v2(
		v2_collation_peers,
		WireMessage::ViewUpdate(new_view.clone()),
		metrics,
		notification_sinks,
	);

	send_validation_message_v3(
		v3_validation_peers,
		WireMessage::ViewUpdate(new_view.clone()),
		metrics,
		notification_sinks,
	);
}
997
// Handle messages on a specific v1 peer-set. The peer is expected to be connected on that
// peer-set.
//
// Decodes each raw notification as a `WireMessage<RawMessage>` and returns the resulting
// bridge events together with any reputation changes to apply (malformed message,
// malformed/empty view, or the peer not being connected on this peer-set at all).
fn handle_peer_messages<RawMessage: Decode, OutMessage: From<RawMessage>>(
	peer: PeerId,
	peer_set: PeerSet,
	peers: &mut HashMap<PeerId, PeerData>,
	messages: Vec<Bytes>,
	metrics: &Metrics,
) -> (Vec<NetworkBridgeEvent<OutMessage>>, Vec<Rep>) {
	// A message from a peer we don't track on this peer-set costs reputation.
	let peer_data = match peers.get_mut(&peer) {
		None => return (Vec::new(), vec![UNCONNECTED_PEERSET_COST]),
		Some(d) => d,
	};

	let mut outgoing_events = Vec::with_capacity(messages.len());
	let mut reports = Vec::new();

	for message in messages {
		metrics.on_notification_received(peer_set, peer_data.version, message.len());
		// `decode_all` rejects trailing bytes, so partial/padded messages count as malformed.
		let message = match WireMessage::<RawMessage>::decode_all(&mut message.as_ref()) {
			Err(_) => {
				reports.push(MALFORMED_MESSAGE_COST);
				continue
			},
			Ok(m) => m,
		};

		outgoing_events.push(match message {
			WireMessage::ViewUpdate(new_view) => {
				// Reject oversized views and views whose finalized number went backwards.
				if new_view.len() > MAX_VIEW_HEADS ||
					new_view.finalized_number < peer_data.view.finalized_number
				{
					reports.push(MALFORMED_VIEW_COST);
					continue
				} else if new_view.is_empty() {
					reports.push(EMPTY_VIEW_COST);
					continue
				} else if new_view == peer_data.view {
					// Duplicate view update: ignore without penalty.
					continue
				} else {
					peer_data.view = new_view;

					NetworkBridgeEvent::PeerViewChange(peer, peer_data.view.clone())
				}
			},
			WireMessage::ProtocolMessage(message) =>
				NetworkBridgeEvent::PeerMessage(peer, message.into()),
		})
	}

	(outgoing_events, reports)
}
1050
1051async fn dispatch_validation_event_to_all(
1052	event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
1053	ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
1054	metrics: &Metrics,
1055) {
1056	dispatch_validation_events_to_all(std::iter::once(event), ctx, metrics).await
1057}
1058
1059async fn dispatch_collation_event_to_all(
1060	event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
1061	ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
1062) {
1063	dispatch_collation_events_to_all(std::iter::once(event), ctx).await
1064}
1065
1066fn dispatch_validation_event_to_approval_unbounded(
1067	event: &NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
1068	sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
1069) {
1070	event
1071		.focus()
1072		.ok()
1073		.map(ApprovalVotingParallelMessage::from)
1074		.and_then(|msg| Some(sender.send_unbounded_message(msg)));
1075}
1076
1077fn dispatch_validation_event_to_all_unbounded(
1078	event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
1079	sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
1080) {
1081	event
1082		.focus()
1083		.ok()
1084		.map(StatementDistributionMessage::from)
1085		.and_then(|msg| Some(sender.send_unbounded_message(msg)));
1086	event
1087		.focus()
1088		.ok()
1089		.map(BitfieldDistributionMessage::from)
1090		.and_then(|msg| Some(sender.send_unbounded_message(msg)));
1091
1092	dispatch_validation_event_to_approval_unbounded(&event, sender);
1093
1094	event
1095		.focus()
1096		.ok()
1097		.map(GossipSupportMessage::from)
1098		.and_then(|msg| Some(sender.send_unbounded_message(msg)));
1099}
1100
1101fn dispatch_collation_event_to_all_unbounded(
1102	event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
1103	sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
1104) {
1105	if let Ok(msg) = event.focus() {
1106		sender.send_unbounded_message(CollatorProtocolMessage::NetworkBridgeUpdate(msg))
1107	}
1108}
1109
/// Dispatch a stream of validation-protocol events to the validation
/// subsystems over their bounded channels.
///
/// Peer lifecycle events (`PeerConnected`, `PeerDisconnected`,
/// `PeerViewChange`) are sent with high priority; everything else goes over
/// the normal-priority path.
async fn dispatch_validation_events_to_all<I>(
	events: I,
	sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
	_metrics: &Metrics,
) where
	I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
	I::IntoIter: Send,
{
	// A macro rather than a function: each target subsystem has a distinct
	// message type, and `focus()`'s output type is inferred per expansion
	// from the `$message::from` call. Events a subsystem does not consume
	// fail `focus()` and are skipped for that target.
	macro_rules! send_message {
		($event:expr, $message:ident) => {
			if let Ok(event) = $event.focus() {
				let has_high_priority = matches!(
					event,
					// NetworkBridgeEvent::OurViewChange(..) must also be here,
					// but it is sent via an unbounded channel.
					// See https://github.com/paritytech/polkadot-sdk/issues/824
					NetworkBridgeEvent::PeerConnected(..) |
						NetworkBridgeEvent::PeerDisconnected(..) |
						NetworkBridgeEvent::PeerViewChange(..)
				);
				let message = $message::from(event);
				if has_high_priority {
					sender.send_message_with_priority::<overseer::HighPriority>(message).await;
				} else {
					sender.send_message(message).await;
				}
			}
		};
	}

	// Fan each event out to every validation subsystem in turn.
	for event in events {
		send_message!(event, StatementDistributionMessage);
		send_message!(event, BitfieldDistributionMessage);
		send_message!(event, ApprovalVotingParallelMessage);
		send_message!(event, GossipSupportMessage);
	}
}
1147
1148async fn dispatch_collation_events_to_all<I>(
1149	events: I,
1150	ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
1151) where
1152	I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>>,
1153	I::IntoIter: Send,
1154{
1155	let messages_for = |event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>| {
1156		event.focus().ok().map(|m| CollatorProtocolMessage::NetworkBridgeUpdate(m))
1157	};
1158
1159	ctx.send_messages(events.into_iter().flat_map(messages_for)).await
1160}