polkadot_subsystem_bench/mock/
network_bridge.rs1use crate::{
21 configuration::TestAuthorities,
22 network::{NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt},
23};
24use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt};
25use polkadot_node_network_protocol::ValidationProtocols;
26use polkadot_node_subsystem::{
27 messages::{ApprovalVotingParallelMessage, NetworkBridgeTxMessage},
28 overseer, SpawnedSubsystem, SubsystemError,
29};
30use polkadot_node_subsystem_types::{
31 messages::{BitfieldDistributionMessage, NetworkBridgeEvent, StatementDistributionMessage},
32 OverseerSignal,
33};
34use sc_network::{request_responses::ProtocolConfig, RequestFailure};
35
/// Log target attached to the tracing output emitted by this module.
const LOG_TARGET: &str = "subsystem-bench::network-bridge";

/// Names of the request/response protocols the rx mock agrees to forward.
///
/// `MockNetworkBridgeRx::run` asserts that its configured protocol name is one
/// of these before handing an incoming peer request to the subsystem.
const ALLOWED_PROTOCOLS: &[&str] = &[
    "/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/2",
    "/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_attested_candidate/2",
    "/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/send_dispute/1",
];
42
43pub struct MockNetworkBridgeTx {
45 network: NetworkEmulatorHandle,
47 to_network_interface: UnboundedSender<NetworkMessage>,
49 test_authorities: TestAuthorities,
51}
52
53pub struct MockNetworkBridgeRx {
55 network_receiver: NetworkInterfaceReceiver,
57 chunk_request_sender: Option<ProtocolConfig>,
59}
60
61impl MockNetworkBridgeTx {
62 pub fn new(
63 network: NetworkEmulatorHandle,
64 to_network_interface: UnboundedSender<NetworkMessage>,
65 test_authorities: TestAuthorities,
66 ) -> MockNetworkBridgeTx {
67 Self { network, to_network_interface, test_authorities }
68 }
69}
70
71impl MockNetworkBridgeRx {
72 pub fn new(
73 network_receiver: NetworkInterfaceReceiver,
74 chunk_request_sender: Option<ProtocolConfig>,
75 ) -> MockNetworkBridgeRx {
76 Self { network_receiver, chunk_request_sender }
77 }
78}
79
80#[overseer::subsystem(NetworkBridgeTx, error=SubsystemError, prefix=self::overseer)]
81impl<Context> MockNetworkBridgeTx {
82 fn start(self, ctx: Context) -> SpawnedSubsystem {
83 let future = self.run(ctx).map(|_| Ok(())).boxed();
84
85 SpawnedSubsystem { name: "network-bridge-tx", future }
86 }
87}
88
89#[overseer::subsystem(NetworkBridgeRx, error=SubsystemError, prefix=self::overseer)]
90impl<Context> MockNetworkBridgeRx {
91 fn start(self, ctx: Context) -> SpawnedSubsystem {
92 let future = self.run(ctx).map(|_| Ok(())).boxed();
93
94 SpawnedSubsystem { name: "network-bridge-rx", future }
95 }
96}
97
98#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
99impl MockNetworkBridgeTx {
100 async fn run<Context>(self, mut ctx: Context) {
101 loop {
103 let subsystem_message = ctx.recv().await.expect("Overseer never fails us");
104 match subsystem_message {
105 orchestra::FromOrchestra::Signal(signal) =>
106 if signal == OverseerSignal::Conclude {
107 return
108 },
109 orchestra::FromOrchestra::Communication { msg } => match msg {
110 NetworkBridgeTxMessage::SendRequests(requests, _if_disconnected) => {
111 for request in requests {
112 gum::debug!(target: LOG_TARGET, request = ?request, "Processing request");
113 let peer_id = match request.authority_id() {
114 Some(v) => v.clone(),
115 None => self
116 .test_authorities
117 .peer_id_to_authority
118 .get(request.peer_id().expect("Should exist"))
119 .expect("Should exist")
120 .clone(),
121 };
122
123 if !self.network.is_peer_connected(&peer_id) {
124 request
126 .into_response_sender()
127 .send(Err(RequestFailure::NotConnected))
128 .expect("send never fails");
129 continue
130 }
131
132 let peer_message =
133 NetworkMessage::RequestFromNode(peer_id.clone(), Box::new(request));
134
135 let _ = self.to_network_interface.unbounded_send(peer_message);
136 }
137 },
138 NetworkBridgeTxMessage::ReportPeer(_) => {
139 },
141 NetworkBridgeTxMessage::SendValidationMessage(peers, message) => {
142 for peer in peers {
143 self.to_network_interface
144 .unbounded_send(NetworkMessage::MessageFromNode(
145 self.test_authorities
146 .peer_id_to_authority
147 .get(&peer)
148 .unwrap()
149 .clone(),
150 message.clone(),
151 ))
152 .expect("Should not fail");
153 }
154 },
155 NetworkBridgeTxMessage::SendValidationMessages(messages) => {
156 for (peers, message) in messages {
157 for peer in peers {
158 self.to_network_interface
159 .unbounded_send(NetworkMessage::MessageFromNode(
160 self.test_authorities
161 .peer_id_to_authority
162 .get(&peer)
163 .unwrap()
164 .clone(),
165 message.clone(),
166 ))
167 .expect("Should not fail");
168 }
169 }
170 },
171 message => unimplemented!("Unexpected network bridge message {:?}", message),
172 },
173 }
174 }
175 }
176}
177
178#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
179impl MockNetworkBridgeRx {
180 async fn run<Context>(mut self, mut ctx: Context) {
181 let mut from_network_interface = self.network_receiver.0;
183 loop {
184 futures::select! {
185 maybe_peer_message = from_network_interface.next() => {
186 if let Some(message) = maybe_peer_message {
187 match message {
188 NetworkMessage::MessageFromPeer(peer_id, message) => match message {
189 ValidationProtocols::V3(
190 polkadot_node_network_protocol::v3::ValidationProtocol::BitfieldDistribution(
191 bitfield,
192 ),
193 ) => {
194 ctx.send_message(
195 BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::ValidationProtocols::V3(bitfield)))
196 ).await;
197 },
198 ValidationProtocols::V3(
199 polkadot_node_network_protocol::v3::ValidationProtocol::ApprovalDistribution(msg)
200 ) => {
201 ctx.send_message(
202 ApprovalVotingParallelMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::ValidationProtocols::V3(msg)))
203 ).await;
204 }
205 ValidationProtocols::V3(
206 polkadot_node_network_protocol::v3::ValidationProtocol::StatementDistribution(msg)
207 ) => {
208 ctx.send_message(
209 StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::ValidationProtocols::V3(msg)))
210 ).await;
211 },
212 },
213 NetworkMessage::RequestFromPeer(request) => {
214 if let Some(protocol) = self.chunk_request_sender.as_mut() {
215 assert!(ALLOWED_PROTOCOLS.contains(&&*protocol.name), "Unexpected protocol {:?}", protocol.name);
216 if let Some(inbound_queue) = protocol.inbound_queue.as_ref() {
217 inbound_queue
218 .send(request)
219 .await
220 .expect("Forwarding requests to subsystem never fails");
221 }
222 }
223 },
224 _ => {
225 panic!("NetworkMessage::RequestFromNode is not expected to be received from a peer")
226 }
227 }
228 }
229 },
230 subsystem_message = ctx.recv().fuse() => {
231 match subsystem_message.expect("Overseer never fails us") {
232 orchestra::FromOrchestra::Signal(signal) => if signal == OverseerSignal::Conclude { return },
233 _ => {
234 unimplemented!("Unexpected network bridge rx message")
235 },
236 }
237 }
238 }
239 }
240 }
241}