frame_remote_externalities/
lib.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # Remote Externalities
//!
//! An equivalent of `sp_io::TestExternalities` that can load its state from a remote
//! Substrate-based chain, or a local state snapshot file.
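//!
//! # Example
//!
//! A minimal sketch of the typical online flow (the endpoint, block type and pallet name are
//! placeholders, and the crate path is illustrative):
//!
//! ```ignore
//! use frame_remote_externalities::{Builder, Mode, OnlineConfig};
//!
//! async fn example() {
//! 	let mut ext = Builder::<Block>::new()
//! 		.mode(Mode::Online(OnlineConfig {
//! 			transport: "wss://rpc.polkadot.io:443".to_string().into(),
//! 			pallets: vec!["Proxy".to_owned()],
//! 			..Default::default()
//! 		}))
//! 		.build()
//! 		.await
//! 		.unwrap();
//! 	ext.execute_with(|| {
//! 		// read or mutate the scraped state here.
//! 	});
//! }
//! ```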

use codec::{Compact, Decode, Encode};
use indicatif::{ProgressBar, ProgressStyle};
use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient};
use log::*;
use serde::de::DeserializeOwned;
use sp_core::{
	hexdisplay::HexDisplay,
	storage::{
		well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
		ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
	},
};
use sp_runtime::{
	traits::{Block as BlockT, HashingFor},
	StateVersion,
};
use sp_state_machine::TestExternalities;
use spinners::{Spinner, Spinners};
use std::{
	cmp::{max, min},
	fs,
	ops::{Deref, DerefMut},
	path::{Path, PathBuf},
	sync::Arc,
	time::{Duration, Instant},
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
use tokio_retry::{strategy::FixedInterval, Retry};

type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
type SnapshotVersion = Compact<u16>;

const LOG_TARGET: &str = "remote-ext";
const DEFAULT_HTTP_ENDPOINT: &str = "https://try-runtime.polkadot.io:443";
const SNAPSHOT_VERSION: SnapshotVersion = Compact(4);

/// The snapshot that we store on disk.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
	snapshot_version: SnapshotVersion,
	state_version: StateVersion,
	// `Vec<(Key, (Value, MemoryDbRefCount))>`
	raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
	// The storage root of the state. This may vary from the storage root in the header, if the
	// entire state was not fetched.
	storage_root: B::Hash,
	header: B::Header,
}

impl<B: BlockT> Snapshot<B> {
	pub fn new(
		state_version: StateVersion,
		raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
		storage_root: B::Hash,
		header: B::Header,
	) -> Self {
		Self {
			snapshot_version: SNAPSHOT_VERSION,
			state_version,
			raw_storage,
			storage_root,
			header,
		}
	}

	fn load(path: &PathBuf) -> Result<Snapshot<B>, &'static str> {
		let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
		// The first item in the SCALE encoded struct bytes is the snapshot version. We decode and
		// check that first, before proceeding to decode the rest of the snapshot.
		let snapshot_version = SnapshotVersion::decode(&mut &*bytes)
			.map_err(|_| "Failed to decode snapshot version")?;

		if snapshot_version != SNAPSHOT_VERSION {
			return Err("Unsupported snapshot version detected. Please create a new snapshot.")
		}

		Decode::decode(&mut &*bytes).map_err(|_| "Decode failed")
	}
}

/// An externalities that acts exactly the same as [`sp_io::TestExternalities`], but has a few
/// extra bits and pieces to it, and can be loaded remotely.
pub struct RemoteExternalities<B: BlockT> {
	/// The inner externalities.
	pub inner_ext: TestExternalities<HashingFor<B>>,
	/// The header of the block at which this externalities environment was created.
	pub header: B::Header,
}

impl<B: BlockT> Deref for RemoteExternalities<B> {
	type Target = TestExternalities<HashingFor<B>>;
	fn deref(&self) -> &Self::Target {
		&self.inner_ext
	}
}

impl<B: BlockT> DerefMut for RemoteExternalities<B> {
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.inner_ext
	}
}

/// The execution mode.
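///
/// A sketch of constructing each variant (the endpoint and snapshot path are placeholders):
///
/// ```ignore
/// // Offline: read state from a snapshot file only.
/// let offline = Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new("SNAPSHOT") });
/// // Online: scrape state from a node, here via the `From<String>` impl of `OnlineConfig`.
/// let online = Mode::Online(OnlineConfig::from("wss://rpc.polkadot.io:443".to_string()));
/// // Prefer the snapshot if it exists, else fall back to the node.
/// let either = Mode::OfflineOrElseOnline(
/// 	OfflineConfig { state_snapshot: SnapshotConfig::new("SNAPSHOT") },
/// 	OnlineConfig::from("wss://rpc.polkadot.io:443".to_string()),
/// );
/// ```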
#[derive(Clone)]
pub enum Mode<H> {
	/// Online. Potentially writes to a snapshot file.
	Online(OnlineConfig<H>),
	/// Offline. Uses a state snapshot file and does not need any client config.
	Offline(OfflineConfig),
	/// Prefer using a snapshot file if it exists, else use a remote server.
	OfflineOrElseOnline(OfflineConfig, OnlineConfig<H>),
}

impl<H> Default for Mode<H> {
	fn default() -> Self {
		Mode::Online(OnlineConfig::default())
	}
}

/// Configuration of the offline execution.
///
/// A state snapshot config must be present.
#[derive(Clone)]
pub struct OfflineConfig {
	/// The configuration of the state snapshot file to use. It must be present.
	pub state_snapshot: SnapshotConfig,
}

/// Description of the transport protocol (for online execution).
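///
/// Both a URI string and a pre-built [`HttpClient`] convert into a `Transport`; a sketch, with
/// placeholder endpoints:
///
/// ```ignore
/// let from_uri: Transport = "wss://rpc.polkadot.io:443".to_string().into();
/// let from_client: Transport =
/// 	HttpClient::builder().build("http://localhost:9944").unwrap().into();
/// ```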
#[derive(Debug, Clone)]
pub enum Transport {
	/// Use the `URI` to open a new WebSocket connection.
	Uri(String),
	/// Use HTTP connection.
	RemoteClient(HttpClient),
}

impl Transport {
	fn as_client(&self) -> Option<&HttpClient> {
		match self {
			Self::RemoteClient(client) => Some(client),
			_ => None,
		}
	}

	// Build an HttpClient from a URI.
	async fn init(&mut self) -> Result<(), &'static str> {
		if let Self::Uri(uri) = self {
			log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri);

			// If we have a ws uri, try to convert it to an http uri.
			// We use an HTTP client rather than WS because WS starts to choke with "accumulated
			// message length exceeds maximum" errors after processing ~10k keys when fetching
			// from a node running a default configuration.
			let uri = if uri.starts_with("ws://") {
				let uri = uri.replace("ws://", "http://");
				log::info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
				uri
			} else if uri.starts_with("wss://") {
				let uri = uri.replace("wss://", "https://");
				log::info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
				uri
			} else {
				uri.clone()
			};
			let http_client = HttpClient::builder()
				.max_request_size(u32::MAX)
				.max_response_size(u32::MAX)
				.request_timeout(std::time::Duration::from_secs(60 * 5))
				.build(uri)
				.map_err(|e| {
					log::error!(target: LOG_TARGET, "error: {:?}", e);
					"failed to build http client"
				})?;

			*self = Self::RemoteClient(http_client)
		}

		Ok(())
	}
}

impl From<String> for Transport {
	fn from(uri: String) -> Self {
		Transport::Uri(uri)
	}
}

impl From<HttpClient> for Transport {
	fn from(client: HttpClient) -> Self {
		Transport::RemoteClient(client)
	}
}

/// Configuration of the online execution.
///
/// A state snapshot config may be present and will be written to in that case.
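///
/// A sketch of scraping a single pallet at the latest finalized head, while also writing a
/// snapshot (the endpoint and path are placeholders):
///
/// ```ignore
/// let config = OnlineConfig {
/// 	transport: "wss://rpc.polkadot.io:443".to_string().into(),
/// 	pallets: vec!["Proxy".to_owned()],
/// 	child_trie: false,
/// 	state_snapshot: Some(SnapshotConfig::new("proxy.snap")),
/// 	..Default::default()
/// };
/// ```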
#[derive(Clone)]
pub struct OnlineConfig<H> {
	/// The block hash at which to get the runtime state. Will be latest finalized head if not
	/// provided.
	pub at: Option<H>,
	/// An optional state snapshot file to WRITE to, not for reading. Not written if set to `None`.
	pub state_snapshot: Option<SnapshotConfig>,
	/// The pallets to scrape. These values are hashed and added to `hashed_prefixes`.
	pub pallets: Vec<String>,
	/// Transport config.
	pub transport: Transport,
	/// Look out for child keys, and scrape them as well, if set to true.
	pub child_trie: bool,
	/// Storage entry key prefixes to be injected into the externalities. The *hashed* prefix must
	/// be given.
	pub hashed_prefixes: Vec<Vec<u8>>,
	/// Storage entry keys to be injected into the externalities. The *hashed* key must be given.
	pub hashed_keys: Vec<Vec<u8>>,
}

impl<H: Clone> OnlineConfig<H> {
	/// Return rpc (http) client reference.
	fn rpc_client(&self) -> &HttpClient {
		self.transport
			.as_client()
			.expect("http client must have been initialized by now; qed.")
	}

	fn at_expected(&self) -> H {
		self.at.clone().expect("block at must be initialized; qed")
	}
}

impl<H> Default for OnlineConfig<H> {
	fn default() -> Self {
		Self {
			transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
			child_trie: true,
			at: None,
			state_snapshot: None,
			pallets: Default::default(),
			hashed_keys: Default::default(),
			hashed_prefixes: Default::default(),
		}
	}
}

impl<H> From<String> for OnlineConfig<H> {
	fn from(t: String) -> Self {
		Self { transport: t.into(), ..Default::default() }
	}
}

/// Configuration of the state snapshot.
#[derive(Clone)]
pub struct SnapshotConfig {
	/// The path to the snapshot file.
	pub path: PathBuf,
}

impl SnapshotConfig {
	pub fn new<P: Into<PathBuf>>(path: P) -> Self {
		Self { path: path.into() }
	}
}

impl From<String> for SnapshotConfig {
	fn from(s: String) -> Self {
		Self::new(s)
	}
}

impl Default for SnapshotConfig {
	fn default() -> Self {
		Self { path: Path::new("SNAPSHOT").into() }
	}
}

/// Builder for remote-externalities.
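///
/// A sketch of layering manual key-value injection and exclusion on top of a snapshot (the
/// snapshot path is a placeholder):
///
/// ```ignore
/// let ext = Builder::<Block>::new()
/// 	.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new("test.snap") }))
/// 	.inject_hashed_key_value(vec![(StorageKey(vec![0xaa]), StorageData(vec![0xbb]))])
/// 	.blacklist_hashed_key(&[0xcc])
/// 	.build()
/// 	.await?;
/// ```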
#[derive(Clone)]
pub struct Builder<B: BlockT> {
	/// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values
	/// must be given.
	hashed_key_values: Vec<KeyValue>,
	/// The keys that will be excluded from the final externality. The *hashed* key must be given.
	hashed_blacklist: Vec<Vec<u8>>,
	/// Connectivity mode, online or offline.
	mode: Mode<B::Hash>,
	/// If provided, overwrite the state version with this. Otherwise, the state_version of the
	/// remote node is used. All cache files also store their state version.
	///
	/// Overwrite only with care.
	overwrite_state_version: Option<StateVersion>,
}

impl<B: BlockT> Default for Builder<B> {
	fn default() -> Self {
		Self {
			mode: Default::default(),
			hashed_key_values: Default::default(),
			hashed_blacklist: Default::default(),
			overwrite_state_version: None,
		}
	}
}

// Mode methods
impl<B: BlockT> Builder<B> {
	fn as_online(&self) -> &OnlineConfig<B::Hash> {
		match &self.mode {
			Mode::Online(config) => config,
			Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("expected mode to be Online"),
		}
	}

	fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
		match &mut self.mode {
			Mode::Online(config) => config,
			Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("expected mode to be Online"),
		}
	}
}

// RPC methods
impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	const PARALLEL_REQUESTS: usize = 4;
	const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
	const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
	const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
	const INITIAL_BATCH_SIZE: usize = 10;
	// nodes by default will not return more than 1000 keys per request
	const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
	const MAX_RETRIES: usize = 12;
	const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);

	async fn rpc_get_storage(
		&self,
		key: StorageKey,
		maybe_at: Option<B::Hash>,
	) -> Result<Option<StorageData>, &'static str> {
		trace!(target: LOG_TARGET, "rpc: get_storage");
		self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| {
			error!(target: LOG_TARGET, "Error = {:?}", e);
			"rpc get_storage failed."
		})
	}

	/// Get the latest finalized head.
	async fn rpc_get_head(&self) -> Result<B::Hash, &'static str> {
		trace!(target: LOG_TARGET, "rpc: finalized_head");

		// sadly, this is pretty much unreadable...
		ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
			.await
			.map_err(|e| {
				error!(target: LOG_TARGET, "Error = {:?}", e);
				"rpc finalized_head failed."
			})
	}

	async fn get_keys_single_page(
		&self,
		prefix: Option<StorageKey>,
		start_key: Option<StorageKey>,
		at: B::Hash,
	) -> Result<Vec<StorageKey>, &'static str> {
		self.as_online()
			.rpc_client()
			.storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at))
			.await
			.map_err(|e| {
				error!(target: LOG_TARGET, "Error = {:?}", e);
				"rpc get_keys failed"
			})
	}

	/// Get keys with `prefix` at `block` in a parallel manner.
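	///
	/// A sketch of a call site, as exercised by the `can_fetch_in_parallel` test below (`builder`
	/// and `at` are assumed to be initialized):
	///
	/// ```ignore
	/// let keys = builder.rpc_get_keys_parallel(&StorageKey(vec![13]), at, 4).await?;
	/// ```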
	async fn rpc_get_keys_parallel(
		&self,
		prefix: &StorageKey,
		block: B::Hash,
		parallel: usize,
	) -> Result<Vec<StorageKey>, &'static str> {
		/// Divide the workload and return the start key of each chunk. Guaranteed to return a
		/// non-empty list.
		fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
			let mut prefix = prefix.as_ref().to_vec();
			let scale = 32usize.saturating_sub(prefix.len());

			// no need to divide workload
			if scale < 9 {
				prefix.extend(vec![0; scale]);
				return vec![StorageKey(prefix)]
			}

			let chunks = 16;
			let step = 0x10000 / chunks;
			let ext = scale - 2;
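			// For example, with an empty prefix (scale = 32, ext = 30), the start keys generated
			// below are 0x0000, 0x1000, ..., 0xf000, each right-padded with 30 zero bytes,
			// splitting the key space into 16 even ranges.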

			(0..chunks)
				.map(|i| {
					let mut key = prefix.clone();
					let start = i * step;
					key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
					key.extend(vec![0; ext]);
					StorageKey(key)
				})
				.collect()
		}

		let start_keys = gen_start_keys(&prefix);
		let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
		let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
		end_keys.push(None);

		// use a semaphore to limit max scraping tasks
		let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
		let builder = Arc::new(self.clone());
		let mut handles = vec![];

		for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
			let permit = parallel
				.clone()
				.acquire_owned()
				.await
				.expect("semaphore is not closed until the end of loop");

			let builder = builder.clone();
			let prefix = prefix.clone();
			let start_key = start_key.cloned();
			let end_key = end_key.cloned();

			let handle = tokio::spawn(async move {
				let res = builder
					.rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
					.await;
				drop(permit);
				res
			});

			handles.push(handle);
		}

		parallel.close();

		let keys = futures::future::join_all(handles)
			.await
			.into_iter()
			.filter_map(|res| match res {
				Ok(Ok(keys)) => Some(keys),
				_ => None,
			})
			.flatten()
			.collect::<Vec<StorageKey>>();

		Ok(keys)
	}

	/// Get all keys with `prefix` within the given range at `block`.
	/// Both `start_key` and `end_key` are optional if you want an open-ended range.
	async fn rpc_get_keys_in_range(
		&self,
		prefix: &StorageKey,
		block: B::Hash,
		start_key: Option<&StorageKey>,
		end_key: Option<&StorageKey>,
	) -> Result<Vec<StorageKey>, &'static str> {
		let mut last_key: Option<&StorageKey> = start_key;
		let mut keys: Vec<StorageKey> = vec![];

		loop {
			// This loop can hit the node with very rapid requests, occasionally causing it to
			// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
			let retry_strategy =
				FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
			let get_page_closure =
				|| self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
			let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;

			// avoid duplicated keys across workloads
			if let (Some(last), Some(end)) = (page.last(), end_key) {
				if last >= end {
					page.retain(|key| key < end);
				}
			}

			let page_len = page.len();
			keys.extend(page);
			last_key = keys.last();

			// scraping out of range or no more matches,
			// we are done either way
			if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
				log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
				break
			}

			log::debug!(
				target: LOG_TARGET,
				"new total = {}, full page received: {}",
				keys.len(),
				HexDisplay::from(last_key.expect("full page received, cannot be None"))
			);
		}

		Ok(keys)
	}
	/// Fetches storage data from a node using a dynamic batch size.
	///
	/// This function adjusts the batch size on the fly to help prevent overwhelming the node with
	/// large batch requests, and stay within request size limits enforced by the node.
	///
	/// # Arguments
	///
	/// * `client` - A reference to the `HttpClient` used for making the requests.
	/// * `payloads` - A vector of tuples containing a JSONRPC method name and `ArrayParams`.
	/// * `bar` - A progress bar that is advanced as batches complete.
	///
	/// # Returns
	///
	/// Returns a `Result` with a vector of `Option<StorageData>`, where each element corresponds to
	/// the storage data for the given method and parameters. The result will be an `Err` with a
	/// `String` error message if the request fails.
	///
	/// # Errors
	///
	/// This function will return an error if:
	/// * The batch request keeps failing after `MAX_RETRIES` attempts.
	/// * There are invalid batch params.
	/// * There is an error in the batch response.
	///
	/// # Example
	///
	/// A sketch of a call site (the key, block hash and endpoint are placeholders):
	///
	/// ```ignore
	/// let client = HttpClient::builder().build("http://localhost:9944").unwrap();
	/// let payloads = vec![
	/// 	("state_getStorage".to_string(), rpc_params!(key, at)),
	/// 	("state_getStorage".to_string(), rpc_params!(another_key, at)),
	/// ];
	/// let bar = ProgressBar::new(payloads.len() as u64);
	///
	/// match get_storage_data_dynamic_batch_size(&client, payloads, &bar).await {
	/// 	Ok(data) => println!("Storage data: {:?}", data),
	/// 	Err(e) => eprintln!("Error fetching storage data: {}", e),
	/// }
	/// ```
	async fn get_storage_data_dynamic_batch_size(
		client: &HttpClient,
		payloads: Vec<(String, ArrayParams)>,
		bar: &ProgressBar,
	) -> Result<Vec<Option<StorageData>>, String> {
		let mut all_data: Vec<Option<StorageData>> = vec![];
		let mut start_index = 0;
		let mut retries = 0usize;
		let mut batch_size = Self::INITIAL_BATCH_SIZE;
		let total_payloads = payloads.len();

		while start_index < total_payloads {
			log::debug!(
				target: LOG_TARGET,
				"Remaining payloads: {} Batch request size: {}",
				total_payloads - start_index,
				batch_size,
			);

			let end_index = usize::min(start_index + batch_size, total_payloads);
			let page = &payloads[start_index..end_index];

			// Build the batch request
			let mut batch = BatchRequestBuilder::new();
			for (method, params) in page.iter() {
				batch
					.insert(method, params.clone())
					.map_err(|_| "Invalid batch method and/or params")?;
			}

			let request_started = Instant::now();
			let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
				Ok(batch_response) => {
					retries = 0;
					batch_response
				},
				Err(e) => {
					if retries > Self::MAX_RETRIES {
						return Err(e.to_string())
					}

					retries += 1;
					let failure_log = format!(
						"Batch request failed ({}/{} retries). Error: {}",
						retries,
						Self::MAX_RETRIES,
						e
					);
					// after 2 subsequent failures something very wrong is happening. log a warning
					// and reset the batch size down to 1.
					if retries >= 2 {
						log::warn!("{}", failure_log);
						batch_size = 1;
					} else {
						log::debug!("{}", failure_log);
						// Decrease batch size by DECREASE_FACTOR
						batch_size =
							(batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize;
					}
					continue
				},
			};

			let request_duration = request_started.elapsed();
			batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
				// Decrease batch size
				max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
			} else {
				// Increase batch size, but not more than the remaining total payloads to process
				min(
					total_payloads - start_index,
					max(
						batch_size + 1,
						(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
					),
				)
			};

			log::debug!(
				target: LOG_TARGET,
				"Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}",
				request_duration,
				Self::REQUEST_DURATION_TARGET,
				end_index - start_index,
				batch_size
			);

			let batch_response_len = batch_response.len();
			for item in batch_response.into_iter() {
				match item {
					Ok(x) => all_data.push(x),
					Err(e) => return Err(e.message().to_string()),
				}
			}
			bar.inc(batch_response_len as u64);

			// Update the start index for the next iteration
			start_index = end_index;
		}

		Ok(all_data)
	}

	/// Synonym of `getPairs` that uses paged queries to first get the keys, and then
	/// map them to values one by one.
	///
	/// This can work with public nodes. But, expect it to be darn slow.
	pub(crate) async fn rpc_get_pairs(
		&self,
		prefix: StorageKey,
		at: B::Hash,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<Vec<KeyValue>, &'static str> {
		let start = Instant::now();
		let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
		// TODO We could start downloading when having collected the first batch of keys
		// https://github.com/paritytech/polkadot-sdk/issues/2494
		let keys = self
			.rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
			.await?
			.into_iter()
			.collect::<Vec<_>>();
		sp.stop_with_message(format!(
			"✅ Found {} keys ({:.2}s)",
			keys.len(),
			start.elapsed().as_secs_f32()
		));
		if keys.is_empty() {
			return Ok(Default::default())
		}

		let client = self.as_online().rpc_client();
		let payloads = keys
			.iter()
			.map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
			.collect::<Vec<_>>();

		let bar = ProgressBar::new(payloads.len() as u64);
		bar.enable_steady_tick(Duration::from_secs(1));
		bar.set_message("Downloading key values".to_string());
		bar.set_style(
			ProgressStyle::with_template(
				"[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
			)
			.unwrap()
			.progress_chars("=>-"),
		);
		let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
		let requests = payloads_chunked.map(|payload_chunk| {
			Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
		});
		// Execute the requests and move the Result outside.
		let storage_data_result: Result<Vec<_>, _> =
			futures::future::join_all(requests).await.into_iter().collect();
		// Handle the Result.
		let storage_data = match storage_data_result {
			Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
			Err(e) => {
				log::error!(target: LOG_TARGET, "Error while getting storage data: {}", e);
				return Err("Error while getting storage data")
			},
		};
		bar.finish_with_message("✅ Downloaded key values");
		println!();

		// Check if we got responses for all submitted requests.
		assert_eq!(keys.len(), storage_data.len());

		let key_values = keys
			.iter()
			.zip(storage_data)
			.map(|(key, maybe_value)| match maybe_value {
				Some(data) => (key.clone(), data),
				None => {
					log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
					let data = StorageData(vec![]);
					(key.clone(), data)
				},
			})
			.collect::<Vec<_>>();

		let mut sp = Spinner::with_timer(Spinners::Dots, "Inserting keys into DB...".into());
		let start = Instant::now();
		pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
			// Don't insert the child keys here, they need to be inserted separately with all their
			// data in the load_child_remote function.
			match is_default_child_storage_key(&k.0) {
				true => None,
				false => Some((k.0, v.0)),
			}
		}));
		sp.stop_with_message(format!(
			"✅ Inserted keys into DB ({:.2}s)",
			start.elapsed().as_secs_f32()
		));
		Ok(key_values)
	}

	/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
	pub(crate) async fn rpc_child_get_storage_paged(
		client: &HttpClient,
		prefixed_top_key: &StorageKey,
		child_keys: Vec<StorageKey>,
		at: B::Hash,
	) -> Result<Vec<KeyValue>, &'static str> {
		let child_keys_len = child_keys.len();

		let payloads = child_keys
			.iter()
			.map(|key| {
				(
					"childstate_getStorage".to_string(),
					rpc_params![
						PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
						key,
						at
					],
				)
			})
			.collect::<Vec<_>>();

		let bar = ProgressBar::new(payloads.len() as u64);
		let storage_data =
			match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await {
				Ok(storage_data) => storage_data,
				Err(e) => {
					log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
					return Err("batch processing failed")
				},
			};

		assert_eq!(child_keys_len, storage_data.len());

		Ok(child_keys
			.iter()
			.zip(storage_data)
			.map(|(key, maybe_value)| match maybe_value {
				Some(v) => (key.clone(), v),
				None => {
					log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
					(key.clone(), StorageData(vec![]))
				},
			})
			.collect::<Vec<_>>())
	}

	pub(crate) async fn rpc_child_get_keys(
		client: &HttpClient,
		prefixed_top_key: &StorageKey,
		child_prefix: StorageKey,
		at: B::Hash,
	) -> Result<Vec<StorageKey>, &'static str> {
		let retry_strategy =
			FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
		let mut all_child_keys = Vec::new();
		let mut start_key = None;

		loop {
			let get_child_keys_closure = || {
				let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
				substrate_rpc_client::ChildStateApi::storage_keys_paged(
					client,
					top_key,
					Some(child_prefix.clone()),
					Self::DEFAULT_KEY_DOWNLOAD_PAGE,
					start_key.clone(),
					Some(at),
				)
			};

			let child_keys = Retry::spawn(retry_strategy.clone(), get_child_keys_closure)
				.await
				.map_err(|e| {
					error!(target: LOG_TARGET, "Error = {:?}", e);
					"rpc child_get_keys failed."
				})?;

			let keys_count = child_keys.len();
			if keys_count == 0 {
				break;
			}

			start_key = child_keys.last().cloned();
			all_child_keys.extend(child_keys);

			if keys_count < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
				break;
			}
		}

		debug!(
			target: LOG_TARGET,
			"[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
			std::thread::current().id(),
			all_child_keys.len(),
			HexDisplay::from(prefixed_top_key)
		);

		Ok(all_child_keys)
	}
}

impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	/// Load all of the child keys from the remote config, given the already scraped list of top key
	/// pairs.
	///
	/// `top_kv` need not be only child-bearing top keys. It should be all of the top keys that are
	/// included thus far.
	///
	/// This function concurrently populates `pending_ext`. The return value is only used for
	/// writing to the cache; this could be optimized further.
	async fn load_child_remote(
		&self,
		top_kv: &[KeyValue],
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<ChildKeyValues, &'static str> {
		let child_roots = top_kv
			.iter()
			.filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
			.map(|(k, _)| k.clone())
			.collect::<Vec<_>>();

		if child_roots.is_empty() {
			info!(target: LOG_TARGET, "👩‍👦 no child roots found to scrape");
			return Ok(Default::default())
		}

		info!(
			target: LOG_TARGET,
			"👩‍👦 scraping child-tree data from {} top keys",
			child_roots.len(),
		);

		let at = self.as_online().at_expected();

		let client = self.as_online().rpc_client();
		let mut child_kv = vec![];
		for prefixed_top_key in child_roots {
			let child_keys =
				Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;

			let child_kv_inner =
				Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
					.await?;

			let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
			let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
				Some((ChildType::ParentKeyId, storage_key)) => storage_key,
				None => {
					log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
					return Err("Invalid child key")
				},
			};

			let info = ChildInfo::new_default(un_prefixed);
			let key_values =
				child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
			child_kv.push((info.clone(), child_kv_inner));
			for (k, v) in key_values {
				pending_ext.insert_child(info.clone(), k, v);
			}
		}

		Ok(child_kv)
	}

	/// Load all of the configured top keys from the remote node into `pending_ext`.
	///
	/// This function concurrently populates `pending_ext`. The return value is only used for
	/// writing to the cache; this could be optimized further.
	async fn load_top_remote(
		&self,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<TopKeyValues, &'static str> {
		let config = self.as_online();
		let at = self
			.as_online()
			.at
			.expect("online config must be initialized by this point; qed.");
		log::info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {:?}", at);

		let mut keys_and_values = Vec::new();
		for prefix in &config.hashed_prefixes {
			let now = std::time::Instant::now();
			let additional_key_values =
				self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
			let elapsed = now.elapsed();
			log::info!(
				target: LOG_TARGET,
				"adding data for hashed prefix: {:?}, took {:.2}s",
				HexDisplay::from(prefix),
				elapsed.as_secs_f32()
			);
			keys_and_values.extend(additional_key_values);
		}

		for key in &config.hashed_keys {
			let key = StorageKey(key.to_vec());
			log::info!(
				target: LOG_TARGET,
				"adding data for hashed key: {:?}",
				HexDisplay::from(&key)
			);
			match self.rpc_get_storage(key.clone(), Some(at)).await? {
				Some(value) => {
					pending_ext.insert(key.clone().0, value.clone().0);
					keys_and_values.push((key, value));
				},
				None => {
					log::warn!(
						target: LOG_TARGET,
						"no data found for hashed key: {:?}",
						HexDisplay::from(&key)
					);
				},
			}
		}

		Ok(keys_and_values)
	}

	/// The entry point of execution, if `mode` is online.
	///
	/// Initializes the remote client in `transport`, and sets the `at` field, if not specified.
	async fn init_remote_client(&mut self) -> Result<(), &'static str> {
		// First, initialize the http client.
		self.as_online_mut().transport.init().await?;

		// Then, if `at` is not set, set it.
		if self.as_online().at.is_none() {
			let at = self.rpc_get_head().await?;
			log::info!(
				target: LOG_TARGET,
				"since no at is provided, setting it to latest finalized head, {:?}",
				at
			);
			self.as_online_mut().at = Some(at);
		}

		// Then, a few transformations that we want to perform in the online config:
		let online_config = self.as_online_mut();
		online_config.pallets.iter().for_each(|p| {
			online_config
				.hashed_prefixes
				.push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
		});

		if online_config.child_trie {
			online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
		}

		// Finally, if by now we have not put any limitations on the prefixes that we are
		// interested in, we download everything.
		if online_config
			.hashed_prefixes
			.iter()
			.filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
			.count() == 0
		{
			log::info!(
				target: LOG_TARGET,
				"since no prefix is filtered, the data for all pallets will be downloaded"
			);
			online_config.hashed_prefixes.push(vec![]);
		}

		Ok(())
	}

	async fn load_header(&self) -> Result<B::Header, &'static str> {
		let retry_strategy =
			FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
		let get_header_closure = || {
			ChainApi::<(), _, B::Header, ()>::header(
				self.as_online().rpc_client(),
				Some(self.as_online().at_expected()),
			)
		};
		Retry::spawn(retry_strategy, get_header_closure)
			.await
			.map_err(|_| "Failed to fetch header for block from network")?
			.ok_or("Network returned None block header")
	}

	/// Load the data from a remote server. The main code path is calling into `load_top_remote`
	/// and `load_child_remote`.
	///
	/// Must be called after `init_remote_client`.
	async fn load_remote_and_maybe_save(
		&mut self,
	) -> Result<TestExternalities<HashingFor<B>>, &'static str> {
		let state_version =
			StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
				.await
				.map_err(|e| {
					error!(target: LOG_TARGET, "Error = {:?}", e);
					"rpc runtime_version failed."
				})
				.map(|v| v.state_version())?;
		let mut pending_ext = TestExternalities::new_with_code_and_state(
			Default::default(),
			Default::default(),
			self.overwrite_state_version.unwrap_or(state_version),
		);

		// Load data from the remote into `pending_ext`.
		let top_kv = self.load_top_remote(&mut pending_ext).await?;
		self.load_child_remote(&top_kv, &mut pending_ext).await?;

		// If we need to save a snapshot, save the raw storage and root hash to the snapshot.
		if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
			let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
			let snapshot = Snapshot::<B>::new(
				state_version,
				raw_storage.clone(),
				storage_root,
				self.load_header().await?,
			);
			let encoded = snapshot.encode();
			log::info!(
				target: LOG_TARGET,
				"writing snapshot of {} bytes to {:?}",
				encoded.len(),
				path
			);
			std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;

			// `pending_ext` was consumed when creating the snapshot, so we need to reinitialize it.
			return Ok(TestExternalities::from_raw_snapshot(
				raw_storage,
				storage_root,
				self.overwrite_state_version.unwrap_or(state_version),
			))
		}

		Ok(pending_ext)
	}

	async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>, &'static str> {
		self.init_remote_client().await?;
		let inner_ext = self.load_remote_and_maybe_save().await?;
		Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
	}

	fn do_load_offline(
		&mut self,
		config: OfflineConfig,
	) -> Result<RemoteExternalities<B>, &'static str> {
		let mut sp = Spinner::with_timer(Spinners::Dots, "Loading snapshot...".into());
		let start = Instant::now();
		info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);
		let Snapshot { snapshot_version: _, header, state_version, raw_storage, storage_root } =
			Snapshot::<B>::load(&config.state_snapshot.path)?;

		let inner_ext = TestExternalities::from_raw_snapshot(
			raw_storage,
			storage_root,
			self.overwrite_state_version.unwrap_or(state_version),
		);
		sp.stop_with_message(format!("✅ Loaded snapshot ({:.2}s)", start.elapsed().as_secs_f32()));

		Ok(RemoteExternalities { inner_ext, header })
	}

	pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>, &'static str> {
		let mut ext = match self.mode.clone() {
			Mode::Offline(config) => self.do_load_offline(config)?,
			Mode::Online(_) => self.do_load_remote().await?,
			Mode::OfflineOrElseOnline(offline_config, _) => {
				match self.do_load_offline(offline_config) {
					Ok(x) => x,
					Err(_) => self.do_load_remote().await?,
				}
			},
		};

		// inject manual key values.
		if !self.hashed_key_values.is_empty() {
			log::info!(
				target: LOG_TARGET,
				"extending externalities with {} manually injected key-values",
				self.hashed_key_values.len()
			);
			ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
		}

		// exclude manual key values.
		if !self.hashed_blacklist.is_empty() {
			log::info!(
				target: LOG_TARGET,
				"excluding {} keys from the externalities",
				self.hashed_blacklist.len()
			);
			for k in self.hashed_blacklist {
				ext.execute_with(|| sp_io::storage::clear(&k));
			}
		}

		Ok(ext)
	}
}

// Public methods
impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	/// Create a new builder.
	pub fn new() -> Self {
		Default::default()
	}

	/// Inject a manual list of key-value pairs into the storage.
	pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
		for i in injections {
			self.hashed_key_values.push(i.clone());
		}
		self
	}

	/// Blacklist this hashed key from the final externalities. This is treated as-is, and should
	/// be pre-hashed.
	pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
		self.hashed_blacklist.push(hashed.to_vec());
		self
	}

	/// Set the connectivity mode, and thereby the offline/online configuration to be used.
	pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
		self.mode = mode;
		self
	}

	/// The state version to use, overriding the one from the remote node or snapshot.
	pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
		self.overwrite_state_version = Some(version);
		self
	}

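	/// Build the externalities according to the configured [`Mode`].
	///
	/// A sketch of loading from a snapshot file (the path is a placeholder):
	///
	/// ```ignore
	/// let ext = Builder::<Block>::new()
	/// 	.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new("test.snap") }))
	/// 	.build()
	/// 	.await?;
	/// ```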
	pub async fn build(self) -> Result<RemoteExternalities<B>, &'static str> {
		let mut ext = self.pre_build().await?;
		ext.commit_all().unwrap();

		info!(
			target: LOG_TARGET,
			"initialized state externalities with storage root {:?} and state_version {:?}",
			ext.as_backend().root(),
			ext.state_version
		);

		Ok(ext)
	}
}

#[cfg(test)]
mod test_prelude {
	pub(crate) use super::*;
	pub(crate) use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper, H256 as Hash};
	pub(crate) type Block = RawBlock<ExtrinsicWrapper<Hash>>;

	pub(crate) fn init_logger() {
		sp_tracing::try_init_simple();
	}
}

#[cfg(test)]
mod tests {
	use super::test_prelude::*;

	#[tokio::test]
	async fn can_load_state_snapshot() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_exclude_from_snapshot() {
		init_logger();

		// get the first key from the snapshot file.
		let some_key = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.build()
			.await
			.expect("Can't read state snapshot file")
			.execute_with(|| {
				let key =
					sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
				assert!(sp_io::storage::get(&key).is_some());
				key
			});

		Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.blacklist_hashed_key(&some_key)
			.build()
			.await
			.expect("Can't read state snapshot file")
			.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
	}
}

#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
	use super::test_prelude::*;
	use std::{env, os::unix::fs::MetadataExt};

	fn endpoint() -> String {
		env::var("TEST_WS").unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
	}

	#[tokio::test]
	async fn state_version_is_kept_and_can_be_altered() {
		const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
		init_logger();

		// first, build a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// now re-create the same snapshot.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.state_version, cached_ext.state_version);

		// now overwrite it
		let other = match ext.state_version {
			StateVersion::V0 => StateVersion::V1,
			StateVersion::V1 => StateVersion::V0,
		};
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.overwrite_state_version(other)
			.build()
			.await
			.unwrap();

		assert_eq!(cached_ext.state_version, other);
	}

	#[tokio::test]
	async fn snapshot_block_hash_works() {
		const CACHE: &'static str = "snapshot_block_hash_works";
		init_logger();

		// first, build a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// now re-create the same snapshot.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.header.hash(), cached_ext.header.hash());
	}

	#[tokio::test]
	async fn child_keys_are_loaded() {
		const CACHE: &'static str = "snapshot_retains_storage";
		init_logger();

		// create an ext with children keys
		let mut child_ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: true,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// create an ext without children keys
		let mut ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// there should be more keys in the child ext.
		assert!(
			child_ext.as_backend().backend_storage().keys().len() >
				ext.as_backend().backend_storage().keys().len()
		);
	}

	#[tokio::test]
	async fn offline_else_online_works() {
		const CACHE: &'static str = "offline_else_online_works_data";
		init_logger();
		// the first run uses the remote and creates a snapshot.
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport: endpoint().clone().into(),
					pallets: vec!["Proxy".to_owned()],
					child_trie: false,
					state_snapshot: Some(SnapshotConfig::new(CACHE)),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		// this shows that in the second run, we are not using the remote
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport: "ws://non-existent:666".to_owned().into(),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		std::fs::remove_file(to_delete[0].path()).unwrap();
	}

	#[tokio::test]
	async fn can_build_one_small_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_build_few_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test(flavor = "multi_thread")]
	async fn can_create_snapshot() {
		const CACHE: &'static str = "can_create_snapshot";
		init_logger();

		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		let to_delete = to_delete.first().unwrap();
		assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
		std::fs::remove_file(to_delete.path()).unwrap();
	}

	#[tokio::test]
	async fn can_create_child_snapshot() {
		const CACHE: &'static str = "can_create_child_snapshot";
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Crowdloan".to_owned()],
				child_trie: true,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		let to_delete = to_delete.first().unwrap();
		assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
		std::fs::remove_file(to_delete.path()).unwrap();
	}

	#[tokio::test]
	async fn can_build_big_pallet() {
		if std::option_env!("TEST_WS").is_none() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Staking".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_all() {
		if std::option_env!("TEST_WS").is_none() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_in_parallel() {
		init_logger();

		let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
			transport: endpoint().clone().into(),
			..Default::default()
		}));
		builder.init_remote_client().await.unwrap();

		let at = builder.as_online().at.unwrap();

		let prefix = StorageKey(vec![13]);
		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
		let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
		assert_eq!(paged, para);

		let prefix = StorageKey(vec![]);
		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
		let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
		assert_eq!(paged, para);
	}
}