use codec::Encode;
use frame_storage_access_test_runtime::StorageAccessParams;
use log::{debug, info, trace, warn};
use rand::prelude::*;
use sc_cli::Result;
use sc_client_api::{Backend as ClientBackend, StorageProvider, UsageProvider};
use sc_client_db::{DbHash, DbState, DbStateBuilder};
use sp_blockchain::HeaderBackend;
use sp_database::{ColumnId, Transaction};
use sp_runtime::traits::{Block as BlockT, HashingFor, Header as HeaderT};
use sp_state_machine::Backend as StateBackend;
use sp_storage::{ChildInfo, StateVersion};
use sp_trie::{recorder::Recorder, PrefixedMemoryDB};
use std::{
	fmt::Debug,
	sync::Arc,
	time::{Duration, Instant},
};

use super::{cmd::StorageCmd, get_wasm_module, MAX_BATCH_SIZE_FOR_BLOCK_VALIDATION};
use crate::shared::{new_rng, BenchRecord};

impl StorageCmd {
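	/// Benchmarks the time it takes to write a single storage item.
	/// Uses the latest state that is available for the given client.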
	pub(crate) fn bench_write<Block, BA, H, C>(
		&self,
		client: Arc<C>,
		(db, state_col): (Arc<dyn sp_database::Database<DbHash>>, ColumnId),
		storage: Arc<dyn sp_state_machine::Storage<HashingFor<Block>>>,
		shared_trie_cache: Option<sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
	) -> Result<BenchRecord>
	where
		Block: BlockT<Header = H, Hash = DbHash> + Debug,
		H: HeaderT<Hash = DbHash>,
		BA: ClientBackend<Block>,
		C: UsageProvider<Block> + HeaderBackend<Block> + StorageProvider<Block, BA>,
	{
		if self.params.is_validate_block_mode() && self.params.disable_pov_recorder {
			return Err("PoV recorder must be activated to provide a storage proof for block validation at runtime. Remove `--disable-pov-recorder`.".into())
		}
		if self.params.is_validate_block_mode() &&
			self.params.batch_size > MAX_BATCH_SIZE_FOR_BLOCK_VALIDATION
		{
			return Err(format!("Batch size is too large and may cause runtime memory allocation problems. Use `--batch-size {}` or less.", MAX_BATCH_SIZE_FOR_BLOCK_VALIDATION).into())
		}

		let mut record = BenchRecord::default();

		let best_hash = client.usage_info().chain.best_hash;
		let header = client.header(best_hash)?.ok_or("Header not found")?;
		let original_root = *header.state_root();

		let (trie, _) = self.create_trie_backend::<Block, H>(
			original_root,
			&storage,
			shared_trie_cache.as_ref(),
		);

		info!("Preparing keys from block {}", best_hash);
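		// Load all key-value pairs and randomly shuffle them.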
		let mut kvs: Vec<_> = trie.pairs(Default::default())?.collect();
		let (mut rng, _) = new_rng(None);
		kvs.shuffle(&mut rng);
		if kvs.is_empty() {
			return Err("Cannot benchmark with empty storage".into())
		}

		info!("Writing {} keys in batches of {}", kvs.len(), self.params.batch_size);
		let remainder = kvs.len() % self.params.batch_size;
		if self.params.is_validate_block_mode() && remainder != 0 {
			info!("The remaining {remainder} keys will be skipped");
		}

		let mut child_nodes = Vec::new();
		let mut batched_keys = Vec::new();
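		// Generate all random values first; make sure there are no collisions with existing
		// db entries, so that we can roll back all additions without corrupting existing
		// entries.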
		for key_value in kvs {
			let (k, original_v) = key_value?;
			match (self.params.include_child_trees, self.is_child_key(k.to_vec())) {
				(true, Some(info)) => {
					let child_keys = client
						.child_storage_keys(best_hash, info.clone(), None, None)?
						.collect::<Vec<_>>();
					child_nodes.push((child_keys, info.clone()));
				},
				_ => {
					let mut new_v = vec![0; original_v.len()];

					loop {
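						// Create a random value to overwrite with.
						// NOTE: This possibly has a higher entropy than the original value,
						// which acts as an over-estimation; fine for a benchmark.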
						rng.fill_bytes(&mut new_v[..]);
						if check_new_value::<Block>(
							db.clone(),
							&trie,
							&k.to_vec(),
							&new_v,
							self.state_version(),
							state_col,
							None,
						) {
							break
						}
					}

					batched_keys.push((k.to_vec(), new_v.to_vec()));
					if batched_keys.len() < self.params.batch_size {
						continue
					}

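					// Use a fresh trie instance and recorder for every batched write so that
					// later batches do not benefit from earlier runs.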
					let (size, duration) = if self.params.is_validate_block_mode() {
						self.measure_per_key_amortised_validate_block_write_cost::<Block, H>(
							original_root,
							&storage,
							shared_trie_cache.as_ref(),
							batched_keys.clone(),
							None,
						)?
					} else {
						self.measure_per_key_amortised_import_block_write_cost::<Block, H>(
							original_root,
							&storage,
							shared_trie_cache.as_ref(),
							db.clone(),
							batched_keys.clone(),
							self.state_version(),
							state_col,
							None,
						)?
					};
					record.append(size, duration)?;
					batched_keys.clear();
				},
			}
		}

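		// Repeat the same procedure for the collected child trie keys.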
		if self.params.include_child_trees && !child_nodes.is_empty() {
			info!("Writing {} child keys", child_nodes.iter().map(|(c, _)| c.len()).sum::<usize>());
			for (mut child_keys, info) in child_nodes {
				if child_keys.len() < self.params.batch_size {
					warn!(
						"Skipping {} child keys because there are fewer than the batch size",
						child_keys.len()
					);
					continue;
				}

				child_keys.shuffle(&mut rng);

				for key in child_keys {
					if let Some(original_v) = client
						.child_storage(best_hash, &info, &key)
						.expect("Checked above to exist")
					{
						let mut new_v = vec![0; original_v.0.len()];

						loop {
							rng.fill_bytes(&mut new_v[..]);
							if check_new_value::<Block>(
								db.clone(),
								&trie,
								&key.0,
								&new_v,
								self.state_version(),
								state_col,
								Some(&info),
							) {
								break
							}
						}
						batched_keys.push((key.0, new_v.to_vec()));
						if batched_keys.len() < self.params.batch_size {
							continue
						}

						let (size, duration) = if self.params.is_validate_block_mode() {
							self.measure_per_key_amortised_validate_block_write_cost::<Block, H>(
								original_root,
								&storage,
								shared_trie_cache.as_ref(),
								batched_keys.clone(),
								None,
							)?
						} else {
							self.measure_per_key_amortised_import_block_write_cost::<Block, H>(
								original_root,
								&storage,
								shared_trie_cache.as_ref(),
								db.clone(),
								batched_keys.clone(),
								self.state_version(),
								state_col,
								Some(&info),
							)?
						};
						record.append(size, duration)?;
						batched_keys.clear();
					}
				}
			}
		}

		Ok(record)
	}

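	/// Creates a trie backend for the given state root, with an optional shared trie cache
	/// and, unless `--disable-pov-recorder` is set, a proof recorder.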
	fn create_trie_backend<Block, H>(
		&self,
		original_root: Block::Hash,
		storage: &Arc<dyn sp_state_machine::Storage<HashingFor<Block>>>,
		shared_trie_cache: Option<&sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
	) -> (DbState<HashingFor<Block>>, Option<Recorder<HashingFor<Block>>>)
	where
		Block: BlockT<Header = H, Hash = DbHash> + Debug,
		H: HeaderT<Hash = DbHash>,
	{
		let recorder = (!self.params.disable_pov_recorder).then(|| Default::default());
		let trie = DbStateBuilder::<HashingFor<Block>>::new(storage.clone(), original_root)
			.with_optional_cache(shared_trie_cache.map(|c| c.local_cache_trusted()))
			.with_optional_recorder(recorder.clone())
			.build();

		(trie, recorder)
	}

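	/// Measures the amortised per-key write cost during block import by committing a batch
	/// of changed values to the database and timing the commit.
	/// If `child_info` is `Some`, the keys belong to that child trie.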
	fn measure_per_key_amortised_import_block_write_cost<Block, H>(
		&self,
		original_root: Block::Hash,
		storage: &Arc<dyn sp_state_machine::Storage<HashingFor<Block>>>,
		shared_trie_cache: Option<&sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
		db: Arc<dyn sp_database::Database<DbHash>>,
		changes: Vec<(Vec<u8>, Vec<u8>)>,
		version: StateVersion,
		col: ColumnId,
		child_info: Option<&ChildInfo>,
	) -> Result<(usize, Duration)>
	where
		Block: BlockT<Header = H, Hash = DbHash> + Debug,
		H: HeaderT<Hash = DbHash>,
	{
		let batch_size = changes.len();
		let average_len = changes.iter().map(|(_, v)| v.len()).sum::<usize>() / batch_size;
		let (trie, _recorder) =
			self.create_trie_backend::<Block, H>(original_root, storage, shared_trie_cache);

		let start = Instant::now();
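		// Create a TX that will modify the trie in the DB and
		// calculate the root hash of the trie after the modification.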
		let replace = changes
			.iter()
			.map(|(key, new_v)| (key.as_ref(), Some(new_v.as_ref())))
			.collect::<Vec<_>>();
		let stx = match child_info {
			Some(info) => trie.child_storage_root(info, replace.iter().cloned(), version).2,
			None => trie.storage_root(replace.iter().cloned(), version).1,
		};
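		// Only keep the insertions, since we do not want to benchmark pruning.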
		let tx = convert_tx::<Block>(db.clone(), stx.clone(), false, col);
		db.commit(tx).map_err(|e| format!("Writing to the Database: {}", e))?;
		let result = (average_len, start.elapsed() / batch_size as u32);

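		// Now undo the changes by removing what was added.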
		let tx = convert_tx::<Block>(db.clone(), stx.clone(), true, col);
		db.commit(tx).map_err(|e| format!("Writing to the Database: {}", e))?;

		Ok(result)
	}

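	/// Measures the amortised per-key write cost during block validation by replaying the
	/// batched writes through the runtime's `validate_block` export against a recorded
	/// storage proof. The time of a dry run is subtracted to remove the setup overhead.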
	fn measure_per_key_amortised_validate_block_write_cost<Block, H>(
		&self,
		original_root: Block::Hash,
		storage: &Arc<dyn sp_state_machine::Storage<HashingFor<Block>>>,
		shared_trie_cache: Option<&sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
		changes: Vec<(Vec<u8>, Vec<u8>)>,
		maybe_child_info: Option<&ChildInfo>,
	) -> Result<(usize, Duration)>
	where
		Block: BlockT<Header = H, Hash = DbHash> + Debug,
		H: HeaderT<Hash = DbHash>,
	{
		let batch_size = changes.len();
		let average_len = changes.iter().map(|(_, v)| v.len()).sum::<usize>() / batch_size;
		let (trie, recorder) =
			self.create_trie_backend::<Block, H>(original_root, storage, shared_trie_cache);
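		// Read every key through the recording backend so that the proof contains all
		// trie nodes that the writes will touch.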
		for (key, _) in changes.iter() {
			let _v = trie
				.storage(key)
				.expect("Checked above to exist")
				.ok_or("Value unexpectedly empty")?;
		}
		let storage_proof = recorder
			.map(|r| r.drain_storage_proof())
			.expect("Storage proof must exist for block validation");
		let root = trie.root();
		debug!(
			"POV: len {:?} {:?}",
			storage_proof.len(),
			storage_proof.clone().encoded_compact_size::<HashingFor<Block>>(*root)
		);
		let params = StorageAccessParams::<Block>::new_write(
			*root,
			storage_proof,
			(changes, maybe_child_info.cloned()),
		);

		let mut durations_in_nanos = Vec::new();
		let wasm_module = get_wasm_module();
		let mut instance = wasm_module.new_instance().expect("Failed to create wasm instance");
		let dry_run_encoded = params.as_dry_run().encode();
		let encoded = params.encode();

		for i in 1..=self.params.validate_block_rounds {
			info!(
				"validate_block with {} keys, round {}/{}",
				batch_size, i, self.params.validate_block_rounds
			);

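			// Dry run to measure the overhead of decoding the proof and setting up the
			// runtime; it is subtracted from the full run below.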
			let dry_run_start = Instant::now();
			instance
				.call_export("validate_block", &dry_run_encoded)
				.expect("Failed to call validate_block");
			let dry_run_elapsed = dry_run_start.elapsed();
			debug!("validate_block dry-run time {:?}", dry_run_elapsed);

			let start = Instant::now();
			instance
				.call_export("validate_block", &encoded)
				.expect("Failed to call validate_block");
			let elapsed = start.elapsed();
			debug!("validate_block time {:?}", elapsed);

			durations_in_nanos.push(
				elapsed.saturating_sub(dry_run_elapsed).as_nanos() as u64 / batch_size as u64,
			);
		}

		let result = (
			average_len,
			std::time::Duration::from_nanos(
				durations_in_nanos.iter().sum::<u64>() / durations_in_nanos.len() as u64,
			),
		);

		Ok(result)
	}
}

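/// Converts a trie transaction into a DB transaction.
/// Removals are ignored and will not be included in the final tx.
/// `invert_inserts` replaces all inserts with removals.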
fn convert_tx<B: BlockT>(
	db: Arc<dyn sp_database::Database<DbHash>>,
	mut tx: PrefixedMemoryDB<HashingFor<B>>,
	invert_inserts: bool,
	col: ColumnId,
) -> Transaction<DbHash> {
	let mut ret = Transaction::<DbHash>::default();

	for (mut k, (v, rc)) in tx.drain().into_iter() {
		if rc > 0 {
			db.sanitize_key(&mut k);
			if invert_inserts {
				ret.remove(col, &k);
			} else {
				ret.set(col, &k, &v);
			}
		}
	}
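	// Entries with a non-positive reference count are deletes; we ignore them.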
	ret
}

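/// Checks if a new value causes any collision in the trie node updates.
/// Returns true if there is no collision.
/// If `child_info` is `Some`, the key belongs to that child trie.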
fn check_new_value<Block: BlockT>(
	db: Arc<dyn sp_database::Database<DbHash>>,
	trie: &DbState<HashingFor<Block>>,
	key: &Vec<u8>,
	new_v: &Vec<u8>,
	version: StateVersion,
	col: ColumnId,
	child_info: Option<&ChildInfo>,
) -> bool {
	let new_kv = vec![(key.as_ref(), Some(new_v.as_ref()))];
	let mut stx = match child_info {
		Some(info) => trie.child_storage_root(info, new_kv.iter().cloned(), version).2,
		None => trie.storage_root(new_kv.iter().cloned(), version).1,
	};
	for (mut k, (_, rc)) in stx.drain().into_iter() {
		if rc > 0 {
			db.sanitize_key(&mut k);
			if db.get(col, &k).is_some() {
				trace!("Benchmark-store key creation: Key collision detected, retry");
				return false
			}
		}
	}
	true
}