1use super::*;
19
20use polkadot_node_network_protocol::{
21 peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames, CollationProtocols,
22 ValidationProtocols,
23};
24
25use polkadot_node_subsystem::{
26 errors::SubsystemError,
27 messages::{NetworkBridgeTxMessage, ReportPeerMessage},
28 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem,
29};
30
31use polkadot_node_network_protocol::request_response::Requests;
32use sc_network::{MessageSink, ReputationChange};
33
34use crate::validator_discovery;
35
36use crate::network::{
40 send_collation_message_v1, send_collation_message_v2, send_validation_message_v3, Network,
41};
42
43use crate::metrics::Metrics;
44
45#[cfg(test)]
46mod tests;
47
/// Target passed to the `gum` logging macros used in this file.
///
/// The `'static` lifetime is implied for references in `const` items, so it is not spelled out.
const LOG_TARGET: &str = "parachain::network-bridge-tx";
50
/// State of the sending ("tx") side of the network bridge.
///
/// The value is consumed when the subsystem starts: its fields are taken apart and handed to
/// the message loop, which uses them to act on `NetworkBridgeTxMessage`s.
pub struct NetworkBridgeTx<N, AD> {
    /// Network handle used to report peers, disconnect peers and start requests.
    network_service: N,
    /// Authority discovery handle; passed to request dispatch and to validator discovery.
    authority_discovery_service: AD,
    /// Metrics sink for report events, outgoing message types and desired peer counts.
    metrics: Metrics,
    /// Request/response protocol names, passed along whenever a request is started.
    req_protocol_names: ReqProtocolNames,
    /// Peer-set protocol names; used to look up the main protocol name when disconnecting
    /// peers and to construct the validator discovery service.
    peerset_protocol_names: PeerSetProtocolNames,
    /// Shared, mutex-guarded map from `(PeerSet, PeerId)` to a boxed `MessageSink`; handed to
    /// the `send_*_message_*` helpers when notifications are sent.
    notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
}
61
impl<N, AD> NetworkBridgeTx<N, AD> {
    /// Creates a new `NetworkBridgeTx` from its parts.
    ///
    /// Each argument is stored unchanged in the field of the same name. No other work is
    /// performed here.
    pub fn new(
        network_service: N,
        authority_discovery_service: AD,
        metrics: Metrics,
        req_protocol_names: ReqProtocolNames,
        peerset_protocol_names: PeerSetProtocolNames,
        notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
    ) -> Self {
        Self {
            network_service,
            authority_discovery_service,
            metrics,
            req_protocol_names,
            peerset_protocol_names,
            notification_sinks,
        }
    }
}
86
87#[overseer::subsystem(NetworkBridgeTx, error = SubsystemError, prefix = self::overseer)]
88impl<Net, AD, Context> NetworkBridgeTx<Net, AD>
89where
90 Net: Network + Sync,
91 AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
92{
93 fn start(self, ctx: Context) -> SpawnedSubsystem {
94 let future = run_network_out(self, ctx)
95 .map_err(|e| SubsystemError::with_origin("network-bridge", e))
96 .boxed();
97 SpawnedSubsystem { name: "network-bridge-tx-subsystem", future }
98 }
99}
100
101#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
102async fn handle_subsystem_messages<Context, N, AD>(
103 mut ctx: Context,
104 mut network_service: N,
105 mut authority_discovery_service: AD,
106 metrics: Metrics,
107 req_protocol_names: ReqProtocolNames,
108 peerset_protocol_names: PeerSetProtocolNames,
109 notification_sinks: Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
110) -> Result<(), Error>
111where
112 N: Network,
113 AD: validator_discovery::AuthorityDiscovery + Clone,
114{
115 let mut validator_discovery =
116 validator_discovery::Service::<N, AD>::new(peerset_protocol_names.clone());
117
118 loop {
119 match ctx.recv().fuse().await? {
120 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
121 FromOrchestra::Signal(_) => { },
122 FromOrchestra::Communication { msg } => {
123 (network_service, authority_discovery_service) =
124 handle_incoming_subsystem_communication(
125 &mut ctx,
126 network_service,
127 &mut validator_discovery,
128 authority_discovery_service.clone(),
129 msg,
130 &metrics,
131 &req_protocol_names,
132 &peerset_protocol_names,
133 ¬ification_sinks,
134 )
135 .await;
136 },
137 }
138 }
139}
140
141#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
142async fn handle_incoming_subsystem_communication<Context, N, AD>(
143 _ctx: &mut Context,
144 network_service: N,
145 validator_discovery: &mut validator_discovery::Service<N, AD>,
146 mut authority_discovery_service: AD,
147 msg: NetworkBridgeTxMessage,
148 metrics: &Metrics,
149 req_protocol_names: &ReqProtocolNames,
150 peerset_protocol_names: &PeerSetProtocolNames,
151 notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
152) -> (N, AD)
153where
154 N: Network,
155 AD: validator_discovery::AuthorityDiscovery + Clone,
156{
157 match msg {
158 NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep)) => {
159 if !rep.value.is_positive() {
160 gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
161 }
162
163 metrics.on_report_event();
164 network_service.report_peer(peer, rep);
165 },
166 NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Batch(batch)) => {
167 for (peer, score) in batch {
168 let rep = ReputationChange::new(score, "Aggregated reputation change");
169 if !rep.value.is_positive() {
170 gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
171 }
172
173 metrics.on_report_event();
174 network_service.report_peer(peer, rep);
175 }
176 },
177 NetworkBridgeTxMessage::DisconnectPeers(peers, peer_set) => {
178 gum::trace!(
179 target: LOG_TARGET,
180 action = "DisconnectPeers",
181 ?peers,
182 peer_set = ?peer_set,
183 );
184
185 let protocol = peerset_protocol_names.get_main_name(peer_set);
187 for peer in peers {
188 network_service.disconnect_peer(peer, protocol.clone());
189 }
190 },
191 NetworkBridgeTxMessage::SendValidationMessage(peers, msg) => {
192 gum::trace!(
193 target: LOG_TARGET,
194 action = "SendValidationMessages",
195 ?msg,
196 num_messages = 1usize,
197 );
198
199 match msg {
200 ValidationProtocols::V3(msg) => send_validation_message_v3(
201 peers,
202 WireMessage::ProtocolMessage(msg),
203 &metrics,
204 notification_sinks,
205 ),
206 }
207 },
208 NetworkBridgeTxMessage::SendValidationMessages(msgs) => {
209 gum::trace!(
210 target: LOG_TARGET,
211 action = "SendValidationMessages",
212 num_messages = %msgs.len(),
213 ?msgs,
214 );
215
216 for (peers, msg) in msgs {
217 match msg {
218 ValidationProtocols::V3(msg) => send_validation_message_v3(
219 peers,
220 WireMessage::ProtocolMessage(msg),
221 &metrics,
222 notification_sinks,
223 ),
224 }
225 }
226 },
227 NetworkBridgeTxMessage::SendCollationMessage(peers, msg) => {
228 gum::trace!(
229 target: LOG_TARGET,
230 action = "SendCollationMessages",
231 num_messages = 1usize,
232 );
233
234 match msg {
235 CollationProtocols::V1(msg) => send_collation_message_v1(
236 peers,
237 WireMessage::ProtocolMessage(msg),
238 &metrics,
239 notification_sinks,
240 ),
241 CollationProtocols::V2(msg) => send_collation_message_v2(
242 peers,
243 WireMessage::ProtocolMessage(msg),
244 &metrics,
245 notification_sinks,
246 ),
247 }
248 },
249 NetworkBridgeTxMessage::SendCollationMessages(msgs) => {
250 gum::trace!(
251 target: LOG_TARGET,
252 action = "SendCollationMessages",
253 num_messages = %msgs.len(),
254 );
255
256 for (peers, msg) in msgs {
257 match msg {
258 CollationProtocols::V1(msg) => send_collation_message_v1(
259 peers,
260 WireMessage::ProtocolMessage(msg),
261 &metrics,
262 notification_sinks,
263 ),
264 CollationProtocols::V2(msg) => send_collation_message_v2(
265 peers,
266 WireMessage::ProtocolMessage(msg),
267 &metrics,
268 notification_sinks,
269 ),
270 }
271 }
272 },
273 NetworkBridgeTxMessage::SendRequests(reqs, if_disconnected) => {
274 gum::trace!(
275 target: LOG_TARGET,
276 action = "SendRequests",
277 num_requests = %reqs.len(),
278 );
279
280 for req in reqs {
281 match req {
282 Requests::ChunkFetching(ref req) => {
283 if req.fallback_request.is_some() {
286 metrics.on_message("chunk_fetching_v2")
287 } else {
288 metrics.on_message("chunk_fetching_v1")
289 }
290 },
291 Requests::AvailableDataFetchingV1(_) =>
292 metrics.on_message("available_data_fetching_v1"),
293 Requests::CollationFetchingV1(_) => metrics.on_message("collation_fetching_v1"),
294 Requests::CollationFetchingV2(_) => metrics.on_message("collation_fetching_v2"),
295 Requests::PoVFetchingV1(_) => metrics.on_message("pov_fetching_v1"),
296 Requests::DisputeSendingV1(_) => metrics.on_message("dispute_sending_v1"),
297 Requests::AttestedCandidateV2(_) => metrics.on_message("attested_candidate_v2"),
298 }
299
300 network_service
301 .start_request(
302 &mut authority_discovery_service,
303 req,
304 req_protocol_names,
305 if_disconnected,
306 )
307 .await;
308 }
309 },
310 NetworkBridgeTxMessage::ConnectToValidators { validator_ids, peer_set, failed } => {
311 gum::trace!(
312 target: LOG_TARGET,
313 action = "ConnectToValidators",
314 peer_set = ?peer_set,
315 ids = ?validator_ids,
316 "Received a validator connection request",
317 );
318
319 metrics.note_desired_peer_count(peer_set, validator_ids.len());
320
321 let (network_service, ads) = validator_discovery
322 .on_request(
323 validator_ids,
324 peer_set,
325 failed,
326 network_service,
327 authority_discovery_service,
328 )
329 .await;
330
331 return (network_service, ads)
332 },
333 NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set } => {
334 gum::trace!(
335 target: LOG_TARGET,
336 action = "ConnectToPeers",
337 peer_set = ?peer_set,
338 ?validator_addrs,
339 "Received a resolved validator connection request",
340 );
341
342 metrics.note_desired_peer_count(peer_set, validator_addrs.len());
343
344 let all_addrs = validator_addrs.into_iter().flatten().collect();
345 let network_service = validator_discovery
346 .on_resolved_request(all_addrs, peer_set, network_service)
347 .await;
348 return (network_service, authority_discovery_service)
349 },
350
351 NetworkBridgeTxMessage::AddToResolvedValidators { validator_addrs, peer_set } => {
352 gum::trace!(
353 target: LOG_TARGET,
354 action = "AddToResolvedValidators",
355 peer_set = ?peer_set,
356 ?validator_addrs,
357 "Received a resolved validator connection request",
358 );
359
360 let all_addrs = validator_addrs.into_iter().flatten().collect();
361 let network_service = validator_discovery
362 .on_add_to_resolved_request(all_addrs, peer_set, network_service)
363 .await;
364 return (network_service, authority_discovery_service)
365 },
366 }
367 (network_service, authority_discovery_service)
368}
369
370#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
371async fn run_network_out<N, AD, Context>(
372 bridge: NetworkBridgeTx<N, AD>,
373 ctx: Context,
374) -> Result<(), Error>
375where
376 N: Network,
377 AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
378{
379 let NetworkBridgeTx {
380 network_service,
381 authority_discovery_service,
382 metrics,
383 req_protocol_names,
384 peerset_protocol_names,
385 notification_sinks,
386 } = bridge;
387
388 handle_subsystem_messages(
389 ctx,
390 network_service,
391 authority_discovery_service,
392 metrics,
393 req_protocol_names,
394 peerset_protocol_names,
395 notification_sinks,
396 )
397 .await?;
398
399 Ok(())
400}