polkadot_subsystem_bench/mock/
av_store.rs1use crate::network::{HandleNetworkMessage, NetworkMessage};
20use codec::Encode;
21use futures::{channel::oneshot, FutureExt};
22use polkadot_node_network_protocol::request_response::{
23 v1::AvailableDataFetchingResponse, v2::ChunkFetchingResponse, Protocol, ReqProtocolNames,
24 Requests,
25};
26use polkadot_node_primitives::{AvailableData, ErasureChunk};
27use polkadot_node_subsystem::{
28 messages::AvailabilityStoreMessage, overseer, SpawnedSubsystem, SubsystemError,
29};
30use polkadot_node_subsystem_types::OverseerSignal;
31use polkadot_primitives::{CandidateHash, ChunkIndex, CoreIndex, ValidatorIndex};
32use std::collections::HashMap;
33
34pub struct AvailabilityStoreState {
35 candidate_hashes: HashMap<CandidateHash, usize>,
36 chunks: Vec<Vec<ErasureChunk>>,
37 chunk_indices: Vec<Vec<ChunkIndex>>,
38 candidate_hash_to_core_index: HashMap<CandidateHash, CoreIndex>,
39}
40
41const LOG_TARGET: &str = "subsystem-bench::av-store-mock";
42
43#[derive(Clone)]
46pub struct NetworkAvailabilityState {
47 pub req_protocol_names: ReqProtocolNames,
48 pub candidate_hashes: HashMap<CandidateHash, usize>,
49 pub available_data: Vec<AvailableData>,
50 pub chunks: Vec<Vec<ErasureChunk>>,
51 pub chunk_indices: Vec<Vec<ChunkIndex>>,
52 pub candidate_hash_to_core_index: HashMap<CandidateHash, CoreIndex>,
53}
54
55#[async_trait::async_trait]
57impl HandleNetworkMessage for NetworkAvailabilityState {
58 async fn handle(
59 &self,
60 message: NetworkMessage,
61 _node_sender: &mut futures::channel::mpsc::UnboundedSender<NetworkMessage>,
62 ) -> Option<NetworkMessage> {
63 match message {
64 NetworkMessage::RequestFromNode(peer, request) => match *request {
65 Requests::ChunkFetching(outgoing_request) => {
66 gum::debug!(target: LOG_TARGET, request = ?outgoing_request, "Received `RequestFromNode`");
67 let validator_index: usize = outgoing_request.payload.index.0 as usize;
68 let candidate_hash = outgoing_request.payload.candidate_hash;
69
70 let candidate_index = self
71 .candidate_hashes
72 .get(&candidate_hash)
73 .expect("candidate was generated previously; qed");
74 gum::warn!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
75
76 let candidate_chunks = self.chunks.get(*candidate_index).unwrap();
77 let chunk_indices = self
78 .chunk_indices
79 .get(
80 self.candidate_hash_to_core_index.get(&candidate_hash).unwrap().0
81 as usize,
82 )
83 .unwrap();
84
85 let chunk = candidate_chunks
86 .get(chunk_indices.get(validator_index).unwrap().0 as usize)
87 .unwrap();
88
89 let response = Ok((
90 ChunkFetchingResponse::from(Some(chunk.clone())).encode(),
91 self.req_protocol_names.get_name(Protocol::ChunkFetchingV2),
92 ));
93
94 if let Err(err) = outgoing_request.pending_response.send(response) {
95 gum::error!(target: LOG_TARGET, ?err, "Failed to send `ChunkFetchingResponse`");
96 }
97
98 None
99 },
100 Requests::AvailableDataFetchingV1(outgoing_request) => {
101 let candidate_hash = outgoing_request.payload.candidate_hash;
102 let candidate_index = self
103 .candidate_hashes
104 .get(&candidate_hash)
105 .expect("candidate was generated previously; qed");
106 gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
107
108 let available_data = self.available_data.get(*candidate_index).unwrap().clone();
109
110 let response = Ok((
111 AvailableDataFetchingResponse::from(Some(available_data)).encode(),
112 self.req_protocol_names.get_name(Protocol::AvailableDataFetchingV1),
113 ));
114 outgoing_request
115 .pending_response
116 .send(response)
117 .expect("Response is always sent successfully");
118 None
119 },
120 _ => Some(NetworkMessage::RequestFromNode(peer, request)),
121 },
122
123 message => Some(message),
124 }
125 }
126}
127
128pub struct MockAvailabilityStore {
131 state: AvailabilityStoreState,
132}
133
134impl MockAvailabilityStore {
135 pub fn new(
136 chunks: Vec<Vec<ErasureChunk>>,
137 chunk_indices: Vec<Vec<ChunkIndex>>,
138 candidate_hashes: HashMap<CandidateHash, usize>,
139 candidate_hash_to_core_index: HashMap<CandidateHash, CoreIndex>,
140 ) -> MockAvailabilityStore {
141 Self {
142 state: AvailabilityStoreState {
143 chunks,
144 candidate_hashes,
145 chunk_indices,
146 candidate_hash_to_core_index,
147 },
148 }
149 }
150
151 async fn respond_to_query_all_request(
152 &self,
153 candidate_hash: CandidateHash,
154 send_chunk: impl Fn(ValidatorIndex) -> bool,
155 tx: oneshot::Sender<Vec<(ValidatorIndex, ErasureChunk)>>,
156 ) {
157 let candidate_index = self
158 .state
159 .candidate_hashes
160 .get(&candidate_hash)
161 .expect("candidate was generated previously; qed");
162 gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
163
164 let n_validators = self.state.chunks[0].len();
165 let candidate_chunks = self.state.chunks.get(*candidate_index).unwrap();
166 let core_index = self.state.candidate_hash_to_core_index.get(&candidate_hash).unwrap();
167 let mut v = Vec::with_capacity(1);
169
170 for validator_index in 0..n_validators {
171 if !send_chunk(ValidatorIndex(validator_index as u32)) {
172 continue;
173 }
174 let chunk_index = self
175 .state
176 .chunk_indices
177 .get(core_index.0 as usize)
178 .unwrap()
179 .get(validator_index)
180 .unwrap();
181
182 let chunk = candidate_chunks.get(chunk_index.0 as usize).unwrap().clone();
183 v.push((ValidatorIndex(validator_index as u32), chunk.clone()));
184 }
185
186 let _ = tx.send(v);
187 }
188}
189
190#[overseer::subsystem(AvailabilityStore, error=SubsystemError, prefix=self::overseer)]
191impl<Context> MockAvailabilityStore {
192 fn start(self, ctx: Context) -> SpawnedSubsystem {
193 let future = self.run(ctx).map(|_| Ok(())).boxed();
194
195 SpawnedSubsystem { name: "test-environment", future }
196 }
197}
198
199#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
200impl MockAvailabilityStore {
201 async fn run<Context>(self, mut ctx: Context) {
202 gum::debug!(target: LOG_TARGET, "Subsystem running");
203 loop {
204 let msg = ctx.recv().await.expect("Overseer never fails us");
205
206 match msg {
207 orchestra::FromOrchestra::Signal(signal) =>
208 if signal == OverseerSignal::Conclude {
209 return
210 },
211 orchestra::FromOrchestra::Communication { msg } => match msg {
212 AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx) => {
213 gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryAvailableData");
214
215 let _ = tx.send(None);
217 },
218 AvailabilityStoreMessage::QueryAllChunks(candidate_hash, tx) => {
219 gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryAllChunks");
221 self.respond_to_query_all_request(
222 candidate_hash,
223 |index| index == 0.into(),
224 tx,
225 )
226 .await;
227 },
228 AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx) => {
229 gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryChunkSize");
230
231 let candidate_index = self
232 .state
233 .candidate_hashes
234 .get(&candidate_hash)
235 .expect("candidate was generated previously; qed");
236 gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
237
238 let chunk_size = self
239 .state
240 .chunks
241 .get(*candidate_index)
242 .unwrap()
243 .first()
244 .unwrap()
245 .encoded_size();
246 let _ = tx.send(Some(chunk_size));
247 },
248 AvailabilityStoreMessage::StoreChunk {
249 candidate_hash,
250 chunk,
251 tx,
252 validator_index,
253 } => {
254 gum::debug!(
255 target: LOG_TARGET,
256 chunk_index = ?chunk.index,
257 validator_index = ?validator_index,
258 candidate_hash = ?candidate_hash,
259 "Responding to StoreChunk"
260 );
261 let _ = tx.send(Ok(()));
262 },
263 _ => {
264 unimplemented!("Unexpected av-store message")
265 },
266 },
267 }
268 }
269 }
270}