referrerpolicy=no-referrer-when-downgrade

polkadot_subsystem_bench/mock/
network_bridge.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
//! Mocked `network-bridge` subsystems that use a `NetworkInterface` to access
//! the emulated network.
19
20use crate::{
21	configuration::TestAuthorities,
22	network::{NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt},
23};
24use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt};
25use polkadot_node_network_protocol::ValidationProtocols;
26use polkadot_node_subsystem::{
27	messages::{ApprovalVotingParallelMessage, NetworkBridgeTxMessage},
28	overseer, SpawnedSubsystem, SubsystemError,
29};
30use polkadot_node_subsystem_types::{
31	messages::{BitfieldDistributionMessage, NetworkBridgeEvent, StatementDistributionMessage},
32	OverseerSignal,
33};
34use sc_network::{request_responses::ProtocolConfig, RequestFailure};
35
/// Log target attached to the log lines emitted by the mocked bridge.
const LOG_TARGET: &str = "subsystem-bench::network-bridge";

/// Names of the request/response protocols the rx mock is willing to forward
/// incoming peer requests for; a configured protocol with any other name
/// trips an assertion.
// NOTE(review): the all-`f` prefix is presumably a placeholder genesis hash — confirm.
const ALLOWED_PROTOCOLS: &[&str] = &[
	"/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/2",
	"/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_attested_candidate/2",
	"/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/send_dispute/1",
];
42
43/// A mock of the network bridge tx subsystem.
44pub struct MockNetworkBridgeTx {
45	/// A network emulator handle
46	network: NetworkEmulatorHandle,
47	/// A channel to the network interface,
48	to_network_interface: UnboundedSender<NetworkMessage>,
49	/// Test authorities
50	test_authorities: TestAuthorities,
51}
52
53/// A mock of the network bridge tx subsystem.
54pub struct MockNetworkBridgeRx {
55	/// A network interface receiver
56	network_receiver: NetworkInterfaceReceiver,
57	/// Chunk request sender
58	chunk_request_sender: Option<ProtocolConfig>,
59}
60
61impl MockNetworkBridgeTx {
62	pub fn new(
63		network: NetworkEmulatorHandle,
64		to_network_interface: UnboundedSender<NetworkMessage>,
65		test_authorities: TestAuthorities,
66	) -> MockNetworkBridgeTx {
67		Self { network, to_network_interface, test_authorities }
68	}
69}
70
71impl MockNetworkBridgeRx {
72	pub fn new(
73		network_receiver: NetworkInterfaceReceiver,
74		chunk_request_sender: Option<ProtocolConfig>,
75	) -> MockNetworkBridgeRx {
76		Self { network_receiver, chunk_request_sender }
77	}
78}
79
80#[overseer::subsystem(NetworkBridgeTx, error=SubsystemError, prefix=self::overseer)]
81impl<Context> MockNetworkBridgeTx {
82	fn start(self, ctx: Context) -> SpawnedSubsystem {
83		let future = self.run(ctx).map(|_| Ok(())).boxed();
84
85		SpawnedSubsystem { name: "network-bridge-tx", future }
86	}
87}
88
89#[overseer::subsystem(NetworkBridgeRx, error=SubsystemError, prefix=self::overseer)]
90impl<Context> MockNetworkBridgeRx {
91	fn start(self, ctx: Context) -> SpawnedSubsystem {
92		let future = self.run(ctx).map(|_| Ok(())).boxed();
93
94		SpawnedSubsystem { name: "network-bridge-rx", future }
95	}
96}
97
98#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
99impl MockNetworkBridgeTx {
100	async fn run<Context>(self, mut ctx: Context) {
101		// Main subsystem loop.
102		loop {
103			let subsystem_message = ctx.recv().await.expect("Overseer never fails us");
104			match subsystem_message {
105				orchestra::FromOrchestra::Signal(signal) =>
106					if signal == OverseerSignal::Conclude {
107						return
108					},
109				orchestra::FromOrchestra::Communication { msg } => match msg {
110					NetworkBridgeTxMessage::SendRequests(requests, _if_disconnected) => {
111						for request in requests {
112							gum::debug!(target: LOG_TARGET, request = ?request, "Processing request");
113							let peer_id = match request.authority_id() {
114								Some(v) => v.clone(),
115								None => self
116									.test_authorities
117									.peer_id_to_authority
118									.get(request.peer_id().expect("Should exist"))
119									.expect("Should exist")
120									.clone(),
121							};
122
123							if !self.network.is_peer_connected(&peer_id) {
124								// Attempting to send a request to a disconnected peer.
125								request
126									.into_response_sender()
127									.send(Err(RequestFailure::NotConnected))
128									.expect("send never fails");
129								continue
130							}
131
132							let peer_message =
133								NetworkMessage::RequestFromNode(peer_id.clone(), Box::new(request));
134
135							let _ = self.to_network_interface.unbounded_send(peer_message);
136						}
137					},
138					NetworkBridgeTxMessage::ReportPeer(_) => {
139						// ignore rep changes
140					},
141					NetworkBridgeTxMessage::SendValidationMessage(peers, message) => {
142						for peer in peers {
143							self.to_network_interface
144								.unbounded_send(NetworkMessage::MessageFromNode(
145									self.test_authorities
146										.peer_id_to_authority
147										.get(&peer)
148										.unwrap()
149										.clone(),
150									message.clone(),
151								))
152								.expect("Should not fail");
153						}
154					},
155					NetworkBridgeTxMessage::SendValidationMessages(messages) => {
156						for (peers, message) in messages {
157							for peer in peers {
158								self.to_network_interface
159									.unbounded_send(NetworkMessage::MessageFromNode(
160										self.test_authorities
161											.peer_id_to_authority
162											.get(&peer)
163											.unwrap()
164											.clone(),
165										message.clone(),
166									))
167									.expect("Should not fail");
168							}
169						}
170					},
171					message => unimplemented!("Unexpected network bridge message {:?}", message),
172				},
173			}
174		}
175	}
176}
177
178#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
179impl MockNetworkBridgeRx {
180	async fn run<Context>(mut self, mut ctx: Context) {
181		// Main subsystem loop.
182		let mut from_network_interface = self.network_receiver.0;
183		loop {
184			futures::select! {
185				maybe_peer_message = from_network_interface.next() => {
186					if let Some(message) = maybe_peer_message {
187						match message {
188							NetworkMessage::MessageFromPeer(peer_id, message) => match message {
189								ValidationProtocols::V3(
190									polkadot_node_network_protocol::v3::ValidationProtocol::BitfieldDistribution(
191										bitfield,
192									),
193								) => {
194									ctx.send_message(
195										BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::ValidationProtocols::V3(bitfield)))
196									).await;
197								},
198								ValidationProtocols::V3(
199									polkadot_node_network_protocol::v3::ValidationProtocol::ApprovalDistribution(msg)
200								) => {
201										ctx.send_message(
202											ApprovalVotingParallelMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::ValidationProtocols::V3(msg)))
203										).await;
204								}
205								ValidationProtocols::V3(
206									polkadot_node_network_protocol::v3::ValidationProtocol::StatementDistribution(msg)
207								) => {
208									ctx.send_message(
209										StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::ValidationProtocols::V3(msg)))
210									).await;
211								},
212							},
213							NetworkMessage::RequestFromPeer(request) => {
214								if let Some(protocol) = self.chunk_request_sender.as_mut() {
215									assert!(ALLOWED_PROTOCOLS.contains(&&*protocol.name), "Unexpected protocol {:?}", protocol.name);
216									if let Some(inbound_queue) = protocol.inbound_queue.as_ref() {
217										inbound_queue
218											.send(request)
219											.await
220											.expect("Forwarding requests to subsystem never fails");
221									}
222								}
223							},
224							_ => {
225								panic!("NetworkMessage::RequestFromNode is not expected to be received from a peer")
226							}
227						}
228					}
229				},
230				subsystem_message = ctx.recv().fuse() => {
231					match subsystem_message.expect("Overseer never fails us") {
232						orchestra::FromOrchestra::Signal(signal) => if signal == OverseerSignal::Conclude { return },
233						_ => {
234							unimplemented!("Unexpected network bridge rx message")
235						},
236					}
237				}
238			}
239		}
240	}
241}