frame_remote_externalities/lib.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Remote Externalities
//!
//! An equivalent of `sp_io::TestExternalities` that can load its state from a remote
//! Substrate-based chain, or a local state snapshot file.
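//!
//! As a minimal usage sketch (the endpoint URI and pallet name are placeholders, and the `Block`
//! type is assumed to come from the caller's runtime):
//!
//! ```ignore
//! use frame_remote_externalities::{Builder, Mode, OnlineConfig};
//!
//! async fn example() {
//!     // Scrape the `System` pallet from a remote node and run a closure against the result.
//!     let mut ext = Builder::<Block>::new()
//!         .mode(Mode::Online(OnlineConfig {
//!             transport_uris: vec!["wss://rpc.example.io".to_owned()],
//!             pallets: vec!["System".to_owned()],
//!             ..Default::default()
//!         }))
//!         .build()
//!         .await
//!         .unwrap();
//!     ext.execute_with(|| {
//!         // Read and write storage as if this were the live chain state.
//!     });
//! }
//! ```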

mod client;
mod config;
mod key_range;
mod logging;
mod parallel;

pub use config::{Mode, OfflineConfig, OnlineConfig, SnapshotConfig};

use client::{with_timeout, Client, ConnectionManager, RPC_TIMEOUT};
use codec::Encode;
use config::Snapshot;
#[cfg(all(test, feature = "remote-test"))]
use config::DEFAULT_HTTP_ENDPOINT;
use indicatif::{ProgressBar, ProgressStyle};
use jsonrpsee::core::params::ArrayParams;
use log::*;
use parallel::{run_workers, ProcessResult};
use serde::de::DeserializeOwned;
use sp_core::{
	hexdisplay::HexDisplay,
	storage::{
		well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
		ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
	},
};
use sp_runtime::{
	traits::{Block as BlockT, HashingFor},
	StateVersion,
};
use sp_state_machine::TestExternalities;
use std::{
	collections::{BTreeSet, VecDeque},
	future::Future,
	ops::{Deref, DerefMut},
	sync::{
		atomic::{AtomicUsize, Ordering},
		Arc, Mutex,
	},
	time::Duration,
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};

use crate::key_range::{initialize_work_queue, subdivide_remaining_range};

type Result<T, E = &'static str> = std::result::Result<T, E>;

type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;

const LOG_TARGET: &str = "remote-ext";

/// An externalities that acts exactly the same as [`sp_io::TestExternalities`] but has a few extra
/// bits and pieces to it, and can be loaded remotely.
pub struct RemoteExternalities<B: BlockT> {
	/// The inner externalities.
	pub inner_ext: TestExternalities<HashingFor<B>>,
	/// The block header at which this externalities environment was created.
	pub header: B::Header,
}

impl<B: BlockT> Deref for RemoteExternalities<B> {
	type Target = TestExternalities<HashingFor<B>>;
	fn deref(&self) -> &Self::Target {
		&self.inner_ext
	}
}

impl<B: BlockT> DerefMut for RemoteExternalities<B> {
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.inner_ext
	}
}

/// Builder for remote-externalities.
#[derive(Clone)]
pub struct Builder<B: BlockT> {
	/// Custom key-value pairs to be injected into the final externalities. The *hashed* keys and
	/// values must be given.
	hashed_key_values: Vec<KeyValue>,
	/// The keys that will be excluded from the final externalities. The *hashed* key must be
	/// given.
	hashed_blacklist: Vec<Vec<u8>>,
	/// Connectivity mode, online or offline.
	mode: Mode<B::Hash>,
	/// If provided, overwrite the state version with this. Otherwise, the state version of the
	/// remote node is used. All cache files also store their state version.
	///
	/// Overwrite only with care.
	overwrite_state_version: Option<StateVersion>,
	/// Connection manager for RPC clients (initialized during `init_remote_client`).
	conn_manager: Option<ConnectionManager>,
}

impl<B: BlockT> Default for Builder<B> {
	fn default() -> Self {
		Self {
			mode: Default::default(),
			hashed_key_values: Default::default(),
			hashed_blacklist: Default::default(),
			overwrite_state_version: None,
			conn_manager: None,
		}
	}
}

// Mode methods
impl<B: BlockT> Builder<B> {
	fn as_online(&self) -> &OnlineConfig<B::Hash> {
		match &self.mode {
			Mode::Online(config) => config,
			Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("expected an online mode"),
		}
	}

	fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
		match &mut self.mode {
			Mode::Online(config) => config,
			Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("expected an online mode"),
		}
	}

	fn conn_manager(&self) -> Result<&ConnectionManager> {
		self.conn_manager.as_ref().ok_or("connection manager must be initialized; qed")
	}
}

// RPC methods
impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	const PARALLEL_REQUESTS_PER_CLIENT: usize = 4;

	fn parallel_requests(&self) -> usize {
		self.conn_manager
			.as_ref()
			.map(|cm| cm.num_clients() * Self::PARALLEL_REQUESTS_PER_CLIENT)
			.expect("connection manager must be initialized; qed")
	}

	/// Execute an RPC call on any available client. Tries each client until one succeeds.
	///
	/// Starts with a random client to distribute load across clients.
	async fn with_any_client<T, E, F, Fut>(&self, op_name: &'static str, f: F) -> Result<T, ()>
	where
		F: Fn(Client) -> Fut,
		Fut: Future<Output = std::result::Result<T, E>>,
		E: std::fmt::Debug,
	{
		let conn_manager = self.conn_manager().map_err(|_| ())?;
		let num_clients = conn_manager.num_clients();
		let start_offset: usize = rand::random();
		for j in 0..num_clients {
			let i = (start_offset + j) % num_clients;
			let client = conn_manager.get(i).await;
			let result = with_timeout(f(client), RPC_TIMEOUT).await;
			match result {
				Ok(Ok(value)) => return Ok(value),
				Ok(Err(e)) => {
					debug!(target: LOG_TARGET, "Client {i}: {op_name} RPC error: {e:?}");
				},
				Err(()) => {
					debug!(target: LOG_TARGET, "Client {i}: {op_name} timeout");
				},
			}
		}
		Err(())
	}

	/// Get a single storage value. Tries each client until one succeeds.
	async fn rpc_get_storage(
		&self,
		key: StorageKey,
		maybe_at: Option<B::Hash>,
	) -> Result<Option<StorageData>> {
		trace!(target: LOG_TARGET, "rpc: get_storage");
		self.with_any_client("get_storage", move |client| {
			let key = key.clone();
			async move { client.storage(key, maybe_at).await }
		})
		.await
		.map_err(|_| "rpc get_storage failed on all clients")
	}

	/// Fetch the state version from the runtime. Tries each client until one succeeds.
	async fn fetch_state_version(&self) -> Result<StateVersion> {
		let conn_manager = self.conn_manager()?;

		for i in 0..conn_manager.num_clients() {
			let client = conn_manager.get(i).await;
			let result = with_timeout(
				StateApi::<B::Hash>::runtime_version(client.ws_client.as_ref(), None),
				RPC_TIMEOUT,
			)
			.await;

			match result {
				Ok(Ok(version)) => return Ok(version.state_version()),
				Ok(Err(e)) => {
					debug!(target: LOG_TARGET, "Client {i}: runtime_version RPC error: {e:?}");
				},
				Err(()) => {
					debug!(target: LOG_TARGET, "Client {i}: runtime_version timeout");
				},
			}
		}

		Err("rpc runtime_version failed on all clients")
	}

	/// Get the latest finalized head. Tries each client until one succeeds.
	async fn rpc_get_head(&self) -> Result<B::Hash> {
		trace!(target: LOG_TARGET, "rpc: finalized_head");
		self.with_any_client("finalized_head", |client| async move {
			ChainApi::<(), _, B::Header, ()>::finalized_head(&*client).await
		})
		.await
		.map_err(|_| "rpc finalized_head failed on all clients")
	}

	/// Get keys with `prefix` at `block` using parallel workers.
	///
	/// The prefix is expanded into an initial queue of key ranges; workers drain this queue, and
	/// whenever a page of keys comes back full, the remainder of the range is subdivided into new
	/// work items so that idle workers can pick them up.
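	///
	/// A sketch of how this is driven (mirroring the `can_fetch_in_parallel` test below; the
	/// builder is assumed to have been initialized via `init_remote_client` already):
	///
	/// ```ignore
	/// builder.init_remote_client().await.unwrap();
	/// let at = builder.as_online().at.unwrap();
	/// // Fetch all keys under the one-byte prefix 0x0d with 4 parallel workers.
	/// let keys = builder.rpc_get_keys_parallel(&StorageKey(vec![13]), at, 4).await.unwrap();
	/// ```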
	async fn rpc_get_keys_parallel(
		&self,
		prefix: &StorageKey,
		block: B::Hash,
		parallel: usize,
	) -> Result<Vec<StorageKey>> {
		let work_queue = initialize_work_queue(&[prefix.clone()]);
		let initial_ranges = work_queue.lock().unwrap().len();
		info!(target: LOG_TARGET, "🔧 Initialized work queue with {initial_ranges} ranges");

		let conn_manager = self.conn_manager()?;
		info!(target: LOG_TARGET, "🌐 Using {} RPC provider(s)", conn_manager.num_clients());
		info!(target: LOG_TARGET, "🚀 Spawning {parallel} parallel workers for key fetching");

		let all_keys: Arc<Mutex<BTreeSet<StorageKey>>> = Arc::new(Mutex::new(BTreeSet::new()));
		let last_logged_milestone = Arc::new(AtomicUsize::new(0));
		let initial_work = work_queue.lock().unwrap().drain(..).collect();
		let all_keys_for_result = all_keys.clone();

		run_workers(initial_work, conn_manager, parallel, move |worker_index, range, client| {
			let all_keys = all_keys.clone();
			let last_logged_milestone = last_logged_milestone.clone();

			async move {
				trace!(
					target: LOG_TARGET,
					"Worker {worker_index}: fetching keys starting at {:?} (page_size: {})",
					HexDisplay::from(&range.start_key.0),
					range.page_size
				);

				let rpc_result = with_timeout(
					client.storage_keys_paged(
						Some(range.prefix.clone()),
						range.page_size,
						Some(range.start_key.clone()),
						Some(block),
					),
					RPC_TIMEOUT,
				)
				.await;

				let page = match rpc_result {
					Ok(Ok(p)) => p,
					Ok(Err(e)) => {
						debug!(target: LOG_TARGET, "Worker {worker_index}: RPC error: {e:?}");
						return ProcessResult::Retry {
							work: range.with_halved_page_size(),
							sleep_duration: Duration::from_secs(15),
							recreate_client: true,
						};
					},
					Err(()) => {
						debug!(target: LOG_TARGET, "Worker {worker_index}: timeout");
						return ProcessResult::Retry {
							work: range.with_halved_page_size(),
							sleep_duration: Duration::from_secs(5),
							recreate_client: true,
						};
					},
				};

				// Filter keys and determine if this was a full batch
				let (page, is_full_batch) = range.filter_keys(page);
				let last_two_keys = if page.len() >= 2 {
					Some((page[page.len() - 2].clone(), page[page.len() - 1].clone()))
				} else {
					None
				};

				let total_keys = {
					let mut keys = all_keys.lock().unwrap();
					keys.extend(page);
					keys.len()
				};

				// Log progress every 10,000 keys. The compare-exchange ensures that only one
				// worker logs a given milestone, even if several workers cross it concurrently.
				const LOG_INTERVAL: usize = 10_000;
				let current_milestone = (total_keys / LOG_INTERVAL) * LOG_INTERVAL;
				let last_milestone = last_logged_milestone.load(Ordering::Relaxed);
				if current_milestone > last_milestone && current_milestone > 0 {
					if last_logged_milestone
						.compare_exchange(
							last_milestone,
							current_milestone,
							Ordering::SeqCst,
							Ordering::Relaxed,
						)
						.is_ok()
					{
						info!(target: LOG_TARGET, "📊 Scraped {total_keys} keys so far...");
					}
				}

				// Subdivide remaining range if this was a full batch
				let new_work = if is_full_batch {
					if let Some((second_last, last)) = last_two_keys {
						subdivide_remaining_range(
							&second_last,
							&last,
							range.end_key.as_ref(),
							&range.prefix,
						)
					} else {
						vec![]
					}
				} else {
					vec![]
				};

				ProcessResult::Success { new_work }
			}
		})
		.await;

		let keys: Vec<_> = all_keys_for_result.lock().unwrap().iter().cloned().collect();
		info!(target: LOG_TARGET, "🎉 Parallel key fetching complete: {} unique keys", keys.len());

		Ok(keys)
	}

	/// Fetches storage data from a node, in batches.
	///
	/// The batch size is supplied by the caller, who is expected to shrink it and retry when a
	/// request fails or times out. This helps prevent overwhelming the node with large batch
	/// requests, and stays within the request size limits enforced by the node.
	///
	/// # Arguments
	///
	/// * `client` - The client used for making the requests.
	/// * `worker_index` - The index of the calling worker, used only for logging.
	/// * `payloads` - A slice of tuples containing a JSON-RPC method name and its `ArrayParams`.
	/// * `bar` - A progress bar that is advanced as responses arrive.
	/// * `batch_size` - The number of payloads to send per batch request.
	///
	/// # Returns
	///
	/// Returns a `Result` with a vector of `Option<StorageData>`, where each element corresponds to
	/// the storage data for the given method and parameters. The result will be an `Err` with a
	/// `String` error message if a batch request fails or times out.
	///
	/// # Example
	///
	/// ```ignore
	/// let payloads = keys
	///     .iter()
	///     .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
	///     .collect::<Vec<_>>();
	/// let bar = ProgressBar::new(payloads.len() as u64);
	///
	/// match Self::get_storage_data_dynamic_batch_size(&client, 0, &payloads, &bar, 1000).await {
	///     Ok(data) => println!("Storage data: {:?}", data),
	///     Err(e) => eprintln!("Error fetching storage data: {}", e),
	/// }
	/// ```
	async fn get_storage_data_dynamic_batch_size(
		client: &Client,
		worker_index: usize,
		payloads: &[(String, ArrayParams)],
		bar: &ProgressBar,
		batch_size: usize,
	) -> std::result::Result<Vec<Option<StorageData>>, String> {
		let mut all_data: Vec<Option<StorageData>> = vec![];
		let mut start_index = 0;
		let total_payloads = payloads.len();

		while start_index < total_payloads {
			let end_index = usize::min(start_index + batch_size, total_payloads);
			let page = &payloads[start_index..end_index];

			trace!(
				target: LOG_TARGET,
				"Worker {worker_index}: fetching values {start_index}..{end_index} of {total_payloads}",
			);

			// Build the batch request
			let mut batch = BatchRequestBuilder::new();
			for (method, params) in page.iter() {
				if batch.insert(method, params.clone()).is_err() {
					panic!("Invalid batch method and/or params; qed");
				}
			}

			let rpc_result = with_timeout(
				client.ws_client.batch_request::<Option<StorageData>>(batch),
				RPC_TIMEOUT,
			)
			.await;

			let batch_response = match rpc_result {
				Ok(Ok(r)) => r,
				Ok(Err(e)) => return Err(format!("RPC error: {e:?}")),
				Err(()) => return Err("timeout".to_string()),
			};

			let batch_response_len = batch_response.len();
			for item in batch_response.into_iter() {
				match item {
					Ok(x) => all_data.push(x),
					Err(e) => {
						warn!(target: LOG_TARGET, "Value worker {worker_index}: batch item error: {}", e.message());
						all_data.push(None);
					},
				}
			}
			bar.inc(batch_response_len as u64);

			start_index = end_index;
		}

		Ok(all_data)
	}

	/// Equivalent of the `state_getPairs` RPC, implemented with paged queries that first fetch
	/// the keys, and then map them to values in batches.
	///
	/// This can work with public nodes. But, expect it to be darn slow.
	pub(crate) async fn rpc_get_pairs(
		&self,
		prefix: StorageKey,
		at: B::Hash,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<Vec<KeyValue>> {
		let parallel = self.parallel_requests();
		let keys = logging::with_elapsed_async(
			|| async { self.rpc_get_keys_parallel(&prefix, at, parallel).await },
			"Scraping keys...",
			|keys| format!("Found {} keys", keys.len()),
		)
		.await?;

		if keys.is_empty() {
			return Ok(Default::default())
		}

		let conn_manager = self.conn_manager()?;

		let payloads = keys
			.iter()
			.map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
			.collect::<Vec<_>>();

		let bar = ProgressBar::new(payloads.len() as u64);
		bar.enable_steady_tick(Duration::from_secs(1));
		bar.set_message("Downloading key values".to_string());
		bar.set_style(
			ProgressStyle::with_template(
				"[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
			)
			.unwrap()
			.progress_chars("=>-"),
		);

		// Create batches of payloads for dynamic work distribution.
		// Each batch is: (start_index, payloads, batch_size)
		const BATCH_SIZE: usize = 1000;
		let batches: VecDeque<_> = payloads
			.chunks(BATCH_SIZE)
			.enumerate()
			.map(|(i, chunk)| (i * BATCH_SIZE, chunk.to_vec(), BATCH_SIZE))
			.collect();

		info!(target: LOG_TARGET, "🔧 Initialized {} batches for value fetching", batches.len());
		info!(target: LOG_TARGET, "🚀 Spawning {parallel} parallel workers for value fetching");

		let results: Arc<Mutex<Vec<Option<StorageData>>>> =
			Arc::new(Mutex::new(vec![None; payloads.len()]));
		let results_for_extraction = results.clone();
		let bar_for_finish = bar.clone();

		run_workers(
			batches,
			conn_manager,
			parallel,
			move |worker_index, (start_index, batch, batch_size), client| {
				let results = results.clone();
				let bar = bar.clone();

				async move {
					debug!(
						target: LOG_TARGET,
						"Value worker {worker_index}: Processing batch at {start_index} with {} payloads",
						batch.len()
					);

					match Self::get_storage_data_dynamic_batch_size(
						&client,
						worker_index,
						&batch,
						&bar,
						batch_size,
					)
					.await
					{
						Ok(batch_results) => {
							let mut results_lock = results.lock().unwrap();
							for (offset, result) in batch_results.into_iter().enumerate() {
								results_lock[start_index + offset] = result;
							}
							ProcessResult::Success { new_work: vec![] }
						},
						Err(e) => {
							debug!(target: LOG_TARGET, "Value worker {worker_index}: failed: {e:?}");
							let new_batch_size = (batch_size / 2).max(10);
							ProcessResult::Retry {
								work: (start_index, batch, new_batch_size),
								sleep_duration: Duration::from_secs(15),
								recreate_client: true,
							}
						},
					}
				}
			},
		)
		.await;

		let storage_data = results_for_extraction.lock().unwrap().clone();

		bar_for_finish.finish_with_message("✅ Downloaded key values");
		println!();

		// Check if we got responses for all submitted requests.
		assert_eq!(keys.len(), storage_data.len());

		// Filter out None values - keys without values should NOT be inserted
		// (inserting with empty value would change the trie structure)
		let key_values: Vec<_> = keys
			.iter()
			.zip(storage_data)
			.filter_map(|(key, maybe_value)| maybe_value.map(|data| (key.clone(), data)))
			.collect();

		logging::with_elapsed(
			|| {
				pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
					// Don't insert the child keys here, they need to be inserted separately with
					// all their data in the load_child_remote function.
					match is_default_child_storage_key(&k.0) {
						true => None,
						false => Some((k.0, v.0)),
					}
				}));

				Ok(())
			},
			"Inserting keys into DB...",
			|_| "Inserted keys into DB".into(),
		)
		.expect("must succeed; qed");

		Ok(key_values)
	}

	/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
	pub(crate) async fn rpc_child_get_storage_paged(
		client: &Client,
		prefixed_top_key: &StorageKey,
		child_keys: Vec<StorageKey>,
		at: B::Hash,
	) -> Result<Vec<KeyValue>> {
		let payloads: Vec<_> = child_keys
			.iter()
			.map(|key| {
				(
					"childstate_getStorage".to_string(),
					rpc_params![
						PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
						key,
						at
					],
				)
			})
			.collect();

		let bar = ProgressBar::new(payloads.len() as u64);
		let storage_data =
			Self::get_storage_data_dynamic_batch_size(client, 0, &payloads, &bar, 1000)
				.await
				.map_err(|_| "rpc child_get_storage failed")?;

		// Filter out None values
		Ok(child_keys
			.into_iter()
			.zip(storage_data)
			.filter_map(|(key, maybe_value)| maybe_value.map(|v| (key, v)))
			.collect())
	}
}

impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	/// Fetch all keys and values for a single child trie.
	async fn fetch_single_child_trie(
		client: &Client,
		prefixed_top_key: &StorageKey,
		at: B::Hash,
	) -> Result<(ChildInfo, Vec<KeyValue>)> {
		let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
		let page_size = 1000u32;

		trace!(
			target: LOG_TARGET,
			"Fetching child trie keys for {:?}",
			HexDisplay::from(&prefixed_top_key.0)
		);

		// Fetch all keys for this child trie
		let mut child_keys = Vec::new();
		let mut start_key: Option<StorageKey> = None;

		loop {
			let rpc_result = with_timeout(
				substrate_rpc_client::ChildStateApi::storage_keys_paged(
					client.ws_client.as_ref(),
					top_key.clone(),
					Some(StorageKey(vec![])),
					page_size,
					start_key.clone(),
					Some(at),
				),
				RPC_TIMEOUT,
			)
			.await;

			let page = match rpc_result {
				Ok(Ok(p)) => p,
				Ok(Err(e)) => {
					debug!(target: LOG_TARGET, "Child trie RPC error: {e:?}");
					return Err("rpc child_get_keys failed");
				},
				Err(()) => {
					debug!(target: LOG_TARGET, "Child trie RPC timeout");
					return Err("rpc child_get_keys timeout");
				},
			};

			let is_full_batch = page.len() == page_size as usize;
			start_key = page.last().cloned();
			child_keys.extend(page);

			if !is_full_batch {
				break;
			}
		}

		// Fetch values for all keys
		let child_kv =
			Self::rpc_child_get_storage_paged(client, prefixed_top_key, child_keys, at).await?;

		// Parse the child info
		let un_prefixed = match ChildType::from_prefixed_key(&top_key) {
			Some((ChildType::ParentKeyId, storage_key)) => storage_key,
			None => return Err("invalid child key"),
		};

		Ok((ChildInfo::new_default(un_prefixed), child_kv))
	}

	/// Load all of the child keys from the remote config, given the already scraped list of top
	/// key pairs.
	///
	/// `top_kv` need not be only child-bearing top keys. It should be all of the top keys that are
	/// included thus far.
	///
	/// This function uses parallel workers to fetch child tries concurrently.
	async fn load_child_remote(
		&self,
		top_kv: &[KeyValue],
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<ChildKeyValues> {
		let child_roots: VecDeque<StorageKey> = top_kv
			.iter()
			.filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
			.map(|(k, _)| k.clone())
			.collect();

		if child_roots.is_empty() {
			info!(target: LOG_TARGET, "👩‍👦 no child roots found to scrape");
			return Ok(Default::default())
		}

		let total_count = child_roots.len();
		info!(
			target: LOG_TARGET,
			"👩‍👦 scraping child-tree data from {} child tries",
			total_count,
		);

		let at = self.as_online().at_expected();
		let conn_manager = self.conn_manager()?;
		let parallel = self.parallel_requests();

		let results: Arc<Mutex<Vec<(ChildInfo, Vec<KeyValue>)>>> = Arc::new(Mutex::new(Vec::new()));
		let results_for_extraction = results.clone();
		let completed_count = Arc::new(AtomicUsize::new(0));

		run_workers(
			child_roots,
			conn_manager,
			parallel,
			move |worker_index, prefixed_top_key, client| {
				let results = results.clone();
				let completed_count = completed_count.clone();

				async move {
					match Self::fetch_single_child_trie(&client, &prefixed_top_key, at).await {
						Ok((info, child_kv_inner)) => {
							results.lock().unwrap().push((info, child_kv_inner));

							let done = completed_count.fetch_add(1, Ordering::SeqCst) + 1;
							if done.is_multiple_of(100) || done == total_count {
								info!(
									target: LOG_TARGET,
									"👩‍👦 Child tries progress: {}/{} completed",
									done,
									total_count
								);
							}

							ProcessResult::Success { new_work: vec![] }
						},
						Err(e) => {
							error!(target: LOG_TARGET, "Worker {worker_index}: Failed: {e:?}");
							ProcessResult::Retry {
								work: prefixed_top_key,
								sleep_duration: Duration::from_secs(5),
								recreate_client: true,
							}
						},
					}
				}
			},
		)
		.await;

		// Extract results and populate pending_ext
		let child_kv_results = results_for_extraction.lock().unwrap().clone();

		let mut child_kv = Vec::new();
		for (info, kv_inner) in child_kv_results {
			let key_values: Vec<(Vec<u8>, Vec<u8>)> =
				kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect();
			for (k, v) in key_values {
				pending_ext.insert_child(info.clone(), k, v);
			}
			child_kv.push((info, kv_inner));
		}

		info!(
			target: LOG_TARGET,
			"👩‍👦 Completed scraping {} child tries",
			child_kv.len()
		);

		Ok(child_kv)
	}

	/// Load the top key-value pairs from the remote node into `pending_ext`.
	///
	/// This function populates `pending_ext` as it goes; the return value is only used for
	/// writing the snapshot cache, which leaves room for further optimization.
	async fn load_top_remote(
		&self,
		pending_ext: &mut TestExternalities<HashingFor<B>>,
	) -> Result<TopKeyValues> {
		let config = self.as_online();
		let at = self
			.as_online()
			.at
			.expect("online config must be initialized by this point; qed.");
		info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {at:?}");

		let mut keys_and_values = Vec::new();
		for prefix in &config.hashed_prefixes {
			let now = std::time::Instant::now();
			let additional_key_values =
				self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
			let elapsed = now.elapsed();
			info!(
				target: LOG_TARGET,
				"adding data for hashed prefix: {:?}, took {:.2}s",
				HexDisplay::from(prefix),
				elapsed.as_secs_f32()
			);
			keys_and_values.extend(additional_key_values);
		}

		for key in &config.hashed_keys {
			let key = StorageKey(key.to_vec());
			info!(
				target: LOG_TARGET,
				"adding data for hashed key: {:?}",
				HexDisplay::from(&key)
			);
			match self.rpc_get_storage(key.clone(), Some(at)).await? {
				Some(value) => {
					pending_ext.insert(key.clone().0, value.clone().0);
					keys_and_values.push((key, value));
				},
				None => {
					warn!(
						target: LOG_TARGET,
						"no data found for hashed key: {:?}",
						HexDisplay::from(&key)
					);
				},
			}
		}

		Ok(keys_and_values)
	}

	/// The entry point of execution, if `mode` is online.
	///
	/// Initializes the remote clients and sets the `at` field if not specified.
	async fn init_remote_client(&mut self) -> Result<()> {
		// First, create all clients from URIs, filtering out ones that fail to connect.
		let online_config = self.as_online();
		let mut clients = Vec::new();
		for uri in &online_config.transport_uris {
			if let Some(client) = Client::new(uri.clone()).await {
				clients.push(Arc::new(tokio::sync::Mutex::new(client)));
			}
		}
		self.conn_manager = Some(ConnectionManager::new(clients)?);

		// Then, if `at` is not set, set it.
		if self.as_online().at.is_none() {
			let at = self.rpc_get_head().await?;
			info!(
				target: LOG_TARGET,
				"since no at is provided, setting it to latest finalized head, {at:?}",
			);
			self.as_online_mut().at = Some(at);
		}

		// Then, a few transformations that we want to perform in the online config:
		let online_config = self.as_online_mut();
		online_config.pallets.iter().for_each(|p| {
			online_config
				.hashed_prefixes
				.push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
		});

		if online_config.child_trie {
			online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
		}

		// Finally, if by now we have not put any limitations on the prefixes that we are
		// interested in, we download everything.
		if online_config
			.hashed_prefixes
			.iter()
			.filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
			.count() == 0
		{
			info!(
				target: LOG_TARGET,
				"since no prefix is filtered, the data for all pallets will be downloaded"
			);
			online_config.hashed_prefixes.push(vec![]);
		}

		Ok(())
	}

	/// Load the header for the target block. Tries each client until one succeeds.
	async fn load_header(&self) -> Result<B::Header> {
		let conn_manager = self.conn_manager()?;
		let at = self.as_online().at_expected();

		for i in 0..conn_manager.num_clients() {
			let client = conn_manager.get(i).await;
			let result = with_timeout(
				ChainApi::<(), _, B::Header, ()>::header(client.ws_client.as_ref(), Some(at)),
				RPC_TIMEOUT,
			)
			.await;

			match result {
				Ok(Ok(Some(header))) => return Ok(header),
				Ok(Ok(None)) => {
					debug!(target: LOG_TARGET, "Client {i}: header returned None");
				},
				Ok(Err(e)) => {
					debug!(target: LOG_TARGET, "Client {i}: header RPC error: {e:?}");
				},
				Err(()) => {
					debug!(target: LOG_TARGET, "Client {i}: header timeout");
				},
			}
		}

		Err("rpc header failed on all clients")
	}

	/// Load the data from a remote server. The main code path is calling into `load_top_remote`
	/// and `load_child_remote`.
	///
	/// Must be called after `init_remote_client`.
	async fn load_remote_and_maybe_save(&mut self) -> Result<TestExternalities<HashingFor<B>>> {
		let state_version = self.fetch_state_version().await?;
		let mut pending_ext = TestExternalities::new_with_code_and_state(
			Default::default(),
			Default::default(),
			self.overwrite_state_version.unwrap_or(state_version),
		);

		// Load data from the remote into `pending_ext`.
		let top_kv = self.load_top_remote(&mut pending_ext).await?;
		self.load_child_remote(&top_kv, &mut pending_ext).await?;

		let header = self.load_header().await?;
		let (raw_storage, computed_root) = pending_ext.into_raw_snapshot();

		// If we need to save a snapshot, save the raw storage and root hash to the snapshot.
		if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
			let snapshot =
				Snapshot::<B>::new(state_version, raw_storage.clone(), computed_root, header);
			let encoded = snapshot.encode();
			info!(
				target: LOG_TARGET,
				"writing snapshot of {} bytes to {path:?}",
				encoded.len(),
			);
			std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
		}

		// Return the externalities, reconstructed from the raw snapshot and computed root.
		Ok(TestExternalities::from_raw_snapshot(
			raw_storage,
			computed_root,
			self.overwrite_state_version.unwrap_or(state_version),
		))
	}

	async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>> {
		self.init_remote_client().await?;
		let inner_ext = self.load_remote_and_maybe_save().await?;
		Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
	}

	fn do_load_offline(&mut self, config: OfflineConfig) -> Result<RemoteExternalities<B>> {
		let (header, inner_ext) = logging::with_elapsed(
			|| {
				info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);

				let Snapshot { header, state_version, raw_storage, storage_root, .. } =
					Snapshot::<B>::load(&config.state_snapshot.path)?;
				let inner_ext = TestExternalities::from_raw_snapshot(
					raw_storage,
					storage_root,
					self.overwrite_state_version.unwrap_or(state_version),
				);

				Ok((header, inner_ext))
			},
			"Loading snapshot...",
			|_| "Loaded snapshot".into(),
		)?;

		Ok(RemoteExternalities { inner_ext, header })
	}

	pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>> {
		let mut ext = match self.mode.clone() {
			Mode::Offline(config) => self.do_load_offline(config)?,
			Mode::Online(_) => self.do_load_remote().await?,
			Mode::OfflineOrElseOnline(offline_config, _) => {
				match self.do_load_offline(offline_config) {
					Ok(x) => x,
					Err(_) => self.do_load_remote().await?,
				}
			},
		};

		// inject manual key values.
		if !self.hashed_key_values.is_empty() {
			info!(
				target: LOG_TARGET,
				"extending externalities with {} manually injected key-values",
				self.hashed_key_values.len()
			);
			ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
		}

		// remove blacklisted key values.
		if !self.hashed_blacklist.is_empty() {
			info!(
				target: LOG_TARGET,
				"removing {} blacklisted keys from the externalities",
				self.hashed_blacklist.len()
			);
			for k in self.hashed_blacklist {
				ext.execute_with(|| sp_io::storage::clear(&k));
			}
		}

		Ok(ext)
	}
}

// Public methods
impl<B: BlockT> Builder<B>
where
	B::Hash: DeserializeOwned,
	B::Header: DeserializeOwned,
{
	/// Create a new builder.
	pub fn new() -> Self {
		Default::default()
	}

	/// Inject a manual list of key-value pairs into the storage.
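	///
	/// A minimal sketch (the key and value bytes here are placeholders; real keys must already
	/// be hashed):
	///
	/// ```ignore
	/// let builder = Builder::<Block>::new().inject_hashed_key_value(vec![(
	///     StorageKey(vec![0xaa; 32]),
	///     StorageData(vec![0x01]),
	/// )]);
	/// ```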
	pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
		self.hashed_key_values.extend(injections);
		self
	}

	/// Blacklist this hashed key from the final externalities. This is treated as-is, and should
	/// be pre-hashed.
	pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
		self.hashed_blacklist.push(hashed.to_vec());
		self
	}

	/// Set the connectivity mode: online, offline, or offline with online fallback.
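	///
	/// For example, to load state from a previously created snapshot file (mirroring the offline
	/// tests below; the path is a placeholder):
	///
	/// ```ignore
	/// let builder = Builder::<Block>::new().mode(Mode::Offline(OfflineConfig {
	///     state_snapshot: SnapshotConfig::new("test_data/test.snap"),
	/// }));
	/// ```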
	pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
		self.mode = mode;
		self
	}

	/// Overwrite the state version that would otherwise be taken from the remote node or the
	/// snapshot file.
	pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
		self.overwrite_state_version = Some(version);
		self
	}

	/// Build the configured `RemoteExternalities`, committing all loaded changes to the backend.
	pub async fn build(self) -> Result<RemoteExternalities<B>> {
		let mut ext = self.pre_build().await?;
		ext.commit_all().unwrap();

		info!(
			target: LOG_TARGET,
			"initialized state externalities with storage root {:?} and state_version {:?}",
			ext.as_backend().root(),
			ext.state_version
		);

		Ok(ext)
	}
}

#[cfg(test)]
mod test_prelude {
	pub(crate) use super::*;
	pub(crate) use sp_runtime::testing::{Block as RawBlock, MockCallU64};
	pub(crate) type UncheckedXt = sp_runtime::testing::TestXt<MockCallU64, ()>;
	pub(crate) type Block = RawBlock<UncheckedXt>;

	pub(crate) fn init_logger() {
		sp_tracing::try_init_simple();
	}
}

#[cfg(test)]
mod tests {
	use super::test_prelude::*;

	#[tokio::test]
	async fn can_load_state_snapshot() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_exclude_from_snapshot() {
		init_logger();

		// get the first key from the snapshot file.
		let some_key = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.build()
			.await
			.expect("Can't read state snapshot file")
			.execute_with(|| {
				let key =
					sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
				assert!(sp_io::storage::get(&key).is_some());
				key
			});

		Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig {
				state_snapshot: SnapshotConfig::new("test_data/test.snap"),
			}))
			.blacklist_hashed_key(&some_key)
			.build()
			.await
			.expect("Can't read state snapshot file")
			.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
	}
}

#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
	use super::test_prelude::*;
	use frame_support::storage::KeyPrefixIterator;
	use std::{env, os::unix::fs::MetadataExt, path::Path};

	fn endpoint() -> String {
		env::var("TEST_WS").unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
	}

	#[tokio::test]
	async fn state_version_is_kept_and_can_be_altered() {
		const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
		init_logger();

		// first, build a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// now re-create the same snapshot.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.state_version, cached_ext.state_version);

		// now overwrite it
		let other = match ext.state_version {
			StateVersion::V0 => StateVersion::V1,
			StateVersion::V1 => StateVersion::V0,
		};
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.overwrite_state_version(other)
			.build()
			.await
			.unwrap();

		assert_eq!(cached_ext.state_version, other);
	}

	#[tokio::test]
	async fn snapshot_block_hash_works() {
		const CACHE: &'static str = "snapshot_block_hash_works";
		init_logger();

		// first, build a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// now re-create the same snapshot.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.header.hash(), cached_ext.header.hash());
	}

	#[tokio::test]
	async fn child_keys_are_loaded() {
		const CACHE: &'static str = "snapshot_retains_storage";
		init_logger();

		// create an ext with children keys
		let mut child_ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				pallets: vec!["Proxy".to_owned()],
				child_trie: true,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// create an ext without children keys
		let mut ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// there should be more keys in the child ext.
		assert!(
			child_ext.as_backend().backend_storage().keys().len() >
				ext.as_backend().backend_storage().keys().len()
		);
	}

	#[tokio::test]
	async fn offline_else_online_works() {
		const CACHE: &'static str = "offline_else_online_works_data";
		init_logger();
		// this shows that in the first run, we use the remote and create a snapshot.
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport_uris: vec![endpoint().clone()],
					pallets: vec!["Proxy".to_owned()],
					child_trie: false,
					state_snapshot: Some(SnapshotConfig::new(CACHE)),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		// this shows that in the second run, we are not using the remote
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport_uris: vec!["ws://non-existent:666".to_owned()],
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		std::fs::remove_file(to_delete[0].path()).unwrap();
	}

	#[tokio::test]
	async fn can_build_one_small_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_build_few_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test(flavor = "multi_thread")]
	async fn can_create_snapshot() {
		const CACHE: &'static str = "can_create_snapshot";
		init_logger();

		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		let to_delete = to_delete.first().unwrap();
		assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
		std::fs::remove_file(to_delete.path()).unwrap();
	}

	#[tokio::test]
	async fn can_create_child_snapshot() {
		const CACHE: &'static str = "can_create_child_snapshot";
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Crowdloan".to_owned()],
				child_trie: true,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		let to_delete = std::fs::read_dir(Path::new("."))
			.unwrap()
			.into_iter()
			.map(|d| d.unwrap())
			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
			.collect::<Vec<_>>();

		assert!(to_delete.len() == 1);
		let to_delete = to_delete.first().unwrap();
		assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
		std::fs::remove_file(to_delete.path()).unwrap();
	}

	#[tokio::test]
	async fn can_build_big_pallet() {
		if std::option_env!("TEST_WS").is_none() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				pallets: vec!["Staking".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_all() {
		if std::option_env!("TEST_WS").is_none() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: vec![endpoint().clone()],
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_in_parallel() {
		init_logger();

		let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
			transport_uris: vec![endpoint().clone()],
			..Default::default()
		}));
		builder.init_remote_client().await.unwrap();

		let at = builder.as_online().at.unwrap();

		// Test with a specific prefix
		let prefix = StorageKey(vec![13]);
		let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
		assert!(!para.is_empty(), "Should fetch some keys with prefix");

		// Test with empty prefix (all keys)
		let prefix = StorageKey(vec![]);
		let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
		assert!(!para.is_empty(), "Should fetch some keys with empty prefix");
	}

	#[tokio::test]
	#[ignore] // This test takes a long time, run with --ignored
	async fn bridge_hub_polkadot_storage_root_matches() {
		init_logger();

		// Use multiple RPC providers for load distribution
		let endpoints = vec![
			"wss://bridge-hub-polkadot-rpc.n.dwellir.com",
			"wss://sys.ibp.network/bridgehub-polkadot",
			"wss://bridgehub-polkadot.api.onfinality.io/public",
			"wss://dot-rpc.stakeworld.io/bridgehub",
		];

		info!(target: LOG_TARGET, "Connecting to Bridge Hub Polkadot using {} RPC providers", endpoints.len());

		let mut ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport_uris: endpoints.into_iter().map(|e| e.to_owned()).collect(),
				child_trie: true,
				..Default::default()
			}))
			.build()
			.await
			.expect("Failed to build remote externalities");

		// Get the computed storage root from our downloaded state
		let backend = ext.as_backend();
		let computed_root = *backend.root();
		// Get the expected storage root from the block header
		let expected_root = ext.header.state_root;

		info!(
			target: LOG_TARGET,
			"Computed storage root: {:?}",
			computed_root
		);
		info!(
			target: LOG_TARGET,
			"Expected storage root (from header): {:?}",
			expected_root
		);

		// The storage roots must match exactly - this proves we downloaded all keys correctly
		assert_eq!(
			computed_root, expected_root,
			"Storage root mismatch! Computed: {:?}, Expected: {:?}. \
			This indicates that not all keys were fetched or there were duplicates.",
			computed_root, expected_root
		);

		// Verify we actually got some keys
		ext.execute_with(|| {
			let key_count = KeyPrefixIterator::<()>::new(vec![], vec![], |_| Ok(())).count();

			info!(target: LOG_TARGET, "Total keys in state: {}", key_count);
			assert!(key_count > 0, "Should have fetched some keys");
		});

		info!(
			target: LOG_TARGET,
			"✅ Storage root verification successful! All keys were fetched correctly."
		);
	}
1552}