1use std::{
18 collections::{HashMap, HashSet},
19 sync::Arc,
20};
21
22use async_trait::async_trait;
23use parking_lot::Mutex;
24
25use codec::Encode;
26
27use sc_network::{
28 config::parse_addr, multiaddr::Multiaddr, service::traits::NetworkService, types::ProtocolName,
29 IfDisconnected, MessageSink, OutboundFailure, ReputationChange, RequestFailure,
30};
31
32use polkadot_node_network_protocol::{
33 peer_set::{CollationVersion, PeerSet, ProtocolVersion, ValidationVersion},
34 request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests},
35 v1 as protocol_v1, v2 as protocol_v2, v3 as protocol_v3, PeerId,
36};
37use polkadot_primitives::AuthorityDiscoveryId;
38
39use crate::{metrics::Metrics, validator_discovery::AuthorityDiscovery, WireMessage};
40
41const LOG_TARGET: &'static str = "parachain::network-bridge-net";
43
44pub(crate) fn send_validation_message_v3(
47 peers: Vec<PeerId>,
48 message: WireMessage<protocol_v3::ValidationProtocol>,
49 metrics: &Metrics,
50 notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
51) {
52 gum::trace!(target: LOG_TARGET, ?peers, ?message, "Sending validation v3 message to peers",);
53
54 send_message(
55 peers,
56 PeerSet::Validation,
57 ValidationVersion::V3.into(),
58 message,
59 metrics,
60 notification_sinks,
61 );
62}
63
64pub(crate) fn send_collation_message_v1(
67 peers: Vec<PeerId>,
68 message: WireMessage<protocol_v1::CollationProtocol>,
69 metrics: &Metrics,
70 notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
71) {
72 send_message(
73 peers,
74 PeerSet::Collation,
75 CollationVersion::V1.into(),
76 message,
77 metrics,
78 notification_sinks,
79 );
80}
81
82pub(crate) fn send_collation_message_v2(
85 peers: Vec<PeerId>,
86 message: WireMessage<protocol_v2::CollationProtocol>,
87 metrics: &Metrics,
88 notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
89) {
90 send_message(
91 peers,
92 PeerSet::Collation,
93 CollationVersion::V2.into(),
94 message,
95 metrics,
96 notification_sinks,
97 );
98}
99
100fn send_message<M>(
106 mut peers: Vec<PeerId>,
107 peer_set: PeerSet,
108 version: ProtocolVersion,
109 message: M,
110 metrics: &super::Metrics,
111 network_notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
112) where
113 M: Encode + Clone,
114{
115 if peers.is_empty() {
116 return
117 }
118
119 let message = {
120 let encoded = message.encode();
121 metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
122 metrics.on_message(std::any::type_name::<M>());
123 encoded
124 };
125
126 let notification_sinks = network_notification_sinks.lock();
127
128 gum::trace!(
129 target: LOG_TARGET,
130 ?peers,
131 ?peer_set,
132 ?version,
133 ?message,
134 "Sending message to peers",
135 );
136
137 let last_peer = peers.pop();
144 peers.into_iter().for_each(|peer| {
145 if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
146 sink.send_sync_notification(message.clone());
147 }
148 });
149
150 if let Some(peer) = last_peer {
151 if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
152 sink.send_sync_notification(message.clone());
153 }
154 }
155}
156
157#[async_trait]
159pub trait Network: Clone + Send + 'static {
160 async fn set_reserved_peers(
164 &mut self,
165 protocol: ProtocolName,
166 multiaddresses: HashSet<Multiaddr>,
167 ) -> Result<(), String>;
168
169 async fn add_peers_to_reserved_set(
171 &mut self,
172 protocol: ProtocolName,
173 multiaddresses: HashSet<Multiaddr>,
174 ) -> Result<(), String>;
175
176 async fn remove_from_peers_set(
178 &mut self,
179 protocol: ProtocolName,
180 peers: Vec<PeerId>,
181 ) -> Result<(), String>;
182
183 async fn start_request<AD: AuthorityDiscovery>(
185 &self,
186 authority_discovery: &mut AD,
187 req: Requests,
188 req_protocol_names: &ReqProtocolNames,
189 if_disconnected: IfDisconnected,
190 );
191
192 fn report_peer(&self, who: PeerId, rep: ReputationChange);
194
195 fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName);
197
198 fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<sc_network::ObservedRole>;
200}
201
202#[async_trait]
203impl Network for Arc<dyn NetworkService> {
204 async fn set_reserved_peers(
205 &mut self,
206 protocol: ProtocolName,
207 multiaddresses: HashSet<Multiaddr>,
208 ) -> Result<(), String> {
209 <dyn NetworkService>::set_reserved_peers(&**self, protocol, multiaddresses)
210 }
211
212 async fn add_peers_to_reserved_set(
213 &mut self,
214 protocol: ProtocolName,
215 multiaddresses: HashSet<Multiaddr>,
216 ) -> Result<(), String> {
217 <dyn NetworkService>::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
218 }
219
220 async fn remove_from_peers_set(
221 &mut self,
222 protocol: ProtocolName,
223 peers: Vec<PeerId>,
224 ) -> Result<(), String> {
225 <dyn NetworkService>::remove_peers_from_reserved_set(&**self, protocol, peers)
226 }
227
228 fn report_peer(&self, who: PeerId, rep: ReputationChange) {
229 <dyn NetworkService>::report_peer(&**self, who, rep);
230 }
231
232 fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
233 <dyn NetworkService>::disconnect_peer(&**self, who, protocol);
234 }
235
236 async fn start_request<AD: AuthorityDiscovery>(
237 &self,
238 authority_discovery: &mut AD,
239 req: Requests,
240 req_protocol_names: &ReqProtocolNames,
241 if_disconnected: IfDisconnected,
242 ) {
243 let (protocol, OutgoingRequest { peer, payload, pending_response, fallback_request }) =
244 req.encode_request();
245
246 let peer_id = match peer {
247 Recipient::Peer(peer_id) => Some(peer_id),
248 Recipient::Authority(authority) => {
249 gum::trace!(
250 target: LOG_TARGET,
251 ?authority,
252 "Searching for peer id to connect to authority",
253 );
254
255 let mut found_peer_id = None;
256 for addr in authority_discovery
259 .get_addresses_by_authority_id(authority)
260 .await
261 .into_iter()
262 .flat_map(|list| list.into_iter())
263 {
264 let (peer_id, addr) = match parse_addr(addr) {
265 Ok(v) => v,
266 Err(_) => continue,
267 };
268 <dyn NetworkService>::add_known_address(&**self, peer_id, addr);
269 found_peer_id = Some(peer_id);
270 }
271 found_peer_id
272 },
273 };
274
275 let peer_id = match peer_id {
276 None => {
277 gum::debug!(target: LOG_TARGET, "Discovering authority failed");
278 match pending_response
279 .send(Err(RequestFailure::Network(OutboundFailure::DialFailure)))
280 {
281 Err(_) => {
282 gum::debug!(target: LOG_TARGET, "Sending failed request response failed.")
283 },
284 Ok(_) => {},
285 }
286 return
287 },
288 Some(peer_id) => peer_id,
289 };
290
291 gum::trace!(
292 target: LOG_TARGET,
293 %peer_id,
294 protocol = %req_protocol_names.get_name(protocol),
295 fallback_protocol = ?fallback_request.as_ref().map(|(_, p)| req_protocol_names.get_name(*p)),
296 ?if_disconnected,
297 "Starting request",
298 );
299
300 <dyn NetworkService>::start_request(
301 &**self,
302 peer_id,
303 req_protocol_names.get_name(protocol),
304 payload,
305 fallback_request.map(|(r, p)| (r, req_protocol_names.get_name(p))),
306 pending_response,
307 if_disconnected,
308 );
309 }
310
311 fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<sc_network::ObservedRole> {
312 <dyn NetworkService>::peer_role(&**self, who, handshake)
313 }
314}
315
316pub async fn get_peer_id_by_authority_id<AD: AuthorityDiscovery>(
318 authority_discovery: &mut AD,
319 authority: AuthorityDiscoveryId,
320) -> Option<PeerId> {
321 authority_discovery
324 .get_addresses_by_authority_id(authority)
325 .await
326 .into_iter()
327 .flat_map(|list| list.into_iter())
328 .find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
329}