
frame_benchmarking_cli/storage/write.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use codec::Encode;
use frame_storage_access_test_runtime::StorageAccessParams;
use log::{debug, info, trace, warn};
use rand::prelude::*;
use sc_cli::Result;
use sc_client_api::{Backend as ClientBackend, StorageProvider, UsageProvider};
use sc_client_db::{DbHash, DbState, DbStateBuilder};
use sp_blockchain::HeaderBackend;
use sp_database::{ColumnId, Transaction};
use sp_runtime::traits::{Block as BlockT, HashingFor, Header as HeaderT};
use sp_state_machine::Backend as StateBackend;
use sp_storage::{ChildInfo, StateVersion};
use sp_trie::{recorder::Recorder, PrefixedMemoryDB};
use std::{
	fmt::Debug,
	sync::Arc,
	time::{Duration, Instant},
};

use super::{cmd::StorageCmd, get_wasm_module, MAX_BATCH_SIZE_FOR_BLOCK_VALIDATION};
use crate::shared::{new_rng, BenchRecord};

impl StorageCmd {
	/// Benchmarks the time it takes to write a single storage item.
	///
	/// Uses the latest state that is available for the given client.
	///
	/// Unlike the read benchmark, where every key is read on its own, keys are written here in
	/// batches. Writing a final batch that is much smaller than the batch size would
	/// dramatically distort the results, so any remaining keys are skipped.
	pub(crate) fn bench_write<Block, BA, H, C>(
		&self,
		client: Arc<C>,
		(db, state_col): (Arc<dyn sp_database::Database<DbHash>>, ColumnId),
		storage: Arc<dyn sp_state_machine::Storage<HashingFor<Block>>>,
		shared_trie_cache: Option<sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
	) -> Result<BenchRecord>
	where
		Block: BlockT<Header = H, Hash = DbHash> + Debug,
		H: HeaderT<Hash = DbHash>,
		BA: ClientBackend<Block>,
		C: UsageProvider<Block> + HeaderBackend<Block> + StorageProvider<Block, BA>,
	{
		if self.params.is_validate_block_mode() && self.params.disable_pov_recorder {
			return Err("PoV recorder must be activated to provide a storage proof for block validation at runtime. Remove `--disable-pov-recorder`.".into())
		}
		if self.params.is_validate_block_mode() &&
			self.params.batch_size > MAX_BATCH_SIZE_FOR_BLOCK_VALIDATION
		{
			return Err(format!("Batch size is too large. This may cause problems with runtime memory allocation. Better set `--batch-size {}` or less.", MAX_BATCH_SIZE_FOR_BLOCK_VALIDATION).into())
		}

		// Store the time that it took to write each value.
		let mut record = BenchRecord::default();

		let best_hash = client.usage_info().chain.best_hash;
		let header = client.header(best_hash)?.ok_or("Header not found")?;
		let original_root = *header.state_root();

		let (trie, _) = self.create_trie_backend::<Block, H>(
			original_root,
			&storage,
			shared_trie_cache.as_ref(),
		);
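		// The recorder returned here is dropped on purpose: every measured batch below builds a
		// fresh trie backend and recorder, so no run benefits from a previous one.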

		info!("Preparing keys from block {}", best_hash);
		// Load all KV pairs and randomly shuffle them.
		let mut kvs: Vec<_> = trie.pairs(Default::default())?.collect();
		let (mut rng, _) = new_rng(None);
		kvs.shuffle(&mut rng);
		if kvs.is_empty() {
			return Err("Can't process benchmarking with empty storage".into())
		}

		info!("Writing {} keys in batches of {}", kvs.len(), self.params.batch_size);
		let remainder = kvs.len() % self.params.batch_size;
		if self.params.is_validate_block_mode() && remainder != 0 {
			info!("Remaining `{remainder}` keys will be skipped");
		}

		let mut child_nodes = Vec::new();
		let mut batched_keys = Vec::new();
		// Generate a random replacement value for every key and make sure it does not collide
		// with existing db entries, so all additions can be rolled back without corrupting them.

		for key_value in kvs {
			let (k, original_v) = key_value?;
			match (self.params.include_child_trees, self.is_child_key(k.to_vec())) {
				(true, Some(info)) => {
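					// Child-trie keys are only collected at this point; they are benchmarked
					// separately once all top-level keys have been processed.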
					let child_keys = client
						.child_storage_keys(best_hash, info.clone(), None, None)?
						.collect::<Vec<_>>();
					child_nodes.push((child_keys, info.clone()));
				},
				_ => {
					// regular key
					let mut new_v = vec![0; original_v.len()];

					loop {
						// Create a random value to overwrite with.
						// NOTE: We use a possibly higher entropy than the original value,
						// could be improved but acts as an over-estimation which is fine for now.
						rng.fill_bytes(&mut new_v[..]);
						if check_new_value::<Block>(
							db.clone(),
							&trie,
							&k.to_vec(),
							&new_v,
							self.state_version(),
							state_col,
							None,
						) {
							break
						}
					}

					batched_keys.push((k.to_vec(), new_v.to_vec()));
					if batched_keys.len() < self.params.batch_size {
						continue
					}

					// Write each value in one commit.
					let (size, duration) = if self.params.is_validate_block_mode() {
						self.measure_per_key_amortised_validate_block_write_cost::<Block, H>(
							original_root,
							&storage,
							shared_trie_cache.as_ref(),
							batched_keys.clone(),
							None,
						)?
					} else {
						self.measure_per_key_amortised_import_block_write_cost::<Block, H>(
							original_root,
							&storage,
							shared_trie_cache.as_ref(),
							db.clone(),
							batched_keys.clone(),
							self.state_version(),
							state_col,
							None,
						)?
					};
					record.append(size, duration)?;
					batched_keys.clear();
				},
			}
		}

		if self.params.include_child_trees && !child_nodes.is_empty() {
			info!("Writing {} child keys", child_nodes.iter().map(|(c, _)| c.len()).sum::<usize>());
			for (mut child_keys, info) in child_nodes {
				if child_keys.len() < self.params.batch_size {
					warn!(
						"{} child keys will be skipped because there are fewer of them than the batch size",
						child_keys.len()
					);
					continue;
				}

				child_keys.shuffle(&mut rng);

				for key in child_keys {
					if let Some(original_v) = client
						.child_storage(best_hash, &info, &key)
						.expect("Checked above to exist")
					{
						let mut new_v = vec![0; original_v.0.len()];

						loop {
							rng.fill_bytes(&mut new_v[..]);
							if check_new_value::<Block>(
								db.clone(),
								&trie,
								&key.0,
								&new_v,
								self.state_version(),
								state_col,
								Some(&info),
							) {
								break
							}
						}
						batched_keys.push((key.0, new_v.to_vec()));
						if batched_keys.len() < self.params.batch_size {
							continue
						}

						let (size, duration) = if self.params.is_validate_block_mode() {
							self.measure_per_key_amortised_validate_block_write_cost::<Block, H>(
								original_root,
								&storage,
								shared_trie_cache.as_ref(),
								batched_keys.clone(),
								None,
							)?
						} else {
							self.measure_per_key_amortised_import_block_write_cost::<Block, H>(
								original_root,
								&storage,
								shared_trie_cache.as_ref(),
								db.clone(),
								batched_keys.clone(),
								self.state_version(),
								state_col,
								Some(&info),
							)?
						};
						record.append(size, duration)?;
						batched_keys.clear();
					}
				}
			}
		}

		Ok(record)
	}

	fn create_trie_backend<Block, H>(
		&self,
		original_root: Block::Hash,
		storage: &Arc<dyn sp_state_machine::Storage<HashingFor<Block>>>,
		shared_trie_cache: Option<&sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
	) -> (DbState<HashingFor<Block>>, Option<Recorder<HashingFor<Block>>>)
	where
		Block: BlockT<Header = H, Hash = DbHash> + Debug,
		H: HeaderT<Hash = DbHash>,
	{
		let recorder = (!self.params.disable_pov_recorder).then(|| Default::default());
		let trie = DbStateBuilder::<HashingFor<Block>>::new(storage.clone(), original_root)
			.with_optional_cache(shared_trie_cache.map(|c| c.local_cache_trusted()))
			.with_optional_recorder(recorder.clone())
			.build();

		(trie, recorder)
	}

	/// Measures the amortised write cost per key for the block-import path.
	/// If `child_info` is `Some`, the keys belong to the given child trie.
	fn measure_per_key_amortised_import_block_write_cost<Block, H>(
		&self,
		original_root: Block::Hash,
		storage: &Arc<dyn sp_state_machine::Storage<HashingFor<Block>>>,
		shared_trie_cache: Option<&sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
		db: Arc<dyn sp_database::Database<DbHash>>,
		changes: Vec<(Vec<u8>, Vec<u8>)>,
		version: StateVersion,
		col: ColumnId,
		child_info: Option<&ChildInfo>,
	) -> Result<(usize, Duration)>
	where
		Block: BlockT<Header = H, Hash = DbHash> + Debug,
		H: HeaderT<Hash = DbHash>,
	{
		let batch_size = changes.len();
		let average_len = changes.iter().map(|(_, v)| v.len()).sum::<usize>() / batch_size;
		// For every batched write use a different trie instance and recorder, so we
		// don't benefit from past runs.
		let (trie, _recorder) =
			self.create_trie_backend::<Block, H>(original_root, storage, shared_trie_cache);

		let start = Instant::now();
		// Create a TX that will modify the Trie in the DB and
		// calculate the root hash of the Trie after the modification.
		let replace = changes
			.iter()
			.map(|(key, new_v)| (key.as_ref(), Some(new_v.as_ref())))
			.collect::<Vec<_>>();
		let stx = match child_info {
			Some(info) => trie.child_storage_root(info, replace.iter().cloned(), version).2,
			None => trie.storage_root(replace.iter().cloned(), version).1,
		};
		// Only keep the insertions, since we do not want to benchmark pruning.
		let tx = convert_tx::<Block>(db.clone(), stx.clone(), false, col);
		db.commit(tx).map_err(|e| format!("Writing to the Database: {}", e))?;
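		// The commit time is divided by the batch size to report an amortised per-key cost.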
		let result = (average_len, start.elapsed() / batch_size as u32);

		// Now undo the changes by removing what was added.
		let tx = convert_tx::<Block>(db.clone(), stx.clone(), true, col);
		db.commit(tx).map_err(|e| format!("Writing to the Database: {}", e))?;

		Ok(result)
	}

	/// Measures the amortised write cost per key when executed through `validate_block`.
	/// If `maybe_child_info` is `Some`, the keys belong to the given child trie.
	fn measure_per_key_amortised_validate_block_write_cost<Block, H>(
		&self,
		original_root: Block::Hash,
		storage: &Arc<dyn sp_state_machine::Storage<HashingFor<Block>>>,
		shared_trie_cache: Option<&sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
		changes: Vec<(Vec<u8>, Vec<u8>)>,
		maybe_child_info: Option<&ChildInfo>,
	) -> Result<(usize, Duration)>
	where
		Block: BlockT<Header = H, Hash = DbHash> + Debug,
		H: HeaderT<Hash = DbHash>,
	{
		let batch_size = changes.len();
		let average_len = changes.iter().map(|(_, v)| v.len()).sum::<usize>() / batch_size;
		let (trie, recorder) =
			self.create_trie_backend::<Block, H>(original_root, storage, shared_trie_cache);
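		// Read every key through the recording backend so that all trie nodes needed for the
		// writes are captured in the storage proof handed to the runtime.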
		for (key, _) in changes.iter() {
			let _v = trie
				.storage(key)
				.expect("Checked above to exist")
				.ok_or("Value unexpectedly empty")?;
		}
		let storage_proof = recorder
			.map(|r| r.drain_storage_proof())
			.expect("Storage proof must exist for block validation");
		let root = trie.root();
		debug!(
			"POV: len {:?} {:?}",
			storage_proof.len(),
			storage_proof.clone().encoded_compact_size::<HashingFor<Block>>(*root)
		);
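		// Bundle the state root, the recorded proof and the pending changes into the call
		// parameters for the `validate_block` export.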
		let params = StorageAccessParams::<Block>::new_write(
			*root,
			storage_proof,
			(changes, maybe_child_info.cloned()),
		);

		let mut durations_in_nanos = Vec::new();
		let wasm_module = get_wasm_module();
		let mut instance = wasm_module.new_instance().expect("Failed to create wasm instance");
		let dry_run_encoded = params.as_dry_run().encode();
		let encoded = params.encode();

		for i in 1..=self.params.validate_block_rounds {
			info!(
				"validate_block with {} keys, round {}/{}",
				batch_size, i, self.params.validate_block_rounds
			);

			// Dry run to get the time it takes without storage access
			let dry_run_start = Instant::now();
			instance
				.call_export("validate_block", &dry_run_encoded)
				.expect("Failed to call validate_block");
			let dry_run_elapsed = dry_run_start.elapsed();
			debug!("validate_block dry-run time {:?}", dry_run_elapsed);

			let start = Instant::now();
			instance
				.call_export("validate_block", &encoded)
				.expect("Failed to call validate_block");
			let elapsed = start.elapsed();
			debug!("validate_block time {:?}", elapsed);
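			// Only the difference between the full run and the dry run is attributed to storage
			// writes, amortised over the batch.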

			durations_in_nanos.push(
				elapsed.saturating_sub(dry_run_elapsed).as_nanos() as u64 / batch_size as u64,
			);
		}

		let result = (
			average_len,
			std::time::Duration::from_nanos(
				durations_in_nanos.iter().sum::<u64>() / durations_in_nanos.len() as u64,
			),
		);

		Ok(result)
	}
}

/// Converts a Trie transaction into a DB transaction.
/// Removals are ignored and will not be included in the final tx.
/// `invert_inserts` replaces all inserts with removals.
fn convert_tx<B: BlockT>(
	db: Arc<dyn sp_database::Database<DbHash>>,
	mut tx: PrefixedMemoryDB<HashingFor<B>>,
	invert_inserts: bool,
	col: ColumnId,
) -> Transaction<DbHash> {
	let mut ret = Transaction::<DbHash>::default();
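	// A positive reference count means the node was newly inserted by this transaction.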
	for (mut k, (v, rc)) in tx.drain().into_iter() {
		if rc > 0 {
			db.sanitize_key(&mut k);
			if invert_inserts {
				ret.remove(col, &k);
			} else {
				ret.set(col, &k, &v);
			}
		}
		// < 0 means removal - ignored.
		// 0 means no modification.
	}
	ret
}

/// Checks whether a new value causes any collision with existing trie nodes in the database.
/// Returns `true` if there is no collision.
/// If `child_info` is `Some`, the key belongs to the given child trie.
fn check_new_value<Block: BlockT>(
	db: Arc<dyn sp_database::Database<DbHash>>,
	trie: &DbState<HashingFor<Block>>,
	key: &Vec<u8>,
	new_v: &Vec<u8>,
	version: StateVersion,
	col: ColumnId,
	child_info: Option<&ChildInfo>,
) -> bool {
	let new_kv = vec![(key.as_ref(), Some(new_v.as_ref()))];
	let mut stx = match child_info {
		Some(info) => trie.child_storage_root(info, new_kv.iter().cloned(), version).2,
		None => trie.storage_root(new_kv.iter().cloned(), version).1,
	};
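	// If any trie node produced by this write already exists in the database, rolling back the
	// benchmark insertions could corrupt existing entries, so the caller retries with a new value.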
	for (mut k, (_, rc)) in stx.drain().into_iter() {
		if rc > 0 {
			db.sanitize_key(&mut k);
			if db.get(col, &k).is_some() {
				trace!("Benchmark-store key creation: Key collision detected, retry");
				return false
			}
		}
	}
	true
}