
frame_remote_externalities/lib.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # Remote Externalities
//!
//! An equivalent of `sp_io::TestExternalities` that can load its state from a remote
//! Substrate-based chain, or a local state snapshot file.
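//!
//! A minimal usage sketch (hedged: the endpoint below is hypothetical, and `Block` is assumed to
//! be the block type of the target chain):
//!
//! ```ignore
//! use frame_remote_externalities::{Builder, Mode, OnlineConfig};
//!
//! async fn demo() {
//! 	let mut ext = Builder::<Block>::new()
//! 		.mode(Mode::Online(OnlineConfig {
//! 			transport: "https://rpc.example.io:443".to_owned().into(),
//! 			pallets: vec!["System".to_owned()],
//! 			..Default::default()
//! 		}))
//! 		.build()
//! 		.await
//! 		.expect("remote state should load");
//! 	ext.execute_with(|| {
//! 		// Run arbitrary code against the fetched state here.
//! 	});
//! }
//! ```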

mod logging;

use codec::{Compact, Decode, Encode};
use indicatif::{ProgressBar, ProgressStyle};
use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient};
use log::*;
use serde::de::DeserializeOwned;
use sp_core::{
	hexdisplay::HexDisplay,
	storage::{
		well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
		ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
	},
};
use sp_runtime::{
	traits::{Block as BlockT, HashingFor},
	StateVersion,
};
use sp_state_machine::TestExternalities;
use std::{
	cmp::{max, min},
	fs,
	ops::{Deref, DerefMut},
	path::{Path, PathBuf},
	sync::Arc,
	time::{Duration, Instant},
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
use tokio_retry::{strategy::FixedInterval, Retry};

type Result<T, E = &'static str> = std::result::Result<T, E>;

type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
type SnapshotVersion = Compact<u16>;

const LOG_TARGET: &str = "remote-ext";
const DEFAULT_HTTP_ENDPOINT: &str = "https://try-runtime.polkadot.io:443";
const SNAPSHOT_VERSION: SnapshotVersion = Compact(4);

/// The snapshot that we store on disk.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
	snapshot_version: SnapshotVersion,
	state_version: StateVersion,
	// <Vec<Key, (Value, MemoryDbRefCount)>>
	raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
	// The storage root of the state. This may vary from the storage root in the header, if not the
	// entire state was fetched.
	storage_root: B::Hash,
	header: B::Header,
}

impl<B: BlockT> Snapshot<B> {
	pub fn new(
		state_version: StateVersion,
		raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
		storage_root: B::Hash,
		header: B::Header,
	) -> Self {
		Self {
			snapshot_version: SNAPSHOT_VERSION,
			state_version,
			raw_storage,
			storage_root,
			header,
		}
	}

	fn load(path: &PathBuf) -> Result<Snapshot<B>> {
		let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
		// The first item in the SCALE encoded struct bytes is the snapshot version. We decode and
		// check that first, before proceeding to decode the rest of the snapshot.
		let snapshot_version = SnapshotVersion::decode(&mut &*bytes)
			.map_err(|_| "Failed to decode snapshot version")?;

		if snapshot_version != SNAPSHOT_VERSION {
			return Err("Unsupported snapshot version detected. Please create a new snapshot.")
		}

		Decode::decode(&mut &*bytes).map_err(|_| "Decode failed")
	}
}

/// An externalities that acts exactly the same as [`sp_io::TestExternalities`] but has a few extra
/// bits and pieces to it, and can be loaded remotely.
pub struct RemoteExternalities<B: BlockT> {
	/// The inner externalities.
	pub inner_ext: TestExternalities<HashingFor<B>>,
	/// The block header at which we created this externality env.
	pub header: B::Header,
}

impl<B: BlockT> Deref for RemoteExternalities<B> {
	type Target = TestExternalities<HashingFor<B>>;
	fn deref(&self) -> &Self::Target {
		&self.inner_ext
	}
}

impl<B: BlockT> DerefMut for RemoteExternalities<B> {
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.inner_ext
	}
}

/// The execution mode.
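///
/// A hedged sketch of selecting a mode (the snapshot path and endpoint are hypothetical):
///
/// ```ignore
/// let mode = Mode::OfflineOrElseOnline(
/// 	OfflineConfig { state_snapshot: SnapshotConfig::new("my.snap") },
/// 	OnlineConfig { transport: "https://rpc.example.io:443".to_owned().into(), ..Default::default() },
/// );
/// ```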
#[derive(Clone)]
pub enum Mode<H> {
	/// Online. Potentially writes to a snapshot file.
	Online(OnlineConfig<H>),
	/// Offline. Uses a state snapshot file and does not need any client config.
	Offline(OfflineConfig),
	/// Prefer using a snapshot file if it exists, else use a remote server.
	OfflineOrElseOnline(OfflineConfig, OnlineConfig<H>),
}

impl<H> Default for Mode<H> {
	fn default() -> Self {
		Mode::Online(OnlineConfig::default())
	}
}

/// Configuration of the offline execution.
///
/// A state snapshot config must be present.
#[derive(Clone)]
pub struct OfflineConfig {
	/// The configuration of the state snapshot file to use. It must be present.
	pub state_snapshot: SnapshotConfig,
}

/// Description of the transport protocol (for online execution).
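///
/// A `String` URI converts into this via `From`; note that `ws(s)://` URIs are rewritten to
/// `http(s)://` when the client is initialized (see `Transport::init`). A hedged sketch, with a
/// hypothetical endpoint:
///
/// ```ignore
/// let transport: Transport = "wss://rpc.example.io:443".to_owned().into();
/// ```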
#[derive(Debug, Clone)]
pub enum Transport {
	/// Use the `URI` to open a new WebSocket connection.
	Uri(String),
	/// Use HTTP connection.
	RemoteClient(HttpClient),
}

impl Transport {
	fn as_client(&self) -> Option<&HttpClient> {
		match self {
			Self::RemoteClient(client) => Some(client),
			_ => None,
		}
	}

	// Build an HttpClient from a URI.
	async fn init(&mut self) -> Result<()> {
		if let Self::Uri(uri) = self {
			debug!(target: LOG_TARGET, "initializing remote client to {uri:?}");

			// If we have a ws uri, try to convert it to an http uri.
			// We use an HTTP client rather than WS because WS starts to choke with "accumulated
			// message length exceeds maximum" errors after processing ~10k keys when fetching
			// from a node running a default configuration.
			let uri = if uri.starts_with("ws://") {
				let uri = uri.replace("ws://", "http://");
				info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {uri:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)");
				uri
			} else if uri.starts_with("wss://") {
				let uri = uri.replace("wss://", "https://");
				info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {uri:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)");
				uri
			} else {
				uri.clone()
			};
			let http_client = HttpClient::builder()
				.max_request_size(u32::MAX)
				.max_response_size(u32::MAX)
				.request_timeout(std::time::Duration::from_secs(60 * 5))
				.build(uri)
				.map_err(|e| {
					error!(target: LOG_TARGET, "error: {e:?}");
					"failed to build http client"
				})?;

			*self = Self::RemoteClient(http_client)
		}

		Ok(())
	}
}

impl From<String> for Transport {
	fn from(uri: String) -> Self {
		Transport::Uri(uri)
	}
}

impl From<HttpClient> for Transport {
	fn from(client: HttpClient) -> Self {
		Transport::RemoteClient(client)
	}
}

/// Configuration of the online execution.
///
/// A state snapshot config may be present and will be written to in that case.
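///
/// A hedged example of scraping a single pallet (the endpoint is hypothetical):
///
/// ```ignore
/// let config = OnlineConfig::<H> {
/// 	transport: "https://rpc.example.io:443".to_owned().into(),
/// 	pallets: vec!["Balances".to_owned()],
/// 	child_trie: false,
/// 	..Default::default()
/// };
/// ```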
#[derive(Clone)]
pub struct OnlineConfig<H> {
	/// The block hash at which to get the runtime state. Will be latest finalized head if not
	/// provided.
	pub at: Option<H>,
	/// An optional state snapshot file to WRITE to, not for reading. Not written if set to `None`.
	pub state_snapshot: Option<SnapshotConfig>,
	/// The pallets to scrape. These values are hashed and added to `hashed_prefixes`.
	pub pallets: Vec<String>,
	/// Transport config.
	pub transport: Transport,
	/// Look out for child keys, and scrape them as well, if set to true.
	pub child_trie: bool,
	/// Storage entry key prefixes to be injected into the externalities. The *hashed* prefix must
	/// be given.
	pub hashed_prefixes: Vec<Vec<u8>>,
	/// Storage entry keys to be injected into the externalities. The *hashed* key must be given.
	pub hashed_keys: Vec<Vec<u8>>,
}

impl<H: Clone> OnlineConfig<H> {
	/// Return rpc (http) client reference.
	fn rpc_client(&self) -> &HttpClient {
		self.transport
			.as_client()
			.expect("http client must have been initialized by now; qed.")
	}

	fn at_expected(&self) -> H {
		self.at.clone().expect("block at must be initialized; qed")
	}
}

impl<H> Default for OnlineConfig<H> {
	fn default() -> Self {
		Self {
			transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
			child_trie: true,
			at: None,
			state_snapshot: None,
			pallets: Default::default(),
			hashed_keys: Default::default(),
			hashed_prefixes: Default::default(),
		}
	}
}

impl<H> From<String> for OnlineConfig<H> {
	fn from(t: String) -> Self {
		Self { transport: t.into(), ..Default::default() }
	}
}

/// Configuration of the state snapshot.
#[derive(Clone)]
pub struct SnapshotConfig {
	/// The path to the snapshot file.
	pub path: PathBuf,
}

impl SnapshotConfig {
	pub fn new<P: Into<PathBuf>>(path: P) -> Self {
		Self { path: path.into() }
	}
}

impl From<String> for SnapshotConfig {
	fn from(s: String) -> Self {
		Self::new(s)
	}
}

impl Default for SnapshotConfig {
	fn default() -> Self {
		Self { path: Path::new("SNAPSHOT").into() }
	}
}

/// Builder for remote-externalities.
#[derive(Clone)]
pub struct Builder<B: BlockT> {
	/// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values
	/// must be given.
	hashed_key_values: Vec<KeyValue>,
	/// The keys that will be excluded from the final externality. The *hashed* key must be given.
	hashed_blacklist: Vec<Vec<u8>>,
	/// Connectivity mode, online or offline.
	mode: Mode<B::Hash>,
	/// If provided, overwrite the state version with this. Otherwise, the state_version of the
	/// remote node is used. All cache files also store their state version.
	///
	/// Overwrite only with care.
	overwrite_state_version: Option<StateVersion>,
}

impl<B: BlockT> Default for Builder<B> {
	fn default() -> Self {
		Self {
			mode: Default::default(),
			hashed_key_values: Default::default(),
			hashed_blacklist: Default::default(),
			overwrite_state_version: None,
		}
	}
}

// Mode methods
impl<B: BlockT> Builder<B> {
	fn as_online(&self) -> &OnlineConfig<B::Hash> {
		match &self.mode {
			Mode::Online(config) => config,
			Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("unexpected mode: expected Online"),
		}
	}

	fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
		match &mut self.mode {
			Mode::Online(config) => config,
			Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("unexpected mode: expected Online"),
		}
	}
}

// RPC methods
impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	const PARALLEL_REQUESTS: usize = 4;
	const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
	const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
	const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
	const INITIAL_BATCH_SIZE: usize = 10;
	// nodes by default will not return more than 1000 keys per request
	const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
	const MAX_RETRIES: usize = 12;
	const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);

	async fn rpc_get_storage(
		&self,
		key: StorageKey,
		maybe_at: Option<B::Hash>,
	) -> Result<Option<StorageData>> {
		trace!(target: LOG_TARGET, "rpc: get_storage");
		self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| {
			error!(target: LOG_TARGET, "Error = {e:?}");
			"rpc get_storage failed."
		})
	}

	/// Get the latest finalized head.
	async fn rpc_get_head(&self) -> Result<B::Hash> {
		trace!(target: LOG_TARGET, "rpc: finalized_head");

		// sadly this is pretty much unreadable...
		ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
			.await
			.map_err(|e| {
				error!(target: LOG_TARGET, "Error = {e:?}");
				"rpc finalized_head failed."
			})
	}

	async fn get_keys_single_page(
		&self,
		prefix: Option<StorageKey>,
		start_key: Option<StorageKey>,
		at: B::Hash,
	) -> Result<Vec<StorageKey>> {
		self.as_online()
			.rpc_client()
			.storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at))
			.await
			.map_err(|e| {
				error!(target: LOG_TARGET, "Error = {e:?}");
				"rpc get_keys failed"
			})
	}

	/// Get keys with `prefix` at `block` in a parallel manner.
	async fn rpc_get_keys_parallel(
		&self,
		prefix: &StorageKey,
		block: B::Hash,
		parallel: usize,
	) -> Result<Vec<StorageKey>> {
		/// Divide the workload and return the start key of each chunk. Guaranteed to return a
		/// non-empty list.
		fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
			let mut prefix = prefix.as_ref().to_vec();
			let scale = 32usize.saturating_sub(prefix.len());

			// no need to divide workload
			if scale < 9 {
				prefix.extend(vec![0; scale]);
				return vec![StorageKey(prefix)]
			}

			let chunks = 16;
			let step = 0x10000 / chunks;
			let ext = scale - 2;
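			// e.g. with an empty prefix: `chunks = 16` and `step = 0x1000`, so the generated
			// start keys begin with 0x00.., 0x10.., 0x20.., ..., 0xf0.., evenly splitting the
			// two-byte range; the remaining `ext` bytes are zero-padding.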

			(0..chunks)
				.map(|i| {
					let mut key = prefix.clone();
					let start = i * step;
					key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
					key.extend(vec![0; ext]);
					StorageKey(key)
				})
				.collect()
		}

		let start_keys = gen_start_keys(&prefix);
		let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
		let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
		end_keys.push(None);

		// use a semaphore to limit max scraping tasks
		let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
		let builder = Arc::new(self.clone());
		let mut handles = vec![];

		for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
			let permit = parallel
				.clone()
				.acquire_owned()
				.await
				.expect("semaphore is not closed until the end of loop");

			let builder = builder.clone();
			let prefix = prefix.clone();
			let start_key = start_key.cloned();
			let end_key = end_key.cloned();

			let handle = tokio::spawn(async move {
				let res = builder
					.rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
					.await;
				drop(permit);
				res
			});

			handles.push(handle);
		}

		parallel.close();

		let keys = futures::future::join_all(handles)
			.await
			.into_iter()
			.filter_map(|res| match res {
				Ok(Ok(keys)) => Some(keys),
				_ => None,
			})
			.flatten()
			.collect::<Vec<StorageKey>>();

		Ok(keys)
	}

	/// Get all keys with `prefix` within the given range at `block`.
	/// Both `start_key` and `end_key` are optional if you want an open-ended range.
	async fn rpc_get_keys_in_range(
		&self,
		prefix: &StorageKey,
		block: B::Hash,
		start_key: Option<&StorageKey>,
		end_key: Option<&StorageKey>,
	) -> Result<Vec<StorageKey>> {
		let mut last_key: Option<&StorageKey> = start_key;
		let mut keys: Vec<StorageKey> = vec![];

		loop {
			// This loop can hit the node with very rapid requests, occasionally causing it to
			// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
			let retry_strategy =
				FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
			let get_page_closure =
				|| self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
			let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;

			// avoid duplicated keys across workloads
			if let (Some(last), Some(end)) = (page.last(), end_key) {
				if last >= end {
					page.retain(|key| key < end);
				}
			}

			let page_len = page.len();
			keys.extend(page);
			last_key = keys.last();

			// scraping out of range or no more matches,
			// we are done either way
			if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
				debug!(target: LOG_TARGET, "last page received: {page_len}");
				break
			}

			debug!(
				target: LOG_TARGET,
				"new total = {}, full page received: {}",
				keys.len(),
				HexDisplay::from(last_key.expect("full page received, cannot be None"))
			);
		}

		Ok(keys)
	}

	/// Fetches storage data from a node using a dynamic batch size.
	///
	/// This function adjusts the batch size on the fly to help prevent overwhelming the node with
	/// large batch requests, and stay within request size limits enforced by the node.
	///
	/// # Arguments
	///
	/// * `client` - A reference to the `HttpClient` used for making the requests.
	/// * `payloads` - A vector of tuples containing a JSONRPC method name and its `ArrayParams`.
	/// * `bar` - A progress bar that is advanced as batch responses arrive.
	///
	/// # Returns
	///
	/// Returns a `Result` with a vector of `Option<StorageData>`, where each element corresponds to
	/// the storage data for the given method and parameters. The result will be an `Err` with a
	/// `String` error message if the request fails.
	///
	/// # Errors
	///
	/// This function will return an error if:
	/// * The batch request fails and the batch size is less than 2.
	/// * There are invalid batch params.
	/// * There is an error in the batch response.
	///
	/// # Example
	///
	/// ```ignore
	/// use your_crate::{get_storage_data_dynamic_batch_size, HttpClient, ArrayParams, ProgressBar};
	///
	/// async fn example() {
	///     let client = HttpClient::builder().build("http://localhost:9944").unwrap();
	///     let payloads = vec![
	///         ("some_method".to_string(), ArrayParams::new()),
	///         ("another_method".to_string(), ArrayParams::new()),
	///     ];
	///     let bar = ProgressBar::new(payloads.len() as u64);
	///
	///     let storage_data = get_storage_data_dynamic_batch_size(&client, payloads, &bar).await;
	///     match storage_data {
	///         Ok(data) => println!("Storage data: {:?}", data),
	///         Err(e) => eprintln!("Error fetching storage data: {}", e),
	///     }
	/// }
	/// ```
	async fn get_storage_data_dynamic_batch_size(
		client: &HttpClient,
		payloads: Vec<(String, ArrayParams)>,
		bar: &ProgressBar,
	) -> Result<Vec<Option<StorageData>>, String> {
		let mut all_data: Vec<Option<StorageData>> = vec![];
		let mut start_index = 0;
		let mut retries = 0usize;
		let mut batch_size = Self::INITIAL_BATCH_SIZE;
		let total_payloads = payloads.len();

		while start_index < total_payloads {
			debug!(
				target: LOG_TARGET,
				"Remaining payloads: {} Batch request size: {batch_size}",
				total_payloads - start_index,
			);

			let end_index = usize::min(start_index + batch_size, total_payloads);
			let page = &payloads[start_index..end_index];

			// Build the batch request
			let mut batch = BatchRequestBuilder::new();
			for (method, params) in page.iter() {
				batch
					.insert(method, params.clone())
					.map_err(|_| "Invalid batch method and/or params")?;
			}

			let request_started = Instant::now();
			let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
				Ok(batch_response) => {
					retries = 0;
					batch_response
				},
				Err(e) => {
					if retries > Self::MAX_RETRIES {
						return Err(e.to_string())
					}

					retries += 1;
					let failure_log = format!(
						"Batch request failed ({retries}/{} retries). Error: {e}",
						Self::MAX_RETRIES
					);
					// After 2 subsequent failures something very wrong is happening; log a warning
					// and reset the batch size down to 1.
					if retries >= 2 {
						warn!("{failure_log}");
						batch_size = 1;
					} else {
						debug!("{failure_log}");
						// Decrease batch size by DECREASE_FACTOR
						batch_size =
							(batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize;
					}
					continue
				},
			};

			let request_duration = request_started.elapsed();
			batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
				// Decrease batch size
				max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
			} else {
				// Increase batch size, but not more than the remaining total payloads to process
				min(
					total_payloads - start_index,
					max(
						batch_size + 1,
						(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
					),
				)
			};
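			// e.g. a batch of 10 that completes within the 15s target grows to
			// max(11, (10.0 * 1.10) as usize) = 11; one that overruns shrinks to
			// max(1, (10.0 * 0.50) as usize) = 5.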

			debug!(
				target: LOG_TARGET,
				"Request duration: {request_duration:?} Target duration: {:?} Last batch size: {} Next batch size: {batch_size}",
				Self::REQUEST_DURATION_TARGET,
				end_index - start_index,
			);

			let batch_response_len = batch_response.len();
			for item in batch_response.into_iter() {
				match item {
					Ok(x) => all_data.push(x),
					Err(e) => return Err(e.message().to_string()),
				}
			}
			bar.inc(batch_response_len as u64);

			// Update the start index for the next iteration
			start_index = end_index;
		}

		Ok(all_data)
	}

	/// Synonym of `getPairs` that uses paged queries to first get the keys, and then
	/// map them to values one by one.
	///
	/// This can work with public nodes, but expect it to be darn slow.
	pub(crate) async fn rpc_get_pairs(
		&self,
		prefix: StorageKey,
		at: B::Hash,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<Vec<KeyValue>> {
		let keys = logging::with_elapsed_async(
			|| async {
				// TODO: We could start downloading when having collected the first batch of keys.
				// https://github.com/paritytech/polkadot-sdk/issues/2494
				let keys = self
					.rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
					.await?
					.into_iter()
					.collect::<Vec<_>>();

				Ok(keys)
			},
			"Scraping keys...",
			|keys| format!("Found {} keys", keys.len()),
		)
		.await?;

		if keys.is_empty() {
			return Ok(Default::default())
		}

		let client = self.as_online().rpc_client();
		let payloads = keys
			.iter()
			.map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
			.collect::<Vec<_>>();

		let bar = ProgressBar::new(payloads.len() as u64);
		bar.enable_steady_tick(Duration::from_secs(1));
		bar.set_message("Downloading key values".to_string());
		bar.set_style(
			ProgressStyle::with_template(
				"[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
			)
			.unwrap()
			.progress_chars("=>-"),
		);
		let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
		let requests = payloads_chunked.map(|payload_chunk| {
			Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
		});
		// Execute the requests and move the Result outside.
		let storage_data_result: Result<Vec<_>, _> =
			futures::future::join_all(requests).await.into_iter().collect();
		// Handle the Result.
		let storage_data = match storage_data_result {
			Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
			Err(e) => {
				error!(target: LOG_TARGET, "Error while getting storage data: {e}");
				return Err("Error while getting storage data")
			},
		};
		bar.finish_with_message("✅ Downloaded key values");
		println!();

		// Check if we got responses for all submitted requests.
		assert_eq!(keys.len(), storage_data.len());

		let key_values = keys
			.iter()
			.zip(storage_data)
			.map(|(key, maybe_value)| match maybe_value {
				Some(data) => (key.clone(), data),
				None => {
					warn!(target: LOG_TARGET, "key {key:?} had no corresponding value.");
					let data = StorageData(vec![]);
					(key.clone(), data)
				},
			})
			.collect::<Vec<_>>();

		logging::with_elapsed(
			|| {
				pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
					// Don't insert the child keys here, they need to be inserted separately with
					// all their data in the load_child_remote function.
					match is_default_child_storage_key(&k.0) {
						true => None,
						false => Some((k.0, v.0)),
					}
				}));

				Ok(())
			},
			"Inserting keys into DB...",
			|_| "Inserted keys into DB".into(),
		)
		.expect("must succeed; qed");

		Ok(key_values)
	}

	/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
	pub(crate) async fn rpc_child_get_storage_paged(
		client: &HttpClient,
		prefixed_top_key: &StorageKey,
		child_keys: Vec<StorageKey>,
		at: B::Hash,
	) -> Result<Vec<KeyValue>> {
		let child_keys_len = child_keys.len();

		let payloads = child_keys
			.iter()
			.map(|key| {
				(
					"childstate_getStorage".to_string(),
					rpc_params![
						PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
						key,
						at
					],
				)
			})
			.collect::<Vec<_>>();

		let bar = ProgressBar::new(payloads.len() as u64);
		let storage_data =
			match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await {
				Ok(storage_data) => storage_data,
				Err(e) => {
					error!(target: LOG_TARGET, "batch processing failed: {e:?}");
					return Err("batch processing failed")
				},
			};

		assert_eq!(child_keys_len, storage_data.len());

		Ok(child_keys
			.iter()
			.zip(storage_data)
			.map(|(key, maybe_value)| match maybe_value {
				Some(v) => (key.clone(), v),
				None => {
					warn!(target: LOG_TARGET, "key {key:?} had no corresponding value.");
					(key.clone(), StorageData(vec![]))
				},
			})
			.collect::<Vec<_>>())
	}

	pub(crate) async fn rpc_child_get_keys(
		client: &HttpClient,
		prefixed_top_key: &StorageKey,
		child_prefix: StorageKey,
		at: B::Hash,
	) -> Result<Vec<StorageKey>> {
		let retry_strategy =
			FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
		let mut all_child_keys = Vec::new();
		let mut start_key = None;

		loop {
			let get_child_keys_closure = || {
				let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
				substrate_rpc_client::ChildStateApi::storage_keys_paged(
					client,
					top_key,
					Some(child_prefix.clone()),
					Self::DEFAULT_KEY_DOWNLOAD_PAGE,
					start_key.clone(),
					Some(at),
				)
			};

			let child_keys = Retry::spawn(retry_strategy.clone(), get_child_keys_closure)
				.await
				.map_err(|e| {
					error!(target: LOG_TARGET, "Error = {e:?}");
					"rpc child_get_keys failed."
				})?;

			let keys_count = child_keys.len();
			if keys_count == 0 {
				break;
			}

			start_key = child_keys.last().cloned();
			all_child_keys.extend(child_keys);

			if keys_count < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
				break;
			}
		}

		debug!(
			target: LOG_TARGET,
			"[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
			std::thread::current().id(),
			all_child_keys.len(),
			HexDisplay::from(prefixed_top_key)
		);

		Ok(all_child_keys)
	}
}

impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	/// Load all of the child keys from the remote config, given the already scraped list of top key
	/// pairs.
	///
	/// `top_kv` need not be only child-bearing top keys. It should be all of the top keys that are
	/// included thus far.
	///
	/// This function concurrently populates `pending_ext`. The return value is only for writing to
	/// the cache; we could optimize this further.
	async fn load_child_remote(
		&self,
		top_kv: &[KeyValue],
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<ChildKeyValues> {
		let child_roots = top_kv
			.iter()
			.filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
			.map(|(k, _)| k.clone())
			.collect::<Vec<_>>();

		if child_roots.is_empty() {
			info!(target: LOG_TARGET, "👩‍👦 no child roots found to scrape");
			return Ok(Default::default())
		}

		info!(
			target: LOG_TARGET,
			"👩‍👦 scraping child-tree data from {} top keys",
			child_roots.len(),
		);

		let at = self.as_online().at_expected();

		let client = self.as_online().rpc_client();
		let mut child_kv = vec![];
		for prefixed_top_key in child_roots {
			let child_keys =
				Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;

			let child_kv_inner =
				Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
					.await?;

			let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
			let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
				Some((ChildType::ParentKeyId, storage_key)) => storage_key,
				None => {
					error!(target: LOG_TARGET, "invalid key: {prefixed_top_key:?}");
					return Err("Invalid child key")
				},
			};

			let info = ChildInfo::new_default(un_prefixed);
			let key_values =
				child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
			child_kv.push((info.clone(), child_kv_inner));
			for (k, v) in key_values {
				pending_ext.insert_child(info.clone(), k, v);
			}
		}

		Ok(child_kv)
	}

	/// Load all of the top key-value pairs from the remote node into `pending_ext`.
	///
	/// This function concurrently populates `pending_ext`. The return value is only for writing to
	/// the cache; we could optimize this further.
	async fn load_top_remote(
		&self,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<TopKeyValues> {
		let config = self.as_online();
		let at = self
			.as_online()
			.at
			.expect("online config must be initialized by this point; qed.");
		info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {at:?}");

		let mut keys_and_values = Vec::new();
		for prefix in &config.hashed_prefixes {
			let now = std::time::Instant::now();
			let additional_key_values =
				self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
			let elapsed = now.elapsed();
			info!(
				target: LOG_TARGET,
				"adding data for hashed prefix: {:?}, took {:.2}s",
				HexDisplay::from(prefix),
				elapsed.as_secs_f32()
			);
			keys_and_values.extend(additional_key_values);
		}

		for key in &config.hashed_keys {
			let key = StorageKey(key.to_vec());
			info!(
				target: LOG_TARGET,
				"adding data for hashed key: {:?}",
				HexDisplay::from(&key)
			);
			match self.rpc_get_storage(key.clone(), Some(at)).await? {
				Some(value) => {
					pending_ext.insert(key.clone().0, value.clone().0);
					keys_and_values.push((key, value));
				},
				None => {
					warn!(
						target: LOG_TARGET,
						"no data found for hashed key: {:?}",
						HexDisplay::from(&key)
					);
				},
			}
		}

		Ok(keys_and_values)
	}

	/// The entry point of execution, if `mode` is online.
	///
	/// Initializes the remote client in `transport`, and sets the `at` field, if not specified.
	async fn init_remote_client(&mut self) -> Result<()> {
		// First, initialize the http client.
		self.as_online_mut().transport.init().await?;

		// Then, if `at` is not set, set it.
		if self.as_online().at.is_none() {
			let at = self.rpc_get_head().await?;
			info!(
				target: LOG_TARGET,
				"since no at is provided, setting it to latest finalized head, {at:?}",
			);
			self.as_online_mut().at = Some(at);
		}

		// Then, a few transformations that we want to perform in the online config:
		let online_config = self.as_online_mut();
		online_config.pallets.iter().for_each(|p| {
			online_config
				.hashed_prefixes
				.push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
		});
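		// e.g. the pallet name "System" hashes to the well-known storage prefix
		// 0x26aa394eea5630e07c48ae0c9558cef7.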

		if online_config.child_trie {
			online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
		}

		// Finally, if by now we have not put any limitations on the prefixes that we are
		// interested in, we download everything.
		if online_config
			.hashed_prefixes
			.iter()
			.filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
			.count() == 0
		{
			info!(
				target: LOG_TARGET,
				"since no prefix is filtered, the data for all pallets will be downloaded"
			);
			online_config.hashed_prefixes.push(vec![]);
		}

		Ok(())
	}

	async fn load_header(&self) -> Result<B::Header> {
		let retry_strategy =
			FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
		let get_header_closure = || {
			ChainApi::<(), _, B::Header, ()>::header(
				self.as_online().rpc_client(),
				Some(self.as_online().at_expected()),
			)
		};
		Retry::spawn(retry_strategy, get_header_closure)
			.await
			.map_err(|_| "Failed to fetch header for block from network")?
			.ok_or("Network returned None block header")
	}

	/// Load the data from a remote server. The main code path is calling into `load_top_remote` and
	/// `load_child_remote`.
	///
	/// Must be called after `init_remote_client`.
	async fn load_remote_and_maybe_save(&mut self) -> Result<TestExternalities<HashingFor<B>>> {
		let state_version =
			StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
				.await
				.map_err(|e| {
					error!(target: LOG_TARGET, "Error = {e:?}");
					"rpc runtime_version failed."
				})
				.map(|v| v.state_version())?;
		let mut pending_ext = TestExternalities::new_with_code_and_state(
			Default::default(),
			Default::default(),
			self.overwrite_state_version.unwrap_or(state_version),
		);

		// Load data from the remote into `pending_ext`.
		let top_kv = self.load_top_remote(&mut pending_ext).await?;
		self.load_child_remote(&top_kv, &mut pending_ext).await?;

		// If we need to save a snapshot, save the raw storage and root hash to the snapshot.
		if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
			let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
			let snapshot = Snapshot::<B>::new(
				state_version,
				raw_storage.clone(),
				storage_root,
				self.load_header().await?,
			);
			let encoded = snapshot.encode();
			info!(
				target: LOG_TARGET,
				"writing snapshot of {} bytes to {path:?}",
				encoded.len(),
			);
			std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;

			// pending_ext was consumed when creating the snapshot, need to reinitialize it
			return Ok(TestExternalities::from_raw_snapshot(
				raw_storage,
				storage_root,
				self.overwrite_state_version.unwrap_or(state_version),
			))
		}

		Ok(pending_ext)
	}

	async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>> {
		self.init_remote_client().await?;
		let inner_ext = self.load_remote_and_maybe_save().await?;
		Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
	}

	fn do_load_offline(&mut self, config: OfflineConfig) -> Result<RemoteExternalities<B>> {
		let (header, inner_ext) = logging::with_elapsed(
			|| {
				info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);

				let Snapshot { header, state_version, raw_storage, storage_root, .. } =
					Snapshot::<B>::load(&config.state_snapshot.path)?;
				let inner_ext = TestExternalities::from_raw_snapshot(
					raw_storage,
					storage_root,
					self.overwrite_state_version.unwrap_or(state_version),
				);

				Ok((header, inner_ext))
			},
			"Loading snapshot...",
			|_| "Loaded snapshot".into(),
		)?;

		Ok(RemoteExternalities { inner_ext, header })
	}

	pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>> {
		let mut ext = match self.mode.clone() {
			Mode::Offline(config) => self.do_load_offline(config)?,
			Mode::Online(_) => self.do_load_remote().await?,
			Mode::OfflineOrElseOnline(offline_config, _) => {
				match self.do_load_offline(offline_config) {
					Ok(x) => x,
					Err(_) => self.do_load_remote().await?,
				}
			},
		};

		// inject manual key values.
		if !self.hashed_key_values.is_empty() {
			info!(
				target: LOG_TARGET,
				"extending externalities with {} manually injected key-values",
				self.hashed_key_values.len()
			);
			ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
		}

		// exclude manual key values.
		if !self.hashed_blacklist.is_empty() {
			info!(
				target: LOG_TARGET,
				"excluding externalities from {} keys",
				self.hashed_blacklist.len()
			);
			for k in self.hashed_blacklist {
				ext.execute_with(|| sp_io::storage::clear(&k));
			}
		}

		Ok(ext)
	}
}

// Public methods
impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	/// Create a new builder.
	pub fn new() -> Self {
		Default::default()
	}

	/// Inject a manual list of key and values to the storage.
	pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
		for i in injections {
			self.hashed_key_values.push(i.clone());
		}
		self
	}

	/// Blacklist this hashed key from the final externalities. This is treated as-is, and should be
	/// pre-hashed.
	pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
		self.hashed_blacklist.push(hashed.to_vec());
		self
	}

	/// Configure the connectivity mode, online or offline.
	pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
		self.mode = mode;
		self
	}

	/// The state version to use.
	pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
		self.overwrite_state_version = Some(version);
		self
	}

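	/// Build the final [`RemoteExternalities`], committing all pending changes into its backend.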
	pub async fn build(self) -> Result<RemoteExternalities<B>> {
		let mut ext = self.pre_build().await?;
		ext.commit_all().unwrap();

		info!(
			target: LOG_TARGET,
			"initialized state externalities with storage root {:?} and state_version {:?}",
			ext.as_backend().root(),
			ext.state_version
		);

		Ok(ext)
	}
}

#[cfg(test)]
mod test_prelude {
	pub(crate) use super::*;
	pub(crate) use sp_runtime::testing::{Block as RawBlock, MockCallU64};
	pub(crate) type UncheckedXt = sp_runtime::testing::TestXt<MockCallU64, ()>;
	pub(crate) type Block = RawBlock<UncheckedXt>;

	pub(crate) fn init_logger() {
		sp_tracing::try_init_simple();
	}
}

#[cfg(test)]
mod tests {
	use super::test_prelude::*;

	#[tokio::test]
	async fn can_load_state_snapshot() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_exclude_from_snapshot() {
		init_logger();

		// get the first key from the snapshot file.
		let some_key = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.build()
			.await
			.expect("Can't read state snapshot file")
			.execute_with(|| {
				let key =
					sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
				assert!(sp_io::storage::get(&key).is_some());
				key
			});

		Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.blacklist_hashed_key(&some_key)
			.build()
			.await
			.expect("Can't read state snapshot file")
			.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
	}
}

#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
	use super::test_prelude::*;
	use std::{env, os::unix::fs::MetadataExt};

	fn endpoint() -> String {
		env::var("TEST_WS").unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
	}

	#[tokio::test]
	async fn state_version_is_kept_and_can_be_altered() {
		const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
		init_logger();

		// first, build a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// now re-create the same snapshot.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.state_version, cached_ext.state_version);

		// now overwrite it
		let other = match ext.state_version {
			StateVersion::V0 => StateVersion::V1,
			StateVersion::V1 => StateVersion::V0,
		};
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.overwrite_state_version(other)
			.build()
			.await
			.unwrap();

		assert_eq!(cached_ext.state_version, other);
	}

	#[tokio::test]
	async fn snapshot_block_hash_works() {
		const CACHE: &'static str = "snapshot_block_hash_works";
		init_logger();

		// first, build a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// now re-create the same snapshot.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.header.hash(), cached_ext.header.hash());
	}

	#[tokio::test]
	async fn child_keys_are_loaded() {
		const CACHE: &'static str = "snapshot_retains_storage";
		init_logger();

		// create an ext with children keys
		let mut child_ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: true,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// create an ext without children keys
		let mut ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// there should be more keys in the child ext.
		assert!(
			child_ext.as_backend().backend_storage().keys().len() >
				ext.as_backend().backend_storage().keys().len()
		);
	}

	#[tokio::test]
	async fn offline_else_online_works() {
		const CACHE: &'static str = "offline_else_online_works_data";
		init_logger();
		// this shows that in the first run, we use the remote and create a snapshot.
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport: endpoint().clone().into(),
					pallets: vec!["Proxy".to_owned()],
					child_trie: false,
					state_snapshot: Some(SnapshotConfig::new(CACHE)),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		// this shows that in the second run, we are not using the remote
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport: "ws://non-existent:666".to_owned().into(),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		std::fs::remove_file(to_delete[0].path()).unwrap();
	}

	#[tokio::test]
	async fn can_build_one_small_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_build_few_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test(flavor = "multi_thread")]
	async fn can_create_snapshot() {
		const CACHE: &'static str = "can_create_snapshot";
		init_logger();

		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		let to_delete = to_delete.first().unwrap();
		assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
		std::fs::remove_file(to_delete.path()).unwrap();
	}

	#[tokio::test]
	async fn can_create_child_snapshot() {
		const CACHE: &'static str = "can_create_child_snapshot";
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Crowdloan".to_owned()],
				child_trie: true,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		let to_delete = to_delete.first().unwrap();
		assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
		std::fs::remove_file(to_delete.path()).unwrap();
	}

	#[tokio::test]
	async fn can_build_big_pallet() {
		if std::option_env!("TEST_WS").is_none() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				pallets: vec!["Staking".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_all() {
		if std::option_env!("TEST_WS").is_none() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().clone().into(),
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_in_parallel() {
		init_logger();

		let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
			transport: endpoint().clone().into(),
			..Default::default()
		}));
		builder.init_remote_client().await.unwrap();

		let at = builder.as_online().at.unwrap();

		let prefix = StorageKey(vec![13]);
		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
		let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
		assert_eq!(paged, para);

		let prefix = StorageKey(vec![]);
		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
		let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
		assert_eq!(paged, para);
	}
}
1616}