frame_remote_externalities/lib.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # Remote Externalities
//!
//! An equivalent of `sp_io::TestExternalities` that can load its state from a remote
//! Substrate-based chain, or a local state snapshot file.
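//!
//! ## Example
//!
//! A minimal usage sketch. `Block` stands in for the target runtime's block type, and the
//! endpoint and pallet name below are placeholders; the crate is assumed to be imported as
//! `frame_remote_externalities`:
//!
//! ```ignore
//! use frame_remote_externalities::{Builder, Mode, OnlineConfig, SnapshotConfig};
//!
//! async fn example() {
//!     let mut ext = Builder::<Block>::new()
//!         .mode(Mode::Online(OnlineConfig {
//!             transport: "https://try-runtime.polkadot.io:443".to_owned().into(),
//!             // scrape only this pallet's storage
//!             pallets: vec!["System".to_owned()],
//!             // additionally write the downloaded state to a snapshot file
//!             state_snapshot: Some(SnapshotConfig::new("SNAPSHOT")),
//!             ..Default::default()
//!         }))
//!         .build()
//!         .await
//!         .unwrap();
//!
//!     // the scraped state can now be read/written like any `TestExternalities`.
//!     ext.execute_with(|| { /* e.g. `sp_io::storage::get(..)` */ });
//! }
//! ```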

mod logging;

use codec::{Compact, Decode, Encode};
use indicatif::{ProgressBar, ProgressStyle};
use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient};
use log::*;
use serde::de::DeserializeOwned;
use sp_core::{
	hexdisplay::HexDisplay,
	storage::{
		well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
		ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
	},
};
use sp_runtime::{
	traits::{Block as BlockT, HashingFor},
	StateVersion,
};
use sp_state_machine::TestExternalities;
use std::{
	cmp::{max, min},
	fs,
	ops::{Deref, DerefMut},
	path::{Path, PathBuf},
	sync::Arc,
	time::{Duration, Instant},
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
use tokio_retry::{strategy::FixedInterval, Retry};

type Result<T, E = &'static str> = std::result::Result<T, E>;

type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
type SnapshotVersion = Compact<u16>;

const LOG_TARGET: &str = "remote-ext";
const DEFAULT_HTTP_ENDPOINT: &str = "https://try-runtime.polkadot.io:443";
const SNAPSHOT_VERSION: SnapshotVersion = Compact(4);

/// The snapshot that we store on disk.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
	snapshot_version: SnapshotVersion,
	state_version: StateVersion,
	// <Vec<Key, (Value, MemoryDbRefCount)>>
	raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
	// The storage root of the state. This may vary from the storage root in the header, if the
	// entire state was not fetched.
	storage_root: B::Hash,
	header: B::Header,
}

impl<B: BlockT> Snapshot<B> {
	pub fn new(
		state_version: StateVersion,
		raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
		storage_root: B::Hash,
		header: B::Header,
	) -> Self {
		Self {
			snapshot_version: SNAPSHOT_VERSION,
			state_version,
			raw_storage,
			storage_root,
			header,
		}
	}

	fn load(path: &PathBuf) -> Result<Snapshot<B>> {
		let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
		// The first item in the SCALE encoded struct bytes is the snapshot version. We decode and
		// check that first, before proceeding to decode the rest of the snapshot.
		let snapshot_version = SnapshotVersion::decode(&mut &*bytes)
			.map_err(|_| "Failed to decode snapshot version")?;

		if snapshot_version != SNAPSHOT_VERSION {
			return Err("Unsupported snapshot version detected. Please create a new snapshot.")
		}

		Decode::decode(&mut &*bytes).map_err(|_| "Decode failed")
	}
}

/// An externalities that acts exactly the same as [`sp_io::TestExternalities`] but has a few extra
/// bits and pieces to it, and can be loaded remotely.
pub struct RemoteExternalities<B: BlockT> {
	/// The inner externalities.
	pub inner_ext: TestExternalities<HashingFor<B>>,
	/// The block header at which we created this externality env.
	pub header: B::Header,
}

impl<B: BlockT> Deref for RemoteExternalities<B> {
	type Target = TestExternalities<HashingFor<B>>;
	fn deref(&self) -> &Self::Target {
		&self.inner_ext
	}
}

impl<B: BlockT> DerefMut for RemoteExternalities<B> {
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.inner_ext
	}
}
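
// Thanks to the `Deref`/`DerefMut` impls above, a `RemoteExternalities` can be used anywhere a
// `TestExternalities` is expected, e.g. `ext.execute_with(|| ...)` works on it directly.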

/// The execution mode.
#[derive(Clone)]
pub enum Mode<H> {
	/// Online. Potentially writes to a snapshot file.
	Online(OnlineConfig<H>),
	/// Offline. Uses a state snapshot file and does not need any client config.
	Offline(OfflineConfig),
	/// Prefer using a snapshot file if it exists, else use a remote server.
	OfflineOrElseOnline(OfflineConfig, OnlineConfig<H>),
}

impl<H> Default for Mode<H> {
	fn default() -> Self {
		Mode::Online(OnlineConfig::default())
	}
}

/// Configuration of the offline execution.
///
/// A state snapshot config must be present.
#[derive(Clone)]
pub struct OfflineConfig {
	/// The configuration of the state snapshot file to use. It must be present.
	pub state_snapshot: SnapshotConfig,
}

/// Description of the transport protocol (for online execution).
#[derive(Debug, Clone)]
pub enum Transport {
	/// Use the `URI` to build a new HTTP client.
	Uri(String),
	/// Use an existing HTTP client.
	RemoteClient(HttpClient),
}

impl Transport {
	fn as_client(&self) -> Option<&HttpClient> {
		match self {
			Self::RemoteClient(client) => Some(client),
			_ => None,
		}
	}

	// Build an HttpClient from a URI.
	async fn init(&mut self) -> Result<()> {
		if let Self::Uri(uri) = self {
			debug!(target: LOG_TARGET, "initializing remote client to {uri:?}");

			let http_client = HttpClient::builder()
				.max_request_size(u32::MAX)
				.max_response_size(u32::MAX)
				.request_timeout(std::time::Duration::from_secs(60 * 5))
				.build(uri)
				.map_err(|e| {
					error!(target: LOG_TARGET, "error: {e:?}");
					"failed to build http client"
				})?;

			*self = Self::RemoteClient(http_client)
		}

		Ok(())
	}
}

impl From<String> for Transport {
	fn from(uri: String) -> Self {
		Transport::Uri(uri)
	}
}

impl From<HttpClient> for Transport {
	fn from(client: HttpClient) -> Self {
		Transport::RemoteClient(client)
	}
}
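
// Example (sketch): a plain URI is first stored as `Transport::Uri` and upgraded to an
// `HttpClient` by `Transport::init`, while an existing client is used as-is:
// `let transport: Transport = "http://localhost:9944".to_owned().into();`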

/// Configuration of the online execution.
///
/// A state snapshot config may be present and will be written to in that case.
#[derive(Clone)]
pub struct OnlineConfig<H> {
	/// The block hash at which to get the runtime state. Will be latest finalized head if not
	/// provided.
	pub at: Option<H>,
	/// An optional state snapshot file to WRITE to, not for reading. Not written if set to `None`.
	pub state_snapshot: Option<SnapshotConfig>,
	/// The pallets to scrape. These values are hashed and added to `hashed_prefixes`.
	pub pallets: Vec<String>,
	/// Transport config.
	pub transport: Transport,
	/// Look out for child-keys, and scrape them as well, if set to true.
	pub child_trie: bool,
	/// Storage entry key prefixes to be injected into the externalities. The *hashed* prefix must
	/// be given.
	pub hashed_prefixes: Vec<Vec<u8>>,
	/// Storage entry keys to be injected into the externalities. The *hashed* key must be given.
	pub hashed_keys: Vec<Vec<u8>>,
}

impl<H: Clone> OnlineConfig<H> {
	/// Return rpc (http) client reference.
	fn rpc_client(&self) -> &HttpClient {
		self.transport
			.as_client()
			.expect("http client must have been initialized by now; qed.")
	}

	fn at_expected(&self) -> H {
		self.at.clone().expect("block at must be initialized; qed")
	}
}

impl<H> Default for OnlineConfig<H> {
	fn default() -> Self {
		Self {
			transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
			child_trie: true,
			at: None,
			state_snapshot: None,
			pallets: Default::default(),
			hashed_keys: Default::default(),
			hashed_prefixes: Default::default(),
		}
	}
}

impl<H> From<String> for OnlineConfig<H> {
	fn from(t: String) -> Self {
		Self { transport: t.into(), ..Default::default() }
	}
}

/// Configuration of the state snapshot.
#[derive(Clone)]
pub struct SnapshotConfig {
	/// The path to the snapshot file.
	pub path: PathBuf,
}

impl SnapshotConfig {
	pub fn new<P: Into<PathBuf>>(path: P) -> Self {
		Self { path: path.into() }
	}
}

impl From<String> for SnapshotConfig {
	fn from(s: String) -> Self {
		Self::new(s)
	}
}

impl Default for SnapshotConfig {
	fn default() -> Self {
		Self { path: Path::new("SNAPSHOT").into() }
	}
}

/// Builder for remote-externalities.
#[derive(Clone)]
pub struct Builder<B: BlockT> {
	/// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values
	/// must be given.
	hashed_key_values: Vec<KeyValue>,
	/// The keys that will be excluded from the final externality. The *hashed* key must be given.
	hashed_blacklist: Vec<Vec<u8>>,
	/// Connectivity mode, online or offline.
	mode: Mode<B::Hash>,
	/// If provided, overwrite the state version with this. Otherwise, the state_version of the
	/// remote node is used. All cache files also store their state version.
	///
	/// Overwrite only with care.
	overwrite_state_version: Option<StateVersion>,
}

impl<B: BlockT> Default for Builder<B> {
	fn default() -> Self {
		Self {
			mode: Default::default(),
			hashed_key_values: Default::default(),
			hashed_blacklist: Default::default(),
			overwrite_state_version: None,
		}
	}
}

// Mode methods
impl<B: BlockT> Builder<B> {
	fn as_online(&self) -> &OnlineConfig<B::Hash> {
		match &self.mode {
			Mode::Online(config) => config,
			Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("Unexpected mode: expected Online"),
		}
	}

	fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
		match &mut self.mode {
			Mode::Online(config) => config,
			Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("Unexpected mode: expected Online"),
		}
	}
}

// RPC methods
impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	const PARALLEL_REQUESTS: usize = 4;
	const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
	const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
	const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
	const INITIAL_BATCH_SIZE: usize = 10;
	// nodes by default will not return more than 1000 keys per request
	const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
	const MAX_RETRIES: usize = 12;
	const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);
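
	// With the constants above, `get_storage_data_dynamic_batch_size` halves a batch that took
	// longer than `REQUEST_DURATION_TARGET` (e.g. 10 -> 5), and grows a fast one by
	// `max(+1, x1.10)` (e.g. 10 -> 11), never exceeding the number of remaining payloads.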

	async fn rpc_get_storage(
		&self,
		key: StorageKey,
		maybe_at: Option<B::Hash>,
	) -> Result<Option<StorageData>> {
		trace!(target: LOG_TARGET, "rpc: get_storage");
		self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| {
			error!(target: LOG_TARGET, "Error = {e:?}");
			"rpc get_storage failed."
		})
	}

	/// Get the latest finalized head.
	async fn rpc_get_head(&self) -> Result<B::Hash> {
		trace!(target: LOG_TARGET, "rpc: finalized_head");

		// sadly this is pretty much unreadable...
		ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
			.await
			.map_err(|e| {
				error!(target: LOG_TARGET, "Error = {e:?}");
				"rpc finalized_head failed."
			})
	}

	async fn get_keys_single_page(
		&self,
		prefix: Option<StorageKey>,
		start_key: Option<StorageKey>,
		at: B::Hash,
	) -> Result<Vec<StorageKey>> {
		self.as_online()
			.rpc_client()
			.storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at))
			.await
			.map_err(|e| {
				error!(target: LOG_TARGET, "Error = {e:?}");
				"rpc get_keys failed"
			})
	}

	/// Get keys with `prefix` at `block` in a parallel manner.
	async fn rpc_get_keys_parallel(
		&self,
		prefix: &StorageKey,
		block: B::Hash,
		parallel: usize,
	) -> Result<Vec<StorageKey>> {
		/// Divide the workload and return the start key of each chunk. Guaranteed to return a
		/// non-empty list.
		fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
			let mut prefix = prefix.as_ref().to_vec();
			let scale = 32usize.saturating_sub(prefix.len());

			// no need to divide workload
			if scale < 9 {
				prefix.extend(vec![0; scale]);
				return vec![StorageKey(prefix)]
			}

			let chunks = 16;
			let step = 0x10000 / chunks;
			let ext = scale - 2;
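
			// e.g. for an empty prefix: scale = 32, so the 16 generated start keys are
			// 0x0000..00, 0x1000..00, ..., 0xf000..00, each zero-padded to 32 bytes.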

			(0..chunks)
				.map(|i| {
					let mut key = prefix.clone();
					let start = i * step;
					key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
					key.extend(vec![0; ext]);
					StorageKey(key)
				})
				.collect()
		}

		let start_keys = gen_start_keys(prefix);
		let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
		let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
		end_keys.push(None);

		// use a semaphore to limit max scraping tasks
		let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
		let builder = Arc::new(self.clone());
		let mut handles = vec![];

		for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
			let permit = parallel
				.clone()
				.acquire_owned()
				.await
				.expect("semaphore is not closed until the end of loop");

			let builder = builder.clone();
			let prefix = prefix.clone();
			let start_key = start_key.cloned();
			let end_key = end_key.cloned();

			let handle = tokio::spawn(async move {
				let res = builder
					.rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
					.await;
				drop(permit);
				res
			});

			handles.push(handle);
		}

		parallel.close();

		let keys = futures::future::join_all(handles)
			.await
			.into_iter()
			.filter_map(|res| match res {
				Ok(Ok(keys)) => Some(keys),
				_ => None,
			})
			.flatten()
			.collect::<Vec<StorageKey>>();

		Ok(keys)
	}
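
	// N.B. the parallel and the plain paged key scrapes are checked to agree in the
	// `can_fetch_in_parallel` test at the bottom of this file.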

	/// Get all keys with `prefix` within the given range at `block`.
	/// Both `start_key` and `end_key` are optional if you want an open-ended range.
	async fn rpc_get_keys_in_range(
		&self,
		prefix: &StorageKey,
		block: B::Hash,
		start_key: Option<&StorageKey>,
		end_key: Option<&StorageKey>,
	) -> Result<Vec<StorageKey>> {
		let mut last_key: Option<&StorageKey> = start_key;
		let mut keys: Vec<StorageKey> = vec![];

		loop {
			// This loop can hit the node with very rapid requests, occasionally causing it to
			// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
			let retry_strategy =
				FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
			let get_page_closure =
				|| self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
			let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;

			// avoid duplicated keys across workloads
			if let (Some(last), Some(end)) = (page.last(), end_key) {
				if last >= end {
					page.retain(|key| key < end);
				}
			}

			let page_len = page.len();
			keys.extend(page);
			last_key = keys.last();

			// scraping out of range or no more matches,
			// we are done either way
			if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
				debug!(target: LOG_TARGET, "last page received: {page_len}");
				break
			}

			debug!(
				target: LOG_TARGET,
				"new total = {}, full page received: {}",
				keys.len(),
				HexDisplay::from(last_key.expect("full page received, cannot be None"))
			);
		}

		Ok(keys)
	}

	/// Fetches storage data from a node using a dynamic batch size.
	///
	/// This function adjusts the batch size on the fly to help prevent overwhelming the node with
	/// large batch requests, and stay within request size limits enforced by the node.
	///
	/// # Arguments
	///
	/// * `client` - A reference to the `HttpClient` used for making the requests.
	/// * `payloads` - A vector of tuples containing a JSONRPC method name and `ArrayParams`.
	/// * `bar` - A `ProgressBar` that is advanced as responses come in.
	///
	/// # Returns
	///
	/// Returns a `Result` with a vector of `Option<StorageData>`, where each element corresponds to
	/// the storage data for the given method and parameters. The result will be an `Err` with a
	/// `String` error message if the request fails.
	///
	/// # Errors
	///
	/// This function will return an error if:
	/// * The batch request keeps failing after `MAX_RETRIES` retries.
	/// * There are invalid batch params.
	/// * There is an error in the batch response.
	///
	/// # Example
	///
	/// ```ignore
	/// use your_crate::{get_storage_data_dynamic_batch_size, HttpClient, ArrayParams, ProgressBar};
	///
	/// async fn example(client: &HttpClient) {
	///     let payloads = vec![
	///         ("some_method".to_string(), ArrayParams::new()),
	///         ("another_method".to_string(), ArrayParams::new()),
	///     ];
	///     let bar = ProgressBar::new(payloads.len() as u64);
	///
	///     let storage_data = get_storage_data_dynamic_batch_size(client, payloads, &bar).await;
	///     match storage_data {
	///         Ok(data) => println!("Storage data: {:?}", data),
	///         Err(e) => eprintln!("Error fetching storage data: {}", e),
	///     }
	/// }
	/// ```
	async fn get_storage_data_dynamic_batch_size(
		client: &HttpClient,
		payloads: Vec<(String, ArrayParams)>,
		bar: &ProgressBar,
	) -> Result<Vec<Option<StorageData>>, String> {
		let mut all_data: Vec<Option<StorageData>> = vec![];
		let mut start_index = 0;
		let mut retries = 0usize;
		let mut batch_size = Self::INITIAL_BATCH_SIZE;
		let total_payloads = payloads.len();

		while start_index < total_payloads {
			debug!(
				target: LOG_TARGET,
				"Remaining payloads: {} Batch request size: {batch_size}",
				total_payloads - start_index,
			);

			let end_index = usize::min(start_index + batch_size, total_payloads);
			let page = &payloads[start_index..end_index];

			// Build the batch request
			let mut batch = BatchRequestBuilder::new();
			for (method, params) in page.iter() {
				batch
					.insert(method, params.clone())
					.map_err(|_| "Invalid batch method and/or params")?;
			}

			let request_started = Instant::now();
			let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
				Ok(batch_response) => {
					retries = 0;
					batch_response
				},
				Err(e) => {
					if retries > Self::MAX_RETRIES {
						return Err(e.to_string())
					}

					retries += 1;
					let failure_log = format!(
						"Batch request failed ({retries}/{} retries). Error: {e}",
						Self::MAX_RETRIES
					);
					// after 2 subsequent failures something very wrong is happening. log a warning
					// and reset the batch size down to 1.
					if retries >= 2 {
						warn!("{failure_log}");
						batch_size = 1;
					} else {
						debug!("{failure_log}");
						// Decrease batch size by DECREASE_FACTOR
						batch_size =
							(batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize;
					}
					continue
				},
			};

			let request_duration = request_started.elapsed();
			batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
				// Decrease batch size
				max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
			} else {
				// Increase batch size, but not more than the remaining total payloads to process
				min(
					total_payloads - start_index,
					max(
						batch_size + 1,
						(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
					),
				)
			};

			debug!(
				target: LOG_TARGET,
				"Request duration: {request_duration:?} Target duration: {:?} Last batch size: {} Next batch size: {batch_size}",
				Self::REQUEST_DURATION_TARGET,
				end_index - start_index,
			);

			let batch_response_len = batch_response.len();
			for item in batch_response.into_iter() {
				match item {
					Ok(x) => all_data.push(x),
					Err(e) => return Err(e.message().to_string()),
				}
			}
			bar.inc(batch_response_len as u64);

			// Update the start index for the next iteration
			start_index = end_index;
		}

		Ok(all_data)
	}

	/// Synonym of `getPairs` that uses paged queries to first get the keys, and then
	/// map them to values one by one.
	///
	/// This can work with public nodes. But, expect it to be darn slow.
	pub(crate) async fn rpc_get_pairs(
		&self,
		prefix: StorageKey,
		at: B::Hash,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<Vec<KeyValue>> {
		let keys = logging::with_elapsed_async(
			|| async {
				// TODO: We could start downloading when having collected the first batch of keys.
				// https://github.com/paritytech/polkadot-sdk/issues/2494
				let keys = self
					.rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
					.await?
					.into_iter()
					.collect::<Vec<_>>();

				Ok(keys)
			},
			"Scraping keys...",
			|keys| format!("Found {} keys", keys.len()),
		)
		.await?;

		if keys.is_empty() {
			return Ok(Default::default())
		}

		let client = self.as_online().rpc_client();
		let payloads = keys
			.iter()
			.map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
			.collect::<Vec<_>>();

		let bar = ProgressBar::new(payloads.len() as u64);
		bar.enable_steady_tick(Duration::from_secs(1));
		bar.set_message("Downloading key values".to_string());
		bar.set_style(
			ProgressStyle::with_template(
				"[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
			)
			.unwrap()
			.progress_chars("=>-"),
		);
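		// Chunk the payloads so that roughly `PARALLEL_REQUESTS` batch workers run concurrently
		// (with integer division, e.g. 1001 payloads yield chunks of 250, i.e. 5 workers).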
		let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
		let requests = payloads_chunked.map(|payload_chunk| {
			Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
		});
		// Execute the requests and move the Result outside.
		let storage_data_result: Result<Vec<_>, _> =
			futures::future::join_all(requests).await.into_iter().collect();
		// Handle the Result.
		let storage_data = match storage_data_result {
			Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
			Err(e) => {
				error!(target: LOG_TARGET, "Error while getting storage data: {e}");
				return Err("Error while getting storage data")
			},
		};
		bar.finish_with_message("✅ Downloaded key values");
		println!();

		// Check if we got responses for all submitted requests.
		assert_eq!(keys.len(), storage_data.len());

		let key_values = keys
			.iter()
			.zip(storage_data)
			.map(|(key, maybe_value)| match maybe_value {
				Some(data) => (key.clone(), data),
				None => {
					warn!(target: LOG_TARGET, "key {key:?} had no corresponding value.");
					let data = StorageData(vec![]);
					(key.clone(), data)
				},
			})
			.collect::<Vec<_>>();

		logging::with_elapsed(
			|| {
				pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
					// Don't insert the child keys here, they need to be inserted separately with
					// all their data in the load_child_remote function.
					match is_default_child_storage_key(&k.0) {
						true => None,
						false => Some((k.0, v.0)),
					}
				}));

				Ok(())
			},
			"Inserting keys into DB...",
			|_| "Inserted keys into DB".into(),
		)
		.expect("must succeed; qed");

		Ok(key_values)
	}

	/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
	pub(crate) async fn rpc_child_get_storage_paged(
		client: &HttpClient,
		prefixed_top_key: &StorageKey,
		child_keys: Vec<StorageKey>,
		at: B::Hash,
	) -> Result<Vec<KeyValue>> {
		let child_keys_len = child_keys.len();

		let payloads = child_keys
			.iter()
			.map(|key| {
				(
					"childstate_getStorage".to_string(),
					rpc_params![
						PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
						key,
						at
					],
				)
			})
			.collect::<Vec<_>>();

		let bar = ProgressBar::new(payloads.len() as u64);
		let storage_data =
			match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await {
				Ok(storage_data) => storage_data,
				Err(e) => {
					error!(target: LOG_TARGET, "batch processing failed: {e:?}");
					return Err("batch processing failed")
				},
			};

		assert_eq!(child_keys_len, storage_data.len());

		Ok(child_keys
			.iter()
			.zip(storage_data)
			.map(|(key, maybe_value)| match maybe_value {
				Some(v) => (key.clone(), v),
				None => {
					warn!(target: LOG_TARGET, "key {key:?} had no corresponding value.");
					(key.clone(), StorageData(vec![]))
				},
			})
			.collect::<Vec<_>>())
	}

	pub(crate) async fn rpc_child_get_keys(
		client: &HttpClient,
		prefixed_top_key: &StorageKey,
		child_prefix: StorageKey,
		at: B::Hash,
	) -> Result<Vec<StorageKey>> {
		let retry_strategy =
			FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
		let mut all_child_keys = Vec::new();
		let mut start_key = None;

		loop {
			let get_child_keys_closure = || {
				let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
				substrate_rpc_client::ChildStateApi::storage_keys_paged(
					client,
					top_key,
					Some(child_prefix.clone()),
					Self::DEFAULT_KEY_DOWNLOAD_PAGE,
					start_key.clone(),
					Some(at),
				)
			};

			let child_keys = Retry::spawn(retry_strategy.clone(), get_child_keys_closure)
				.await
				.map_err(|e| {
					error!(target: LOG_TARGET, "Error = {e:?}");
					"rpc child_get_keys failed."
				})?;

			let keys_count = child_keys.len();
			if keys_count == 0 {
				break;
			}

			start_key = child_keys.last().cloned();
			all_child_keys.extend(child_keys);

			if keys_count < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
				break;
			}
		}

		debug!(
			target: LOG_TARGET,
			"[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
			std::thread::current().id(),
			all_child_keys.len(),
			HexDisplay::from(prefixed_top_key)
		);

		Ok(all_child_keys)
	}
}

impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	/// Load all of the child keys from the remote config, given the already scraped list of top key
	/// pairs.
	///
	/// `top_kv` need not be only child-bearing top keys. It should be all of the top keys that are
	/// included thus far.
	///
	/// This function concurrently populates `pending_ext`. The return value is only used for
	/// writing to the cache; this could be optimized further.
	async fn load_child_remote(
		&self,
		top_kv: &[KeyValue],
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<ChildKeyValues> {
		let child_roots = top_kv
			.iter()
			.filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
			.map(|(k, _)| k.clone())
			.collect::<Vec<_>>();

		if child_roots.is_empty() {
			info!(target: LOG_TARGET, "👩‍👦 no child roots found to scrape");
			return Ok(Default::default())
		}

		info!(
			target: LOG_TARGET,
			"👩‍👦 scraping child-tree data from {} top keys",
			child_roots.len(),
		);

		let at = self.as_online().at_expected();

		let client = self.as_online().rpc_client();
		let mut child_kv = vec![];
		for prefixed_top_key in child_roots {
			let child_keys =
				Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;

			let child_kv_inner =
				Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
					.await?;

			let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
			let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
				Some((ChildType::ParentKeyId, storage_key)) => storage_key,
				None => {
					error!(target: LOG_TARGET, "invalid key: {prefixed_top_key:?}");
					return Err("Invalid child key")
				},
			};

			let info = ChildInfo::new_default(un_prefixed);
			let key_values =
				child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
			child_kv.push((info.clone(), child_kv_inner));
			for (k, v) in key_values {
				pending_ext.insert_child(info.clone(), k, v);
			}
		}

		Ok(child_kv)
	}

	/// Load all of the top key-value pairs from the remote node into `pending_ext`.
	///
	/// This function concurrently populates `pending_ext`. The return value is only used for
	/// writing to the cache; this could be optimized further.
	async fn load_top_remote(
		&self,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<TopKeyValues> {
		let config = self.as_online();
		let at = self
			.as_online()
			.at
			.expect("online config must be initialized by this point; qed.");
		info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {at:?}");

		let mut keys_and_values = Vec::new();
		for prefix in &config.hashed_prefixes {
			let now = std::time::Instant::now();
			let additional_key_values =
				self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
			let elapsed = now.elapsed();
			info!(
				target: LOG_TARGET,
				"adding data for hashed prefix: {:?}, took {:.2}s",
				HexDisplay::from(prefix),
				elapsed.as_secs_f32()
			);
			keys_and_values.extend(additional_key_values);
		}

		for key in &config.hashed_keys {
			let key = StorageKey(key.to_vec());
			info!(
				target: LOG_TARGET,
				"adding data for hashed key: {:?}",
				HexDisplay::from(&key)
			);
			match self.rpc_get_storage(key.clone(), Some(at)).await? {
				Some(value) => {
					pending_ext.insert(key.clone().0, value.clone().0);
					keys_and_values.push((key, value));
				},
				None => {
					warn!(
						target: LOG_TARGET,
						"no data found for hashed key: {:?}",
						HexDisplay::from(&key)
					);
				},
			}
		}

		Ok(keys_and_values)
	}

	/// The entry point of execution, if `mode` is online.
	///
	/// Initializes the remote client in `transport`, and sets the `at` field, if not specified.
	async fn init_remote_client(&mut self) -> Result<()> {
		// First, initialize the http client.
		self.as_online_mut().transport.init().await?;

		// Then, if `at` is not set, set it.
		if self.as_online().at.is_none() {
			let at = self.rpc_get_head().await?;
			info!(
				target: LOG_TARGET,
				"since no at is provided, setting it to latest finalized head, {at:?}",
			);
			self.as_online_mut().at = Some(at);
		}

		// Then, a few transformations that we want to perform in the online config:
		let online_config = self.as_online_mut();
		online_config.pallets.iter().for_each(|p| {
			online_config
				.hashed_prefixes
				.push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
		});

		if online_config.child_trie {
			online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
		}

		// Finally, if by now we have not put any limitations on the prefixes that we are
		// interested in, we download everything.
		if online_config
			.hashed_prefixes
			.iter()
			.filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
			.count() == 0
		{
			info!(
				target: LOG_TARGET,
				"since no prefix is filtered, the data for all pallets will be downloaded"
			);
			online_config.hashed_prefixes.push(vec![]);
		}

		Ok(())
	}

	async fn load_header(&self) -> Result<B::Header> {
		let retry_strategy =
			FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
		let get_header_closure = || {
			ChainApi::<(), _, B::Header, ()>::header(
				self.as_online().rpc_client(),
				Some(self.as_online().at_expected()),
			)
		};
		Retry::spawn(retry_strategy, get_header_closure)
			.await
			.map_err(|_| "Failed to fetch header for block from network")?
			.ok_or("Network returned None block header")
	}

	/// Load the data from a remote server. The main code path is calling into `load_top_remote` and
	/// `load_child_remote`.
	///
	/// Must be called after `init_remote_client`.
	async fn load_remote_and_maybe_save(&mut self) -> Result<TestExternalities<HashingFor<B>>> {
		let state_version =
			StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
				.await
				.map_err(|e| {
					error!(target: LOG_TARGET, "Error = {e:?}");
					"rpc runtime_version failed."
				})
				.map(|v| v.state_version())?;
		let mut pending_ext = TestExternalities::new_with_code_and_state(
			Default::default(),
			Default::default(),
			self.overwrite_state_version.unwrap_or(state_version),
		);

		// Load data from the remote into `pending_ext`.
		let top_kv = self.load_top_remote(&mut pending_ext).await?;
		self.load_child_remote(&top_kv, &mut pending_ext).await?;

		// If we need to save a snapshot, save the raw storage and root hash to the snapshot.
		if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
			let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
			let snapshot = Snapshot::<B>::new(
				state_version,
				raw_storage.clone(),
				storage_root,
				self.load_header().await?,
			);
			let encoded = snapshot.encode();
			info!(
				target: LOG_TARGET,
				"writing snapshot of {} bytes to {path:?}",
				encoded.len(),
			);
			std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;

			// pending_ext was consumed when creating the snapshot, need to reinitialize it
			return Ok(TestExternalities::from_raw_snapshot(
				raw_storage,
				storage_root,
				self.overwrite_state_version.unwrap_or(state_version),
			))
		}

		Ok(pending_ext)
	}

	async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>> {
		self.init_remote_client().await?;
		let inner_ext = self.load_remote_and_maybe_save().await?;
		Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
	}

	fn do_load_offline(&mut self, config: OfflineConfig) -> Result<RemoteExternalities<B>> {
		let (header, inner_ext) = logging::with_elapsed(
			|| {
				info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);

				let Snapshot { header, state_version, raw_storage, storage_root, .. } =
					Snapshot::<B>::load(&config.state_snapshot.path)?;
				let inner_ext = TestExternalities::from_raw_snapshot(
					raw_storage,
					storage_root,
					self.overwrite_state_version.unwrap_or(state_version),
				);

				Ok((header, inner_ext))
			},
			"Loading snapshot...",
			|_| "Loaded snapshot".into(),
		)?;

		Ok(RemoteExternalities { inner_ext, header })
	}

	pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>> {
		let mut ext = match self.mode.clone() {
			Mode::Offline(config) => self.do_load_offline(config)?,
			Mode::Online(_) => self.do_load_remote().await?,
			Mode::OfflineOrElseOnline(offline_config, _) => {
				match self.do_load_offline(offline_config) {
					Ok(x) => x,
					Err(_) => self.do_load_remote().await?,
				}
			},
		};

		// inject manual key values.
		if !self.hashed_key_values.is_empty() {
			info!(
				target: LOG_TARGET,
				"extending externalities with {} manually injected key-values",
				self.hashed_key_values.len()
			);
			ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
		}

		// exclude manual key values.
		if !self.hashed_blacklist.is_empty() {
			info!(
				target: LOG_TARGET,
				"excluding externalities from {} keys",
				self.hashed_blacklist.len()
			);
			for k in self.hashed_blacklist {
				ext.execute_with(|| sp_io::storage::clear(&k));
			}
		}

		Ok(ext)
	}
}

// Public methods
impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	/// Create a new builder.
	pub fn new() -> Self {
		Default::default()
	}

	/// Inject a manual list of key-value pairs into the storage.
	pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
		for i in injections {
			self.hashed_key_values.push(i);
		}
		self
	}

	/// Blacklist this hashed key from the final externalities. This is treated as-is, and should be
	/// pre-hashed.
	pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
		self.hashed_blacklist.push(hashed.to_vec());
		self
	}

	/// Set the connectivity mode (offline, online, or offline-else-online) to be used.
	pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
		self.mode = mode;
		self
	}

	/// Overwrite the state version of the snapshot or remote node with the given one.
	pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
		self.overwrite_state_version = Some(version);
		self
	}

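	/// Build the externalities, loading the state either from a remote node or from a state
	/// snapshot file, depending on the configured [`Mode`].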
	pub async fn build(self) -> Result<RemoteExternalities<B>> {
		let mut ext = self.pre_build().await?;
		ext.commit_all().unwrap();

		info!(
			target: LOG_TARGET,
			"initialized state externalities with storage root {:?} and state_version {:?}",
			ext.as_backend().root(),
			ext.state_version
		);

		Ok(ext)
	}
}

#[cfg(test)]
mod test_prelude {
	pub(crate) use super::*;
	pub(crate) use sp_runtime::testing::{Block as RawBlock, MockCallU64};
	pub(crate) type UncheckedXt = sp_runtime::testing::TestXt<MockCallU64, ()>;
	pub(crate) type Block = RawBlock<UncheckedXt>;

	pub(crate) fn init_logger() {
		sp_tracing::try_init_simple();
	}
}

#[cfg(test)]
mod tests {
	use super::test_prelude::*;

	#[tokio::test]
	async fn can_load_state_snapshot() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_exclude_from_snapshot() {
		init_logger();

		// get the first key from the snapshot file.
		let some_key = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.build()
			.await
			.expect("Can't read state snapshot file")
			.execute_with(|| {
				let key =
					sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
				assert!(sp_io::storage::get(&key).is_some());
				key
			});

		Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.blacklist_hashed_key(&some_key)
			.build()
			.await
			.expect("Can't read state snapshot file")
			.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
	}
}

#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
	use super::test_prelude::*;
	use std::{env, os::unix::fs::MetadataExt};

	fn endpoint() -> String {
		env::var("TEST_WS").unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
	}

	#[tokio::test]
	async fn state_version_is_kept_and_can_be_altered() {
		const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
		init_logger();

		// first, build a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// now re-create the same snapshot.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.state_version, cached_ext.state_version);

		// now overwrite it
		let other = match ext.state_version {
			StateVersion::V0 => StateVersion::V1,
			StateVersion::V1 => StateVersion::V0,
		};
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.overwrite_state_version(other)
			.build()
			.await
			.unwrap();

		assert_eq!(cached_ext.state_version, other);
	}

	#[tokio::test]
	async fn snapshot_block_hash_works() {
		const CACHE: &'static str = "snapshot_block_hash_works";
		init_logger();

		// first, build a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// now re-create the same snapshot.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.header.hash(), cached_ext.header.hash());
	}

	#[tokio::test]
	async fn child_keys_are_loaded() {
		const CACHE: &'static str = "snapshot_retains_storage";
		init_logger();

		// create an ext with children keys
		let mut child_ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: true,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// create an ext without children keys
		let mut ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// there should be more keys in the child ext.
		assert!(
			child_ext.as_backend().backend_storage().keys().len() >
				ext.as_backend().backend_storage().keys().len()
		);
	}

	#[tokio::test]
	async fn offline_else_online_works() {
		const CACHE: &'static str = "offline_else_online_works_data";
		init_logger();
		// The first run uses the remote node and creates a snapshot.
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport: endpoint().clone().into(),
					pallets: vec!["Proxy".to_owned()],
					child_trie: false,
					state_snapshot: Some(SnapshotConfig::new(CACHE)),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		// The second run does not use the remote; it loads the snapshot, even though the
		// online endpoint is bogus.
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport: "ws://non-existent:666".to_owned().into(),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		std::fs::remove_file(to_delete[0].path()).unwrap();
	}

	#[tokio::test]
	async fn can_build_one_small_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_build_few_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test(flavor = "multi_thread")]
	async fn can_create_snapshot() {
		const CACHE: &'static str = "can_create_snapshot";
		init_logger();

		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		let to_delete = to_delete.first().unwrap();
		assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
		std::fs::remove_file(to_delete.path()).unwrap();
	}

	#[tokio::test]
	async fn can_create_child_snapshot() {
		const CACHE: &'static str = "can_create_child_snapshot";
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Crowdloan".to_owned()],
				child_trie: true,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		let to_delete = to_delete.first().unwrap();
		assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
		std::fs::remove_file(to_delete.path()).unwrap();
	}

	#[tokio::test]
	async fn can_build_big_pallet() {
		if std::option_env!("TEST_WS").is_none() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Staking".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_all() {
		if std::option_env!("TEST_WS").is_none() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_in_parallel() {
		init_logger();

		let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
			transport: endpoint().clone().into(),
			..Default::default()
		}));
		builder.init_remote_client().await.unwrap();

		let at = builder.as_online().at.unwrap();

		let prefix = StorageKey(vec![13]);
		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
		let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
		assert_eq!(paged, para);

		let prefix = StorageKey(vec![]);
		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
		let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
		assert_eq!(paged, para);
	}
}