referrerpolicy=no-referrer-when-downgrade

polkadot_network_bridge/
network.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::{
18	collections::{HashMap, HashSet},
19	sync::Arc,
20};
21
22use async_trait::async_trait;
23use parking_lot::Mutex;
24
25use codec::Encode;
26
27use sc_network::{
28	config::parse_addr, multiaddr::Multiaddr, service::traits::NetworkService, types::ProtocolName,
29	IfDisconnected, MessageSink, OutboundFailure, ReputationChange, RequestFailure,
30};
31
32use polkadot_node_network_protocol::{
33	peer_set::{CollationVersion, PeerSet, ProtocolVersion, ValidationVersion},
34	request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests},
35	v1 as protocol_v1, v2 as protocol_v2, v3 as protocol_v3, PeerId,
36};
37use polkadot_primitives::AuthorityDiscoveryId;
38
39use crate::{metrics::Metrics, validator_discovery::AuthorityDiscovery, WireMessage};
40
41// network bridge network abstraction log target
42const LOG_TARGET: &'static str = "parachain::network-bridge-net";
43
44// Helper function to send a validation v3 message to a list of peers.
45// Messages are always sent via the main protocol, even legacy protocol messages.
46pub(crate) fn send_validation_message_v3(
47	peers: Vec<PeerId>,
48	message: WireMessage<protocol_v3::ValidationProtocol>,
49	metrics: &Metrics,
50	notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
51) {
52	gum::trace!(target: LOG_TARGET, ?peers, ?message, "Sending validation v3 message to peers",);
53
54	send_message(
55		peers,
56		PeerSet::Validation,
57		ValidationVersion::V3.into(),
58		message,
59		metrics,
60		notification_sinks,
61	);
62}
63
64// Helper function to send a collation v1 message to a list of peers.
65// Messages are always sent via the main protocol, even legacy protocol messages.
66pub(crate) fn send_collation_message_v1(
67	peers: Vec<PeerId>,
68	message: WireMessage<protocol_v1::CollationProtocol>,
69	metrics: &Metrics,
70	notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
71) {
72	send_message(
73		peers,
74		PeerSet::Collation,
75		CollationVersion::V1.into(),
76		message,
77		metrics,
78		notification_sinks,
79	);
80}
81
82// Helper function to send a collation v2 message to a list of peers.
83// Messages are always sent via the main protocol, even legacy protocol messages.
84pub(crate) fn send_collation_message_v2(
85	peers: Vec<PeerId>,
86	message: WireMessage<protocol_v2::CollationProtocol>,
87	metrics: &Metrics,
88	notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
89) {
90	send_message(
91		peers,
92		PeerSet::Collation,
93		CollationVersion::V2.into(),
94		message,
95		metrics,
96		notification_sinks,
97	);
98}
99
100/// Lower level function that sends a message to the network using the main protocol version.
101///
102/// This function is only used internally by the network-bridge, which is responsible to only send
103/// messages that are compatible with the passed peer set, as that is currently not enforced by
104/// this function. These are messages of type `WireMessage` parameterized on the matching type.
105fn send_message<M>(
106	mut peers: Vec<PeerId>,
107	peer_set: PeerSet,
108	version: ProtocolVersion,
109	message: M,
110	metrics: &super::Metrics,
111	network_notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
112) where
113	M: Encode + Clone,
114{
115	if peers.is_empty() {
116		return
117	}
118
119	let message = {
120		let encoded = message.encode();
121		metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
122		metrics.on_message(std::any::type_name::<M>());
123		encoded
124	};
125
126	let notification_sinks = network_notification_sinks.lock();
127
128	gum::trace!(
129		target: LOG_TARGET,
130		?peers,
131		?peer_set,
132		?version,
133		?message,
134		"Sending message to peers",
135	);
136
137	// optimization: avoid cloning the message for the last peer in the
138	// list. The message payload can be quite large. If the underlying
139	// network used `Bytes` this would not be necessary.
140	//
141	// peer may have gotten disconnect by the time `send_message()` is called
142	// at which point the sink is not available.
143	let last_peer = peers.pop();
144	peers.into_iter().for_each(|peer| {
145		if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
146			sink.send_sync_notification(message.clone());
147		}
148	});
149
150	if let Some(peer) = last_peer {
151		if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
152			sink.send_sync_notification(message.clone());
153		}
154	}
155}
156
157/// An abstraction over networking for the purposes of this subsystem.
158#[async_trait]
159pub trait Network: Clone + Send + 'static {
160	/// Ask the network to keep a substream open with these nodes and not disconnect from them
161	/// until removed from the protocol's peer set.
162	/// Note that `out_peers` setting has no effect on this.
163	async fn set_reserved_peers(
164		&mut self,
165		protocol: ProtocolName,
166		multiaddresses: HashSet<Multiaddr>,
167	) -> Result<(), String>;
168
169	/// Ask the network to extend the reserved set with these nodes.
170	async fn add_peers_to_reserved_set(
171		&mut self,
172		protocol: ProtocolName,
173		multiaddresses: HashSet<Multiaddr>,
174	) -> Result<(), String>;
175
176	/// Removes the peers for the protocol's peer set (both reserved and non-reserved).
177	async fn remove_from_peers_set(
178		&mut self,
179		protocol: ProtocolName,
180		peers: Vec<PeerId>,
181	) -> Result<(), String>;
182
183	/// Send a request to a remote peer.
184	async fn start_request<AD: AuthorityDiscovery>(
185		&self,
186		authority_discovery: &mut AD,
187		req: Requests,
188		req_protocol_names: &ReqProtocolNames,
189		if_disconnected: IfDisconnected,
190	);
191
192	/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
193	fn report_peer(&self, who: PeerId, rep: ReputationChange);
194
195	/// Disconnect a given peer from the protocol specified without harming reputation.
196	fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName);
197
198	/// Get peer role.
199	fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<sc_network::ObservedRole>;
200}
201
202#[async_trait]
203impl Network for Arc<dyn NetworkService> {
204	async fn set_reserved_peers(
205		&mut self,
206		protocol: ProtocolName,
207		multiaddresses: HashSet<Multiaddr>,
208	) -> Result<(), String> {
209		<dyn NetworkService>::set_reserved_peers(&**self, protocol, multiaddresses)
210	}
211
212	async fn add_peers_to_reserved_set(
213		&mut self,
214		protocol: ProtocolName,
215		multiaddresses: HashSet<Multiaddr>,
216	) -> Result<(), String> {
217		<dyn NetworkService>::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
218	}
219
220	async fn remove_from_peers_set(
221		&mut self,
222		protocol: ProtocolName,
223		peers: Vec<PeerId>,
224	) -> Result<(), String> {
225		<dyn NetworkService>::remove_peers_from_reserved_set(&**self, protocol, peers)
226	}
227
228	fn report_peer(&self, who: PeerId, rep: ReputationChange) {
229		<dyn NetworkService>::report_peer(&**self, who, rep);
230	}
231
232	fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
233		<dyn NetworkService>::disconnect_peer(&**self, who, protocol);
234	}
235
236	async fn start_request<AD: AuthorityDiscovery>(
237		&self,
238		authority_discovery: &mut AD,
239		req: Requests,
240		req_protocol_names: &ReqProtocolNames,
241		if_disconnected: IfDisconnected,
242	) {
243		let (protocol, OutgoingRequest { peer, payload, pending_response, fallback_request }) =
244			req.encode_request();
245
246		let peer_id = match peer {
247			Recipient::Peer(peer_id) => Some(peer_id),
248			Recipient::Authority(authority) => {
249				gum::trace!(
250					target: LOG_TARGET,
251					?authority,
252					"Searching for peer id to connect to authority",
253				);
254
255				let mut found_peer_id = None;
256				// Note: `get_addresses_by_authority_id` searched in a cache, and it thus expected
257				// to be very quick.
258				for addr in authority_discovery
259					.get_addresses_by_authority_id(authority)
260					.await
261					.into_iter()
262					.flat_map(|list| list.into_iter())
263				{
264					let (peer_id, addr) = match parse_addr(addr) {
265						Ok(v) => v,
266						Err(_) => continue,
267					};
268					<dyn NetworkService>::add_known_address(&**self, peer_id, addr);
269					found_peer_id = Some(peer_id);
270				}
271				found_peer_id
272			},
273		};
274
275		let peer_id = match peer_id {
276			None => {
277				gum::debug!(target: LOG_TARGET, "Discovering authority failed");
278				match pending_response
279					.send(Err(RequestFailure::Network(OutboundFailure::DialFailure)))
280				{
281					Err(_) => {
282						gum::debug!(target: LOG_TARGET, "Sending failed request response failed.")
283					},
284					Ok(_) => {},
285				}
286				return
287			},
288			Some(peer_id) => peer_id,
289		};
290
291		gum::trace!(
292			target: LOG_TARGET,
293			%peer_id,
294			protocol = %req_protocol_names.get_name(protocol),
295			fallback_protocol = ?fallback_request.as_ref().map(|(_, p)| req_protocol_names.get_name(*p)),
296			?if_disconnected,
297			"Starting request",
298		);
299
300		<dyn NetworkService>::start_request(
301			&**self,
302			peer_id,
303			req_protocol_names.get_name(protocol),
304			payload,
305			fallback_request.map(|(r, p)| (r, req_protocol_names.get_name(p))),
306			pending_response,
307			if_disconnected,
308		);
309	}
310
311	fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<sc_network::ObservedRole> {
312		<dyn NetworkService>::peer_role(&**self, who, handshake)
313	}
314}
315
316/// We assume one `peer_id` per `authority_id`.
317pub async fn get_peer_id_by_authority_id<AD: AuthorityDiscovery>(
318	authority_discovery: &mut AD,
319	authority: AuthorityDiscoveryId,
320) -> Option<PeerId> {
321	// Note: `get_addresses_by_authority_id` searched in a cache, and it thus expected
322	// to be very quick.
323	authority_discovery
324		.get_addresses_by_authority_id(authority)
325		.await
326		.into_iter()
327		.flat_map(|list| list.into_iter())
328		.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
329}