// polkadot_subsystem_bench/mock/av_store.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! A generic av store subsystem mockup suitable to be used in benchmarks.
18
19use crate::network::{HandleNetworkMessage, NetworkMessage};
20use codec::Encode;
21use futures::{channel::oneshot, FutureExt};
22use polkadot_node_network_protocol::request_response::{
23	v1::AvailableDataFetchingResponse, v2::ChunkFetchingResponse, Protocol, ReqProtocolNames,
24	Requests,
25};
26use polkadot_node_primitives::{AvailableData, ErasureChunk};
27use polkadot_node_subsystem::{
28	messages::AvailabilityStoreMessage, overseer, SpawnedSubsystem, SubsystemError,
29};
30use polkadot_node_subsystem_types::OverseerSignal;
31use polkadot_primitives::{CandidateHash, ChunkIndex, CoreIndex, ValidatorIndex};
32use std::collections::HashMap;
33
/// State backing the [`MockAvailabilityStore`]: pre-generated erasure chunks
/// plus the lookup tables needed to find the right chunk for a candidate.
pub struct AvailabilityStoreState {
	// Maps a candidate hash to its index into `chunks`.
	candidate_hashes: HashMap<CandidateHash, usize>,
	// Erasure chunks, indexed first by candidate index, then by chunk index.
	chunks: Vec<Vec<ErasureChunk>>,
	// Per-core mapping from validator index to the chunk index that validator
	// holds (outer index is the core index).
	chunk_indices: Vec<Vec<ChunkIndex>>,
	// The availability core each candidate occupied; selects the row of
	// `chunk_indices` to use for that candidate.
	candidate_hash_to_core_index: HashMap<CandidateHash, CoreIndex>,
}
40
41const LOG_TARGET: &str = "subsystem-bench::av-store-mock";
42
/// Mockup helper. Contains Chunks and full availability data of all parachain blocks
/// used in a test.
#[derive(Clone)]
pub struct NetworkAvailabilityState {
	// Request/response protocol names, used to tag outgoing responses.
	pub req_protocol_names: ReqProtocolNames,
	// Maps a candidate hash to its index into `available_data`/`chunks`.
	pub candidate_hashes: HashMap<CandidateHash, usize>,
	// Full available data, indexed by candidate index.
	pub available_data: Vec<AvailableData>,
	// Erasure chunks, indexed by candidate index then chunk index.
	pub chunks: Vec<Vec<ErasureChunk>>,
	// Per-core mapping from validator index to chunk index (outer index is
	// the core index).
	pub chunk_indices: Vec<Vec<ChunkIndex>>,
	// The availability core each candidate occupied.
	pub candidate_hash_to_core_index: HashMap<CandidateHash, CoreIndex>,
}
54
55// Implement access to the state.
56#[async_trait::async_trait]
57impl HandleNetworkMessage for NetworkAvailabilityState {
58	async fn handle(
59		&self,
60		message: NetworkMessage,
61		_node_sender: &mut futures::channel::mpsc::UnboundedSender<NetworkMessage>,
62	) -> Option<NetworkMessage> {
63		match message {
64			NetworkMessage::RequestFromNode(peer, request) => match *request {
65				Requests::ChunkFetching(outgoing_request) => {
66					gum::debug!(target: LOG_TARGET, request = ?outgoing_request, "Received `RequestFromNode`");
67					let validator_index: usize = outgoing_request.payload.index.0 as usize;
68					let candidate_hash = outgoing_request.payload.candidate_hash;
69
70					let candidate_index = self
71						.candidate_hashes
72						.get(&candidate_hash)
73						.expect("candidate was generated previously; qed");
74					gum::warn!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
75
76					let candidate_chunks = self.chunks.get(*candidate_index).unwrap();
77					let chunk_indices = self
78						.chunk_indices
79						.get(
80							self.candidate_hash_to_core_index.get(&candidate_hash).unwrap().0
81								as usize,
82						)
83						.unwrap();
84
85					let chunk = candidate_chunks
86						.get(chunk_indices.get(validator_index).unwrap().0 as usize)
87						.unwrap();
88
89					let response = Ok((
90						ChunkFetchingResponse::from(Some(chunk.clone())).encode(),
91						self.req_protocol_names.get_name(Protocol::ChunkFetchingV2),
92					));
93
94					if let Err(err) = outgoing_request.pending_response.send(response) {
95						gum::error!(target: LOG_TARGET, ?err, "Failed to send `ChunkFetchingResponse`");
96					}
97
98					None
99				},
100				Requests::AvailableDataFetchingV1(outgoing_request) => {
101					let candidate_hash = outgoing_request.payload.candidate_hash;
102					let candidate_index = self
103						.candidate_hashes
104						.get(&candidate_hash)
105						.expect("candidate was generated previously; qed");
106					gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
107
108					let available_data = self.available_data.get(*candidate_index).unwrap().clone();
109
110					let response = Ok((
111						AvailableDataFetchingResponse::from(Some(available_data)).encode(),
112						self.req_protocol_names.get_name(Protocol::AvailableDataFetchingV1),
113					));
114					outgoing_request
115						.pending_response
116						.send(response)
117						.expect("Response is always sent successfully");
118					None
119				},
120				_ => Some(NetworkMessage::RequestFromNode(peer, request)),
121			},
122
123			message => Some(message),
124		}
125	}
126}
127
/// A mock of the availability store subsystem. It answers availability-store
/// queries out of the pre-generated per-candidate chunk state supplied at
/// construction time.
pub struct MockAvailabilityStore {
	// Pre-generated chunks and the lookup tables used to serve queries.
	state: AvailabilityStoreState,
}
133
134impl MockAvailabilityStore {
135	pub fn new(
136		chunks: Vec<Vec<ErasureChunk>>,
137		chunk_indices: Vec<Vec<ChunkIndex>>,
138		candidate_hashes: HashMap<CandidateHash, usize>,
139		candidate_hash_to_core_index: HashMap<CandidateHash, CoreIndex>,
140	) -> MockAvailabilityStore {
141		Self {
142			state: AvailabilityStoreState {
143				chunks,
144				candidate_hashes,
145				chunk_indices,
146				candidate_hash_to_core_index,
147			},
148		}
149	}
150
151	async fn respond_to_query_all_request(
152		&self,
153		candidate_hash: CandidateHash,
154		send_chunk: impl Fn(ValidatorIndex) -> bool,
155		tx: oneshot::Sender<Vec<(ValidatorIndex, ErasureChunk)>>,
156	) {
157		let candidate_index = self
158			.state
159			.candidate_hashes
160			.get(&candidate_hash)
161			.expect("candidate was generated previously; qed");
162		gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
163
164		let n_validators = self.state.chunks[0].len();
165		let candidate_chunks = self.state.chunks.get(*candidate_index).unwrap();
166		let core_index = self.state.candidate_hash_to_core_index.get(&candidate_hash).unwrap();
167		// We'll likely only send our chunk, so use capacity 1.
168		let mut v = Vec::with_capacity(1);
169
170		for validator_index in 0..n_validators {
171			if !send_chunk(ValidatorIndex(validator_index as u32)) {
172				continue;
173			}
174			let chunk_index = self
175				.state
176				.chunk_indices
177				.get(core_index.0 as usize)
178				.unwrap()
179				.get(validator_index)
180				.unwrap();
181
182			let chunk = candidate_chunks.get(chunk_index.0 as usize).unwrap().clone();
183			v.push((ValidatorIndex(validator_index as u32), chunk.clone()));
184		}
185
186		let _ = tx.send(v);
187	}
188}
189
#[overseer::subsystem(AvailabilityStore, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockAvailabilityStore {
	// Overseer entry point: wraps `run` into a `SpawnedSubsystem` future.
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		// `run` returns `()`, so its completion is always reported as `Ok(())`.
		let future = self.run(ctx).map(|_| Ok(())).boxed();

		// NOTE(review): the subsystem is named "test-environment" rather than
		// something av-store specific — presumably copied from another mock;
		// confirm this is intended before changing (the name may be matched
		// elsewhere).
		SpawnedSubsystem { name: "test-environment", future }
	}
}
198
199#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
200impl MockAvailabilityStore {
201	async fn run<Context>(self, mut ctx: Context) {
202		gum::debug!(target: LOG_TARGET, "Subsystem running");
203		loop {
204			let msg = ctx.recv().await.expect("Overseer never fails us");
205
206			match msg {
207				orchestra::FromOrchestra::Signal(signal) =>
208					if signal == OverseerSignal::Conclude {
209						return
210					},
211				orchestra::FromOrchestra::Communication { msg } => match msg {
212					AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx) => {
213						gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryAvailableData");
214
215						// We never have the full available data.
216						let _ = tx.send(None);
217					},
218					AvailabilityStoreMessage::QueryAllChunks(candidate_hash, tx) => {
219						// We always have our own chunk.
220						gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryAllChunks");
221						self.respond_to_query_all_request(
222							candidate_hash,
223							|index| index == 0.into(),
224							tx,
225						)
226						.await;
227					},
228					AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx) => {
229						gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryChunkSize");
230
231						let candidate_index = self
232							.state
233							.candidate_hashes
234							.get(&candidate_hash)
235							.expect("candidate was generated previously; qed");
236						gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
237
238						let chunk_size = self
239							.state
240							.chunks
241							.get(*candidate_index)
242							.unwrap()
243							.first()
244							.unwrap()
245							.encoded_size();
246						let _ = tx.send(Some(chunk_size));
247					},
248					AvailabilityStoreMessage::StoreChunk {
249						candidate_hash,
250						chunk,
251						tx,
252						validator_index,
253					} => {
254						gum::debug!(
255							target: LOG_TARGET,
256							chunk_index = ?chunk.index,
257							validator_index = ?validator_index,
258							candidate_hash = ?candidate_hash,
259							"Responding to StoreChunk"
260						);
261						let _ = tx.send(Ok(()));
262					},
263					_ => {
264						unimplemented!("Unexpected av-store message")
265					},
266				},
267			}
268		}
269	}
270}