referrerpolicy=no-referrer-when-downgrade

polkadot_network_bridge/tx/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Network Bridge Subsystem - handles _outgoing_ messages, from subsystem to the network.
18use super::*;
19
20use polkadot_node_network_protocol::{
21	peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames, CollationProtocols,
22	ValidationProtocols,
23};
24
25use polkadot_node_subsystem::{
26	errors::SubsystemError,
27	messages::{NetworkBridgeTxMessage, ReportPeerMessage},
28	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem,
29};
30
31use polkadot_node_network_protocol::request_response::Requests;
32use sc_network::{MessageSink, ReputationChange};
33
34use crate::validator_discovery;
35
36/// Actual interfacing to the network based on the `Network` trait.
37///
38/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
39use crate::network::{
40	send_collation_message_v1, send_collation_message_v2, send_validation_message_v3, Network,
41};
42
43use crate::metrics::Metrics;
44
45#[cfg(test)]
46mod tests;
47
48// network bridge log target
49const LOG_TARGET: &'static str = "parachain::network-bridge-tx";
50
51/// The network bridge subsystem.
52pub struct NetworkBridgeTx<N, AD> {
53	/// `Network` trait implementing type.
54	network_service: N,
55	authority_discovery_service: AD,
56	metrics: Metrics,
57	req_protocol_names: ReqProtocolNames,
58	peerset_protocol_names: PeerSetProtocolNames,
59	notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
60}
61
62impl<N, AD> NetworkBridgeTx<N, AD> {
63	/// Create a new network bridge subsystem with underlying network service and authority
64	/// discovery service.
65	///
66	/// This assumes that the network service has had the notifications protocol for the network
67	/// bridge already registered. See [`peer_sets_info`].
68	pub fn new(
69		network_service: N,
70		authority_discovery_service: AD,
71		metrics: Metrics,
72		req_protocol_names: ReqProtocolNames,
73		peerset_protocol_names: PeerSetProtocolNames,
74		notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
75	) -> Self {
76		Self {
77			network_service,
78			authority_discovery_service,
79			metrics,
80			req_protocol_names,
81			peerset_protocol_names,
82			notification_sinks,
83		}
84	}
85}
86
87#[overseer::subsystem(NetworkBridgeTx, error = SubsystemError, prefix = self::overseer)]
88impl<Net, AD, Context> NetworkBridgeTx<Net, AD>
89where
90	Net: Network + Sync,
91	AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
92{
93	fn start(self, ctx: Context) -> SpawnedSubsystem {
94		let future = run_network_out(self, ctx)
95			.map_err(|e| SubsystemError::with_origin("network-bridge", e))
96			.boxed();
97		SpawnedSubsystem { name: "network-bridge-tx-subsystem", future }
98	}
99}
100
101#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
102async fn handle_subsystem_messages<Context, N, AD>(
103	mut ctx: Context,
104	mut network_service: N,
105	mut authority_discovery_service: AD,
106	metrics: Metrics,
107	req_protocol_names: ReqProtocolNames,
108	peerset_protocol_names: PeerSetProtocolNames,
109	notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
110) -> Result<(), Error>
111where
112	N: Network,
113	AD: validator_discovery::AuthorityDiscovery + Clone,
114{
115	let mut validator_discovery =
116		validator_discovery::Service::<N, AD>::new(peerset_protocol_names.clone());
117
118	loop {
119		match ctx.recv().fuse().await? {
120			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
121			FromOrchestra::Signal(_) => { /* handled by incoming */ },
122			FromOrchestra::Communication { msg } => {
123				(network_service, authority_discovery_service) =
124					handle_incoming_subsystem_communication(
125						&mut ctx,
126						network_service,
127						&mut validator_discovery,
128						authority_discovery_service.clone(),
129						msg,
130						&metrics,
131						&req_protocol_names,
132						&peerset_protocol_names,
133						&notification_sinks,
134					)
135					.await;
136			},
137		}
138	}
139}
140
141#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
142async fn handle_incoming_subsystem_communication<Context, N, AD>(
143	_ctx: &mut Context,
144	network_service: N,
145	validator_discovery: &mut validator_discovery::Service<N, AD>,
146	mut authority_discovery_service: AD,
147	msg: NetworkBridgeTxMessage,
148	metrics: &Metrics,
149	req_protocol_names: &ReqProtocolNames,
150	peerset_protocol_names: &PeerSetProtocolNames,
151	notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
152) -> (N, AD)
153where
154	N: Network,
155	AD: validator_discovery::AuthorityDiscovery + Clone,
156{
157	match msg {
158		NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep)) => {
159			if !rep.value.is_positive() {
160				gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
161			}
162
163			metrics.on_report_event();
164			network_service.report_peer(peer, rep);
165		},
166		NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Batch(batch)) => {
167			for (peer, score) in batch {
168				let rep = ReputationChange::new(score, "Aggregated reputation change");
169				if !rep.value.is_positive() {
170					gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
171				}
172
173				metrics.on_report_event();
174				network_service.report_peer(peer, rep);
175			}
176		},
177		NetworkBridgeTxMessage::DisconnectPeers(peers, peer_set) => {
178			gum::trace!(
179				target: LOG_TARGET,
180				action = "DisconnectPeers",
181				?peers,
182				peer_set = ?peer_set,
183			);
184
185			// [`NetworkService`] keeps track of the protocols by their main name.
186			let protocol = peerset_protocol_names.get_main_name(peer_set);
187			for peer in peers {
188				network_service.disconnect_peer(peer, protocol.clone());
189			}
190		},
191		NetworkBridgeTxMessage::SendValidationMessage(peers, msg) => {
192			gum::trace!(
193				target: LOG_TARGET,
194				action = "SendValidationMessages",
195				?msg,
196				num_messages = 1usize,
197			);
198
199			match msg {
200				ValidationProtocols::V3(msg) => send_validation_message_v3(
201					peers,
202					WireMessage::ProtocolMessage(msg),
203					&metrics,
204					notification_sinks,
205				),
206			}
207		},
208		NetworkBridgeTxMessage::SendValidationMessages(msgs) => {
209			gum::trace!(
210				target: LOG_TARGET,
211				action = "SendValidationMessages",
212				num_messages = %msgs.len(),
213				?msgs,
214			);
215
216			for (peers, msg) in msgs {
217				match msg {
218					ValidationProtocols::V3(msg) => send_validation_message_v3(
219						peers,
220						WireMessage::ProtocolMessage(msg),
221						&metrics,
222						notification_sinks,
223					),
224				}
225			}
226		},
227		NetworkBridgeTxMessage::SendCollationMessage(peers, msg) => {
228			gum::trace!(
229				target: LOG_TARGET,
230				action = "SendCollationMessages",
231				num_messages = 1usize,
232			);
233
234			match msg {
235				CollationProtocols::V1(msg) => send_collation_message_v1(
236					peers,
237					WireMessage::ProtocolMessage(msg),
238					&metrics,
239					notification_sinks,
240				),
241				CollationProtocols::V2(msg) => send_collation_message_v2(
242					peers,
243					WireMessage::ProtocolMessage(msg),
244					&metrics,
245					notification_sinks,
246				),
247			}
248		},
249		NetworkBridgeTxMessage::SendCollationMessages(msgs) => {
250			gum::trace!(
251				target: LOG_TARGET,
252				action = "SendCollationMessages",
253				num_messages = %msgs.len(),
254			);
255
256			for (peers, msg) in msgs {
257				match msg {
258					CollationProtocols::V1(msg) => send_collation_message_v1(
259						peers,
260						WireMessage::ProtocolMessage(msg),
261						&metrics,
262						notification_sinks,
263					),
264					CollationProtocols::V2(msg) => send_collation_message_v2(
265						peers,
266						WireMessage::ProtocolMessage(msg),
267						&metrics,
268						notification_sinks,
269					),
270				}
271			}
272		},
273		NetworkBridgeTxMessage::SendRequests(reqs, if_disconnected) => {
274			gum::trace!(
275				target: LOG_TARGET,
276				action = "SendRequests",
277				num_requests = %reqs.len(),
278			);
279
280			for req in reqs {
281				match req {
282					Requests::ChunkFetching(ref req) => {
283						// This is not the actual request that will succeed, as we don't know yet
284						// what that will be. It's only the primary request we tried.
285						if req.fallback_request.is_some() {
286							metrics.on_message("chunk_fetching_v2")
287						} else {
288							metrics.on_message("chunk_fetching_v1")
289						}
290					},
291					Requests::AvailableDataFetchingV1(_) =>
292						metrics.on_message("available_data_fetching_v1"),
293					Requests::CollationFetchingV1(_) => metrics.on_message("collation_fetching_v1"),
294					Requests::CollationFetchingV2(_) => metrics.on_message("collation_fetching_v2"),
295					Requests::PoVFetchingV1(_) => metrics.on_message("pov_fetching_v1"),
296					Requests::DisputeSendingV1(_) => metrics.on_message("dispute_sending_v1"),
297					Requests::AttestedCandidateV2(_) => metrics.on_message("attested_candidate_v2"),
298				}
299
300				network_service
301					.start_request(
302						&mut authority_discovery_service,
303						req,
304						req_protocol_names,
305						if_disconnected,
306					)
307					.await;
308			}
309		},
310		NetworkBridgeTxMessage::ConnectToValidators { validator_ids, peer_set, failed } => {
311			gum::trace!(
312				target: LOG_TARGET,
313				action = "ConnectToValidators",
314				peer_set = ?peer_set,
315				ids = ?validator_ids,
316				"Received a validator connection request",
317			);
318
319			metrics.note_desired_peer_count(peer_set, validator_ids.len());
320
321			let (network_service, ads) = validator_discovery
322				.on_request(
323					validator_ids,
324					peer_set,
325					failed,
326					network_service,
327					authority_discovery_service,
328				)
329				.await;
330
331			return (network_service, ads)
332		},
333		NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set } => {
334			gum::trace!(
335				target: LOG_TARGET,
336				action = "ConnectToPeers",
337				peer_set = ?peer_set,
338				?validator_addrs,
339				"Received a resolved validator connection request",
340			);
341
342			metrics.note_desired_peer_count(peer_set, validator_addrs.len());
343
344			let all_addrs = validator_addrs.into_iter().flatten().collect();
345			let network_service = validator_discovery
346				.on_resolved_request(all_addrs, peer_set, network_service)
347				.await;
348			return (network_service, authority_discovery_service)
349		},
350
351		NetworkBridgeTxMessage::AddToResolvedValidators { validator_addrs, peer_set } => {
352			gum::trace!(
353				target: LOG_TARGET,
354				action = "AddToResolvedValidators",
355				peer_set = ?peer_set,
356				?validator_addrs,
357				"Received a resolved validator connection request",
358			);
359
360			let all_addrs = validator_addrs.into_iter().flatten().collect();
361			let network_service = validator_discovery
362				.on_add_to_resolved_request(all_addrs, peer_set, network_service)
363				.await;
364			return (network_service, authority_discovery_service)
365		},
366	}
367	(network_service, authority_discovery_service)
368}
369
370#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
371async fn run_network_out<N, AD, Context>(
372	bridge: NetworkBridgeTx<N, AD>,
373	ctx: Context,
374) -> Result<(), Error>
375where
376	N: Network,
377	AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
378{
379	let NetworkBridgeTx {
380		network_service,
381		authority_discovery_service,
382		metrics,
383		req_protocol_names,
384		peerset_protocol_names,
385		notification_sinks,
386	} = bridge;
387
388	handle_subsystem_messages(
389		ctx,
390		network_service,
391		authority_discovery_service,
392		metrics,
393		req_protocol_names,
394		peerset_protocol_names,
395		notification_sinks,
396	)
397	.await?;
398
399	Ok(())
400}