1mod client;
24mod config;
25mod key_range;
26mod logging;
27mod parallel;
28
29pub use config::{Mode, OfflineConfig, OnlineConfig, SnapshotConfig};
30
31use client::{with_timeout, Client, ConnectionManager, RPC_TIMEOUT};
32use codec::Encode;
33use config::Snapshot;
34#[cfg(all(test, feature = "remote-test"))]
35use config::DEFAULT_HTTP_ENDPOINT;
36use indicatif::{ProgressBar, ProgressStyle};
37use jsonrpsee::core::params::ArrayParams;
38use log::*;
39use parallel::{run_workers, ProcessResult};
40use serde::de::DeserializeOwned;
41use sp_core::{
42 hexdisplay::HexDisplay,
43 storage::{
44 well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
45 ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
46 },
47};
48use sp_runtime::{
49 traits::{Block as BlockT, HashingFor},
50 StateVersion,
51};
52use sp_state_machine::TestExternalities;
53use std::{
54 collections::{BTreeSet, VecDeque},
55 future::Future,
56 ops::{Deref, DerefMut},
57 sync::{
58 atomic::{AtomicUsize, Ordering},
59 Arc, Mutex,
60 },
61 time::Duration,
62};
63use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
64
65use crate::key_range::{initialize_work_queue, subdivide_remaining_range};
66
/// Result type used by this module; errors default to static string descriptions.
type Result<T, E = &'static str> = std::result::Result<T, E>;

/// A single raw storage entry: key and value.
type KeyValue = (StorageKey, StorageData);
/// Key-values of the top-level trie.
type TopKeyValues = Vec<KeyValue>;
/// Key-values of each child trie, paired with the [`ChildInfo`] identifying that child trie.
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;

/// Log target used by every log statement in this module.
const LOG_TARGET: &str = "remote-ext";
74
75pub struct RemoteExternalities<B: BlockT> {
78 pub inner_ext: TestExternalities<HashingFor<B>>,
80 pub header: B::Header,
82}
83
84impl<B: BlockT> Deref for RemoteExternalities<B> {
85 type Target = TestExternalities<HashingFor<B>>;
86 fn deref(&self) -> &Self::Target {
87 &self.inner_ext
88 }
89}
90
91impl<B: BlockT> DerefMut for RemoteExternalities<B> {
92 fn deref_mut(&mut self) -> &mut Self::Target {
93 &mut self.inner_ext
94 }
95}
96
97#[derive(Clone)]
99pub struct Builder<B: BlockT> {
100 hashed_key_values: Vec<KeyValue>,
103 hashed_blacklist: Vec<Vec<u8>>,
105 mode: Mode<B::Hash>,
107 overwrite_state_version: Option<StateVersion>,
112 conn_manager: Option<ConnectionManager>,
114}
115
116impl<B: BlockT> Default for Builder<B> {
117 fn default() -> Self {
118 Self {
119 mode: Default::default(),
120 hashed_key_values: Default::default(),
121 hashed_blacklist: Default::default(),
122 overwrite_state_version: None,
123 conn_manager: None,
124 }
125 }
126}
127
128impl<B: BlockT> Builder<B> {
130 fn as_online(&self) -> &OnlineConfig<B::Hash> {
131 match &self.mode {
132 Mode::Online(config) => config,
133 Mode::OfflineOrElseOnline(_, config) => config,
134 _ => panic!("Unexpected mode: Online"),
135 }
136 }
137
138 fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
139 match &mut self.mode {
140 Mode::Online(config) => config,
141 Mode::OfflineOrElseOnline(_, config) => config,
142 _ => panic!("Unexpected mode: Online"),
143 }
144 }
145
146 fn conn_manager(&self) -> Result<&ConnectionManager> {
147 self.conn_manager.as_ref().ok_or("connection manager must be initialized; qed")
148 }
149}
150
151impl<B: BlockT> Builder<B>
153where
154 B::Hash: DeserializeOwned,
155 B::Header: DeserializeOwned,
156{
157 const PARALLEL_REQUESTS_PER_CLIENT: usize = 4;
158
159 fn parallel_requests(&self) -> usize {
160 self.conn_manager
161 .as_ref()
162 .map(|cm| cm.num_clients() * Self::PARALLEL_REQUESTS_PER_CLIENT)
163 .expect("connection manager must be initialized; qed")
164 }
165
166 async fn with_any_client<T, E, F, Fut>(&self, op_name: &'static str, f: F) -> Result<T, ()>
170 where
171 F: Fn(Client) -> Fut,
172 Fut: Future<Output = std::result::Result<T, E>>,
173 E: std::fmt::Debug,
174 {
175 let conn_manager = self.conn_manager().map_err(|_| ())?;
176 let num_clients = conn_manager.num_clients();
177 let start_offset: usize = rand::random();
178 for j in 0..num_clients {
179 let i = (start_offset + j) % num_clients;
180 let client = conn_manager.get(i).await;
181 let result = with_timeout(f(client), RPC_TIMEOUT).await;
182 match result {
183 Ok(Ok(value)) => return Ok(value),
184 Ok(Err(e)) => {
185 debug!(target: LOG_TARGET, "Client {i}: {op_name} RPC error: {e:?}");
186 },
187 Err(()) => {
188 debug!(target: LOG_TARGET, "Client {i}: {op_name} timeout");
189 },
190 }
191 }
192 Err(())
193 }
194
195 async fn rpc_get_storage(
197 &self,
198 key: StorageKey,
199 maybe_at: Option<B::Hash>,
200 ) -> Result<Option<StorageData>> {
201 trace!(target: LOG_TARGET, "rpc: get_storage");
202 self.with_any_client("get_storage", move |client| {
203 let key = key.clone();
204 async move { client.storage(key, maybe_at).await }
205 })
206 .await
207 .map_err(|_| "rpc get_storage failed on all clients")
208 }
209
210 async fn fetch_state_version(&self) -> Result<StateVersion> {
212 let conn_manager = self.conn_manager()?;
213
214 for i in 0..conn_manager.num_clients() {
215 let client = conn_manager.get(i).await;
216 let result = with_timeout(
217 StateApi::<B::Hash>::runtime_version(client.ws_client.as_ref(), None),
218 RPC_TIMEOUT,
219 )
220 .await;
221
222 match result {
223 Ok(Ok(version)) => return Ok(version.state_version()),
224 Ok(Err(e)) => {
225 debug!(target: LOG_TARGET, "Client {i}: runtime_version RPC error: {e:?}");
226 },
227 Err(()) => {
228 debug!(target: LOG_TARGET, "Client {i}: runtime_version timeout");
229 },
230 }
231 }
232
233 Err("rpc runtime_version failed on all clients")
234 }
235
236 async fn rpc_get_head(&self) -> Result<B::Hash> {
238 trace!(target: LOG_TARGET, "rpc: finalized_head");
239 self.with_any_client("finalized_head", |client| async move {
240 ChainApi::<(), _, B::Header, ()>::finalized_head(&*client).await
241 })
242 .await
243 .map_err(|_| "rpc finalized_head failed on all clients")
244 }
245
246 async fn rpc_get_keys_parallel(
248 &self,
249 prefix: &StorageKey,
250 block: B::Hash,
251 parallel: usize,
252 ) -> Result<Vec<StorageKey>> {
253 let work_queue = initialize_work_queue(&[prefix.clone()]);
254 let initial_ranges = work_queue.lock().unwrap().len();
255 info!(target: LOG_TARGET, "🔧 Initialized work queue with {initial_ranges} ranges");
256
257 let conn_manager = self.conn_manager()?;
258 info!(target: LOG_TARGET, "🌐 Using {} RPC provider(s)", conn_manager.num_clients());
259 info!(target: LOG_TARGET, "🚀 Spawning {parallel} parallel workers for key fetching");
260
261 let all_keys: Arc<Mutex<BTreeSet<StorageKey>>> = Arc::new(Mutex::new(BTreeSet::new()));
262 let last_logged_milestone = Arc::new(AtomicUsize::new(0));
263 let initial_work = work_queue.lock().unwrap().drain(..).collect();
264 let all_keys_for_result = all_keys.clone();
265
266 run_workers(initial_work, conn_manager, parallel, move |worker_index, range, client| {
267 let all_keys = all_keys.clone();
268 let last_logged_milestone = last_logged_milestone.clone();
269
270 async move {
271 trace!(
272 target: LOG_TARGET,
273 "Worker {worker_index}: fetching keys starting at {:?} (page_size: {})",
274 HexDisplay::from(&range.start_key.0),
275 range.page_size
276 );
277
278 let rpc_result = with_timeout(
279 client.storage_keys_paged(
280 Some(range.prefix.clone()),
281 range.page_size,
282 Some(range.start_key.clone()),
283 Some(block),
284 ),
285 RPC_TIMEOUT,
286 )
287 .await;
288
289 let page = match rpc_result {
290 Ok(Ok(p)) => p,
291 Ok(Err(e)) => {
292 debug!(target: LOG_TARGET, "Worker {worker_index}: RPC error: {e:?}");
293 return ProcessResult::Retry {
294 work: range.with_halved_page_size(),
295 sleep_duration: Duration::from_secs(15),
296 recreate_client: true,
297 };
298 },
299 Err(()) => {
300 debug!(target: LOG_TARGET, "Worker {worker_index}: timeout");
301 return ProcessResult::Retry {
302 work: range.with_halved_page_size(),
303 sleep_duration: Duration::from_secs(5),
304 recreate_client: true,
305 };
306 },
307 };
308
309 let (page, is_full_batch) = range.filter_keys(page);
311 let last_two_keys = if page.len() >= 2 {
312 Some((page[page.len() - 2].clone(), page[page.len() - 1].clone()))
313 } else {
314 None
315 };
316
317 let total_keys = {
318 let mut keys = all_keys.lock().unwrap();
319 keys.extend(page);
320 keys.len()
321 };
322
323 const LOG_INTERVAL: usize = 10_000;
325 let current_milestone = (total_keys / LOG_INTERVAL) * LOG_INTERVAL;
326 let last_milestone = last_logged_milestone.load(Ordering::Relaxed);
327 if current_milestone > last_milestone && current_milestone > 0 {
328 if last_logged_milestone
329 .compare_exchange(
330 last_milestone,
331 current_milestone,
332 Ordering::SeqCst,
333 Ordering::Relaxed,
334 )
335 .is_ok()
336 {
337 info!(target: LOG_TARGET, "📊 Scraped {total_keys} keys so far...");
338 }
339 }
340
341 let new_work = if is_full_batch {
343 if let Some((second_last, last)) = last_two_keys {
344 subdivide_remaining_range(
345 &second_last,
346 &last,
347 range.end_key.as_ref(),
348 &range.prefix,
349 )
350 } else {
351 vec![]
352 }
353 } else {
354 vec![]
355 };
356
357 ProcessResult::Success { new_work }
358 }
359 })
360 .await;
361
362 let keys: Vec<_> = all_keys_for_result.lock().unwrap().iter().cloned().collect();
363 info!(target: LOG_TARGET, "🎉 Parallel key fetching complete: {} unique keys", keys.len());
364
365 Ok(keys)
366 }
367
368 async fn get_storage_data_dynamic_batch_size(
413 client: &Client,
414 worker_index: usize,
415 payloads: &[(String, ArrayParams)],
416 bar: &ProgressBar,
417 batch_size: usize,
418 ) -> std::result::Result<Vec<Option<StorageData>>, String> {
419 let mut all_data: Vec<Option<StorageData>> = vec![];
420 let mut start_index = 0;
421 let total_payloads = payloads.len();
422
423 while start_index < total_payloads {
424 let end_index = usize::min(start_index + batch_size, total_payloads);
425 let page = &payloads[start_index..end_index];
426
427 trace!(
428 target: LOG_TARGET,
429 "Worker {worker_index}: fetching values {start_index}..{end_index} of {total_payloads}",
430 );
431
432 let mut batch = BatchRequestBuilder::new();
434 for (method, params) in page.iter() {
435 if batch.insert(method, params.clone()).is_err() {
436 panic!("Invalid batch method and/or params; qed");
437 }
438 }
439
440 let rpc_result = with_timeout(
441 client.ws_client.batch_request::<Option<StorageData>>(batch),
442 RPC_TIMEOUT,
443 )
444 .await;
445
446 let batch_response = match rpc_result {
447 Ok(Ok(r)) => r,
448 Ok(Err(e)) => return Err(format!("RPC error: {e:?}")),
449 Err(()) => return Err("timeout".to_string()),
450 };
451
452 let batch_response_len = batch_response.len();
453 for item in batch_response.into_iter() {
454 match item {
455 Ok(x) => all_data.push(x),
456 Err(e) => {
457 warn!(target: LOG_TARGET, "Value worker {worker_index}: batch item error: {}", e.message());
458 all_data.push(None);
459 },
460 }
461 }
462 bar.inc(batch_response_len as u64);
463
464 start_index = end_index;
465 }
466
467 Ok(all_data)
468 }
469
470 pub(crate) async fn rpc_get_pairs(
475 &self,
476 prefix: StorageKey,
477 at: B::Hash,
478 pending_ext: &mut TestExternalities<HashingFor<B>>,
479 ) -> Result<Vec<KeyValue>> {
480 let parallel = self.parallel_requests();
481 let keys = logging::with_elapsed_async(
482 || async { self.rpc_get_keys_parallel(&prefix, at, parallel).await },
483 "Scraping keys...",
484 |keys| format!("Found {} keys", keys.len()),
485 )
486 .await?;
487
488 if keys.is_empty() {
489 return Ok(Default::default())
490 }
491
492 let conn_manager = self.conn_manager()?;
493
494 let payloads = keys
495 .iter()
496 .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
497 .collect::<Vec<_>>();
498
499 let bar = ProgressBar::new(payloads.len() as u64);
500 bar.enable_steady_tick(Duration::from_secs(1));
501 bar.set_message("Downloading key values".to_string());
502 bar.set_style(
503 ProgressStyle::with_template(
504 "[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
505 )
506 .unwrap()
507 .progress_chars("=>-"),
508 );
509
510 const BATCH_SIZE: usize = 1000;
513 let batches: VecDeque<_> = payloads
514 .chunks(BATCH_SIZE)
515 .enumerate()
516 .map(|(i, chunk)| (i * BATCH_SIZE, chunk.to_vec(), BATCH_SIZE))
517 .collect();
518
519 info!(target: LOG_TARGET, "🔧 Initialized {} batches for value fetching", batches.len());
520 info!(target: LOG_TARGET, "🚀 Spawning {parallel} parallel workers for value fetching");
521
522 let results: Arc<Mutex<Vec<Option<StorageData>>>> =
523 Arc::new(Mutex::new(vec![None; payloads.len()]));
524 let results_for_extraction = results.clone();
525 let bar_for_finish = bar.clone();
526
527 run_workers(
528 batches,
529 conn_manager,
530 parallel,
531 move |worker_index, (start_index, batch, batch_size), client| {
532 let results = results.clone();
533 let bar = bar.clone();
534
535 async move {
536 debug!(
537 target: LOG_TARGET,
538 "Value worker {worker_index}: Processing batch at {start_index} with {} payloads",
539 batch.len()
540 );
541
542 match Self::get_storage_data_dynamic_batch_size(
543 &client,
544 worker_index,
545 &batch,
546 &bar,
547 batch_size,
548 )
549 .await
550 {
551 Ok(batch_results) => {
552 let mut results_lock = results.lock().unwrap();
553 for (offset, result) in batch_results.into_iter().enumerate() {
554 results_lock[start_index + offset] = result;
555 }
556 ProcessResult::Success { new_work: vec![] }
557 },
558 Err(e) => {
559 debug!(target: LOG_TARGET, "Value worker {worker_index}: failed: {e:?}");
560 let new_batch_size = (batch_size / 2).max(10);
561 ProcessResult::Retry {
562 work: (start_index, batch, new_batch_size),
563 sleep_duration: Duration::from_secs(15),
564 recreate_client: true,
565 }
566 },
567 }
568 }
569 },
570 )
571 .await;
572
573 let storage_data = results_for_extraction.lock().unwrap().clone();
574
575 bar_for_finish.finish_with_message("✅ Downloaded key values");
576 println!();
577
578 assert_eq!(keys.len(), storage_data.len());
580
581 let key_values: Vec<_> = keys
584 .iter()
585 .zip(storage_data)
586 .filter_map(|(key, maybe_value)| maybe_value.map(|data| (key.clone(), data)))
587 .collect();
588
589 logging::with_elapsed(
590 || {
591 pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
592 match is_default_child_storage_key(&k.0) {
595 true => None,
596 false => Some((k.0, v.0)),
597 }
598 }));
599
600 Ok(())
601 },
602 "Inserting keys into DB...",
603 |_| "Inserted keys into DB".into(),
604 )
605 .expect("must succeed; qed");
606
607 Ok(key_values)
608 }
609
610 pub(crate) async fn rpc_child_get_storage_paged(
612 client: &Client,
613 prefixed_top_key: &StorageKey,
614 child_keys: Vec<StorageKey>,
615 at: B::Hash,
616 ) -> Result<Vec<KeyValue>> {
617 let payloads: Vec<_> = child_keys
618 .iter()
619 .map(|key| {
620 (
621 "childstate_getStorage".to_string(),
622 rpc_params![
623 PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
624 key,
625 at
626 ],
627 )
628 })
629 .collect();
630
631 let bar = ProgressBar::new(payloads.len() as u64);
632 let storage_data =
633 Self::get_storage_data_dynamic_batch_size(client, 0, &payloads, &bar, 1000)
634 .await
635 .map_err(|_| "rpc child_get_storage failed")?;
636
637 Ok(child_keys
639 .into_iter()
640 .zip(storage_data)
641 .filter_map(|(key, maybe_value)| maybe_value.map(|v| (key, v)))
642 .collect())
643 }
644}
645
646impl<B: BlockT> Builder<B>
647where
648 B::Hash: DeserializeOwned,
649 B::Header: DeserializeOwned,
650{
651 async fn fetch_single_child_trie(
653 client: &Client,
654 prefixed_top_key: &StorageKey,
655 at: B::Hash,
656 ) -> Result<(ChildInfo, Vec<KeyValue>)> {
657 let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
658 let page_size = 1000u32;
659
660 trace!(
661 target: LOG_TARGET,
662 "Fetching child trie keys for {:?}",
663 HexDisplay::from(&prefixed_top_key.0)
664 );
665
666 let mut child_keys = Vec::new();
668 let mut start_key: Option<StorageKey> = None;
669
670 loop {
671 let rpc_result = with_timeout(
672 substrate_rpc_client::ChildStateApi::storage_keys_paged(
673 client.ws_client.as_ref(),
674 top_key.clone(),
675 Some(StorageKey(vec![])),
676 page_size,
677 start_key.clone(),
678 Some(at),
679 ),
680 RPC_TIMEOUT,
681 )
682 .await;
683
684 let page = match rpc_result {
685 Ok(Ok(p)) => p,
686 Ok(Err(e)) => {
687 debug!(target: LOG_TARGET, "Child trie RPC error: {e:?}");
688 return Err("rpc child_get_keys failed");
689 },
690 Err(()) => {
691 debug!(target: LOG_TARGET, "Child trie RPC timeout");
692 return Err("rpc child_get_keys timeout");
693 },
694 };
695
696 let is_full_batch = page.len() == page_size as usize;
697 start_key = page.last().cloned();
698 child_keys.extend(page);
699
700 if !is_full_batch {
701 break;
702 }
703 }
704
705 let child_kv =
707 Self::rpc_child_get_storage_paged(client, prefixed_top_key, child_keys, at).await?;
708
709 let un_prefixed = match ChildType::from_prefixed_key(&top_key) {
711 Some((ChildType::ParentKeyId, storage_key)) => storage_key,
712 None => return Err("invalid child key"),
713 };
714
715 Ok((ChildInfo::new_default(un_prefixed), child_kv))
716 }
717
718 async fn load_child_remote(
726 &self,
727 top_kv: &[KeyValue],
728 pending_ext: &mut TestExternalities<HashingFor<B>>,
729 ) -> Result<ChildKeyValues> {
730 let child_roots: VecDeque<StorageKey> = top_kv
731 .iter()
732 .filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
733 .map(|(k, _)| k.clone())
734 .collect();
735
736 if child_roots.is_empty() {
737 info!(target: LOG_TARGET, "👩👦 no child roots found to scrape");
738 return Ok(Default::default())
739 }
740
741 let total_count = child_roots.len();
742 info!(
743 target: LOG_TARGET,
744 "👩👦 scraping child-tree data from {} child tries",
745 total_count,
746 );
747
748 let at = self.as_online().at_expected();
749 let conn_manager = self.conn_manager()?;
750 let parallel = self.parallel_requests();
751
752 let results: Arc<Mutex<Vec<(ChildInfo, Vec<KeyValue>)>>> = Arc::new(Mutex::new(Vec::new()));
753 let results_for_extraction = results.clone();
754 let completed_count = Arc::new(AtomicUsize::new(0));
755
756 run_workers(
757 child_roots,
758 conn_manager,
759 parallel,
760 move |worker_index, prefixed_top_key, client| {
761 let results = results.clone();
762 let completed_count = completed_count.clone();
763
764 async move {
765 match Self::fetch_single_child_trie(&client, &prefixed_top_key, at).await {
766 Ok((info, child_kv_inner)) => {
767 results.lock().unwrap().push((info, child_kv_inner));
768
769 let done = completed_count.fetch_add(1, Ordering::SeqCst) + 1;
770 if done.is_multiple_of(100) || done == total_count {
771 info!(
772 target: LOG_TARGET,
773 "👩👦 Child tries progress: {}/{} completed",
774 done,
775 total_count
776 );
777 }
778
779 ProcessResult::Success { new_work: vec![] }
780 },
781 Err(e) => {
782 error!(target: LOG_TARGET, "Worker {worker_index}: Failed: {e:?}");
783 ProcessResult::Retry {
784 work: prefixed_top_key,
785 sleep_duration: Duration::from_secs(5),
786 recreate_client: true,
787 }
788 },
789 }
790 }
791 },
792 )
793 .await;
794
795 let child_kv_results = results_for_extraction.lock().unwrap().clone();
797
798 let mut child_kv = Vec::new();
799 for (info, kv_inner) in child_kv_results {
800 let key_values: Vec<(Vec<u8>, Vec<u8>)> =
801 kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect();
802 for (k, v) in key_values {
803 pending_ext.insert_child(info.clone(), k, v);
804 }
805 child_kv.push((info, kv_inner));
806 }
807
808 info!(
809 target: LOG_TARGET,
810 "👩👦 Completed scraping {} child tries",
811 child_kv.len()
812 );
813
814 Ok(child_kv)
815 }
816
817 async fn load_top_remote(
822 &self,
823 pending_ext: &mut TestExternalities<HashingFor<B>>,
824 ) -> Result<TopKeyValues> {
825 let config = self.as_online();
826 let at = self
827 .as_online()
828 .at
829 .expect("online config must be initialized by this point; qed.");
830 info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {at:?}");
831
832 let mut keys_and_values = Vec::new();
833 for prefix in &config.hashed_prefixes {
834 let now = std::time::Instant::now();
835 let additional_key_values =
836 self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
837 let elapsed = now.elapsed();
838 info!(
839 target: LOG_TARGET,
840 "adding data for hashed prefix: {:?}, took {:.2}s",
841 HexDisplay::from(prefix),
842 elapsed.as_secs_f32()
843 );
844 keys_and_values.extend(additional_key_values);
845 }
846
847 for key in &config.hashed_keys {
848 let key = StorageKey(key.to_vec());
849 info!(
850 target: LOG_TARGET,
851 "adding data for hashed key: {:?}",
852 HexDisplay::from(&key)
853 );
854 match self.rpc_get_storage(key.clone(), Some(at)).await? {
855 Some(value) => {
856 pending_ext.insert(key.clone().0, value.clone().0);
857 keys_and_values.push((key, value));
858 },
859 None => {
860 warn!(
861 target: LOG_TARGET,
862 "no data found for hashed key: {:?}",
863 HexDisplay::from(&key)
864 );
865 },
866 }
867 }
868
869 Ok(keys_and_values)
870 }
871
872 async fn init_remote_client(&mut self) -> Result<()> {
876 let online_config = self.as_online();
878 let mut clients = Vec::new();
879 for uri in &online_config.transport_uris {
880 if let Some(client) = Client::new(uri.clone()).await {
881 clients.push(Arc::new(tokio::sync::Mutex::new(client)));
882 }
883 }
884 self.conn_manager = Some(ConnectionManager::new(clients)?);
885
886 if self.as_online().at.is_none() {
888 let at = self.rpc_get_head().await?;
889 info!(
890 target: LOG_TARGET,
891 "since no at is provided, setting it to latest finalized head, {at:?}",
892 );
893 self.as_online_mut().at = Some(at);
894 }
895
896 let online_config = self.as_online_mut();
898 online_config.pallets.iter().for_each(|p| {
899 online_config
900 .hashed_prefixes
901 .push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
902 });
903
904 if online_config.child_trie {
905 online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
906 }
907
908 if online_config
911 .hashed_prefixes
912 .iter()
913 .filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
914 .count() == 0
915 {
916 info!(
917 target: LOG_TARGET,
918 "since no prefix is filtered, the data for all pallets will be downloaded"
919 );
920 online_config.hashed_prefixes.push(vec![]);
921 }
922
923 Ok(())
924 }
925
926 async fn load_header(&self) -> Result<B::Header> {
928 let conn_manager = self.conn_manager()?;
929 let at = self.as_online().at_expected();
930
931 for i in 0..conn_manager.num_clients() {
932 let client = conn_manager.get(i).await;
933 let result = with_timeout(
934 ChainApi::<(), _, B::Header, ()>::header(client.ws_client.as_ref(), Some(at)),
935 RPC_TIMEOUT,
936 )
937 .await;
938
939 match result {
940 Ok(Ok(Some(header))) => return Ok(header),
941 Ok(Ok(None)) => {
942 debug!(target: LOG_TARGET, "Client {i}: header returned None");
943 },
944 Ok(Err(e)) => {
945 debug!(target: LOG_TARGET, "Client {i}: header RPC error: {e:?}");
946 },
947 Err(()) => {
948 debug!(target: LOG_TARGET, "Client {i}: header timeout");
949 },
950 }
951 }
952
953 Err("rpc header failed on all clients")
954 }
955
956 async fn load_remote_and_maybe_save(&mut self) -> Result<TestExternalities<HashingFor<B>>> {
961 let state_version = self.fetch_state_version().await?;
962 let mut pending_ext = TestExternalities::new_with_code_and_state(
963 Default::default(),
964 Default::default(),
965 self.overwrite_state_version.unwrap_or(state_version),
966 );
967
968 let top_kv = self.load_top_remote(&mut pending_ext).await?;
970 self.load_child_remote(&top_kv, &mut pending_ext).await?;
971
972 let header = self.load_header().await?;
973 let (raw_storage, computed_root) = pending_ext.into_raw_snapshot();
974
975 if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
977 let snapshot =
978 Snapshot::<B>::new(state_version, raw_storage.clone(), computed_root, header);
979 let encoded = snapshot.encode();
980 info!(
981 target: LOG_TARGET,
982 "writing snapshot of {} bytes to {path:?}",
983 encoded.len(),
984 );
985 std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
986 }
987
988 Ok(TestExternalities::from_raw_snapshot(
990 raw_storage,
991 computed_root,
992 self.overwrite_state_version.unwrap_or(state_version),
993 ))
994 }
995
996 async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>> {
997 self.init_remote_client().await?;
998 let inner_ext = self.load_remote_and_maybe_save().await?;
999 Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
1000 }
1001
1002 fn do_load_offline(&mut self, config: OfflineConfig) -> Result<RemoteExternalities<B>> {
1003 let (header, inner_ext) = logging::with_elapsed(
1004 || {
1005 info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);
1006
1007 let Snapshot { header, state_version, raw_storage, storage_root, .. } =
1008 Snapshot::<B>::load(&config.state_snapshot.path)?;
1009 let inner_ext = TestExternalities::from_raw_snapshot(
1010 raw_storage,
1011 storage_root,
1012 self.overwrite_state_version.unwrap_or(state_version),
1013 );
1014
1015 Ok((header, inner_ext))
1016 },
1017 "Loading snapshot...",
1018 |_| "Loaded snapshot".into(),
1019 )?;
1020
1021 Ok(RemoteExternalities { inner_ext, header })
1022 }
1023
1024 pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>> {
1025 let mut ext = match self.mode.clone() {
1026 Mode::Offline(config) => self.do_load_offline(config)?,
1027 Mode::Online(_) => self.do_load_remote().await?,
1028 Mode::OfflineOrElseOnline(offline_config, _) => {
1029 match self.do_load_offline(offline_config) {
1030 Ok(x) => x,
1031 Err(_) => self.do_load_remote().await?,
1032 }
1033 },
1034 };
1035
1036 if !self.hashed_key_values.is_empty() {
1038 info!(
1039 target: LOG_TARGET,
1040 "extending externalities with {} manually injected key-values",
1041 self.hashed_key_values.len()
1042 );
1043 ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
1044 }
1045
1046 if !self.hashed_blacklist.is_empty() {
1048 info!(
1049 target: LOG_TARGET,
1050 "excluding externalities from {} keys",
1051 self.hashed_blacklist.len()
1052 );
1053 for k in self.hashed_blacklist {
1054 ext.execute_with(|| sp_io::storage::clear(&k));
1055 }
1056 }
1057
1058 Ok(ext)
1059 }
1060}
1061
1062impl<B: BlockT> Builder<B>
1064where
1065 B::Hash: DeserializeOwned,
1066 B::Header: DeserializeOwned,
1067{
1068 pub fn new() -> Self {
1070 Default::default()
1071 }
1072
1073 pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
1075 self.hashed_key_values.extend(injections);
1076 self
1077 }
1078
1079 pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
1082 self.hashed_blacklist.push(hashed.to_vec());
1083 self
1084 }
1085
1086 pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
1088 self.mode = mode;
1089 self
1090 }
1091
1092 pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
1094 self.overwrite_state_version = Some(version);
1095 self
1096 }
1097
1098 pub async fn build(self) -> Result<RemoteExternalities<B>> {
1099 let mut ext = self.pre_build().await?;
1100 ext.commit_all().unwrap();
1101
1102 info!(
1103 target: LOG_TARGET,
1104 "initialized state externalities with storage root {:?} and state_version {:?}",
1105 ext.as_backend().root(),
1106 ext.state_version
1107 );
1108
1109 Ok(ext)
1110 }
1111}
1112
#[cfg(test)]
mod test_prelude {
	//! Items shared by the test modules: the parent module's items, a mock block type, and a
	//! logger initializer.
	pub(crate) use super::*;
	pub(crate) use sp_runtime::testing::{Block as RawBlock, MockCallU64};
	/// Extrinsic type of the mock block.
	pub(crate) type UncheckedXt = sp_runtime::testing::TestXt<MockCallU64, ()>;
	/// Mock block type used to instantiate [`Builder`] in tests.
	pub(crate) type Block = RawBlock<UncheckedXt>;

	/// Initializes logging for tests via `sp_tracing::try_init_simple`.
	pub(crate) fn init_logger() {
		sp_tracing::try_init_simple();
	}
}
1124
#[cfg(test)]
mod tests {
	use super::test_prelude::*;

	/// Snapshot fixture used by the offline tests.
	const SNAPSHOT_PATH: &str = "test_data/test.snap";

	/// A builder configured to load state from [`SNAPSHOT_PATH`].
	fn offline_builder() -> Builder<Block> {
		Builder::<Block>::new().mode(Mode::Offline(OfflineConfig {
			state_snapshot: SnapshotConfig::new(SNAPSHOT_PATH),
		}))
	}

	#[tokio::test]
	async fn can_load_state_snapshot() {
		init_logger();
		let mut ext = offline_builder().build().await.unwrap();
		ext.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_exclude_from_snapshot() {
		init_logger();

		// Pick any key that exists in the snapshot...
		let some_key = {
			let mut ext = offline_builder().build().await.expect("Can't read state snapshot file");
			ext.execute_with(|| {
				let key =
					sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
				assert!(sp_io::storage::get(&key).is_some());
				key
			})
		};

		// ...and check that blacklisting it removes it.
		let mut filtered = offline_builder()
			.blacklist_hashed_key(&some_key)
			.build()
			.await
			.expect("Can't read state snapshot file");
		filtered.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
	}
}
1172
1173#[cfg(all(test, feature = "remote-test"))]
1174mod remote_tests {
1175 use super::test_prelude::*;
1176 use frame_support::storage::KeyPrefixIterator;
1177 use std::{env, os::unix::fs::MetadataExt, path::Path};
1178
1179 fn endpoint() -> String {
1180 env::var("TEST_WS").unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
1181 }
1182
1183 #[tokio::test]
1184 async fn state_version_is_kept_and_can_be_altered() {
1185 const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
1186 init_logger();
1187
1188 let ext = Builder::<Block>::new()
1190 .mode(Mode::Online(OnlineConfig {
1191 transport_uris: vec![endpoint().clone()],
1192 pallets: vec!["Proxy".to_owned()],
1193 child_trie: false,
1194 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1195 ..Default::default()
1196 }))
1197 .build()
1198 .await
1199 .unwrap();
1200
1201 let cached_ext = Builder::<Block>::new()
1203 .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
1204 .build()
1205 .await
1206 .unwrap();
1207
1208 assert_eq!(ext.state_version, cached_ext.state_version);
1209
1210 let other = match ext.state_version {
1212 StateVersion::V0 => StateVersion::V1,
1213 StateVersion::V1 => StateVersion::V0,
1214 };
1215 let cached_ext = Builder::<Block>::new()
1216 .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
1217 .overwrite_state_version(other)
1218 .build()
1219 .await
1220 .unwrap();
1221
1222 assert_eq!(cached_ext.state_version, other);
1223 }
1224
1225 #[tokio::test]
1226 async fn snapshot_block_hash_works() {
1227 const CACHE: &'static str = "snapshot_block_hash_works";
1228 init_logger();
1229
1230 let ext = Builder::<Block>::new()
1232 .mode(Mode::Online(OnlineConfig {
1233 transport_uris: vec![endpoint().clone()],
1234 pallets: vec!["Proxy".to_owned()],
1235 child_trie: false,
1236 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1237 ..Default::default()
1238 }))
1239 .build()
1240 .await
1241 .unwrap();
1242
1243 let cached_ext = Builder::<Block>::new()
1245 .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
1246 .build()
1247 .await
1248 .unwrap();
1249
1250 assert_eq!(ext.header.hash(), cached_ext.header.hash());
1251 }
1252
1253 #[tokio::test]
1254 async fn child_keys_are_loaded() {
1255 const CACHE: &'static str = "snapshot_retains_storage";
1256 init_logger();
1257
1258 let mut child_ext = Builder::<Block>::new()
1260 .mode(Mode::Online(OnlineConfig {
1261 transport_uris: vec![endpoint().clone()],
1262 pallets: vec!["Proxy".to_owned()],
1263 child_trie: true,
1264 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1265 ..Default::default()
1266 }))
1267 .build()
1268 .await
1269 .unwrap();
1270
1271 let mut ext = Builder::<Block>::new()
1273 .mode(Mode::Online(OnlineConfig {
1274 transport_uris: vec![endpoint().clone()],
1275 pallets: vec!["Proxy".to_owned()],
1276 child_trie: false,
1277 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1278 ..Default::default()
1279 }))
1280 .build()
1281 .await
1282 .unwrap();
1283
1284 assert!(
1286 child_ext.as_backend().backend_storage().keys().len() >
1287 ext.as_backend().backend_storage().keys().len()
1288 );
1289 }
1290
1291 #[tokio::test]
1292 async fn offline_else_online_works() {
1293 const CACHE: &'static str = "offline_else_online_works_data";
1294 init_logger();
1295 Builder::<Block>::new()
1297 .mode(Mode::OfflineOrElseOnline(
1298 OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
1299 OnlineConfig {
1300 transport_uris: vec![endpoint().clone()],
1301 pallets: vec!["Proxy".to_owned()],
1302 child_trie: false,
1303 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1304 ..Default::default()
1305 },
1306 ))
1307 .build()
1308 .await
1309 .unwrap()
1310 .execute_with(|| {});
1311
1312 Builder::<Block>::new()
1314 .mode(Mode::OfflineOrElseOnline(
1315 OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
1316 OnlineConfig {
1317 transport_uris: vec!["ws://non-existent:666".to_owned()],
1318 ..Default::default()
1319 },
1320 ))
1321 .build()
1322 .await
1323 .unwrap()
1324 .execute_with(|| {});
1325
1326 let to_delete = std::fs::read_dir(Path::new("."))
1327 .unwrap()
1328 .into_iter()
1329 .map(|d| d.unwrap())
1330 .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
1331 .collect::<Vec<_>>();
1332
1333 assert!(to_delete.len() == 1);
1334 std::fs::remove_file(to_delete[0].path()).unwrap();
1335 }
1336
1337 #[tokio::test]
1338 async fn can_build_one_small_pallet() {
1339 init_logger();
1340 Builder::<Block>::new()
1341 .mode(Mode::Online(OnlineConfig {
1342 transport_uris: vec![endpoint().clone()],
1343 pallets: vec!["Proxy".to_owned()],
1344 child_trie: false,
1345 ..Default::default()
1346 }))
1347 .build()
1348 .await
1349 .unwrap()
1350 .execute_with(|| {});
1351 }
1352
1353 #[tokio::test]
1354 async fn can_build_few_pallet() {
1355 init_logger();
1356 Builder::<Block>::new()
1357 .mode(Mode::Online(OnlineConfig {
1358 transport_uris: vec![endpoint().clone()],
1359 pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
1360 child_trie: false,
1361 ..Default::default()
1362 }))
1363 .build()
1364 .await
1365 .unwrap()
1366 .execute_with(|| {});
1367 }
1368
1369 #[tokio::test(flavor = "multi_thread")]
1370 async fn can_create_snapshot() {
1371 const CACHE: &'static str = "can_create_snapshot";
1372 init_logger();
1373
1374 Builder::<Block>::new()
1375 .mode(Mode::Online(OnlineConfig {
1376 transport_uris: vec![endpoint().clone()],
1377 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1378 pallets: vec!["Proxy".to_owned()],
1379 child_trie: false,
1380 ..Default::default()
1381 }))
1382 .build()
1383 .await
1384 .unwrap()
1385 .execute_with(|| {});
1386
1387 let to_delete = std::fs::read_dir(Path::new("."))
1388 .unwrap()
1389 .into_iter()
1390 .map(|d| d.unwrap())
1391 .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
1392 .collect::<Vec<_>>();
1393
1394 assert!(to_delete.len() == 1);
1395 let to_delete = to_delete.first().unwrap();
1396 assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
1397 std::fs::remove_file(to_delete.path()).unwrap();
1398 }
1399
1400 #[tokio::test]
1401 async fn can_create_child_snapshot() {
1402 const CACHE: &'static str = "can_create_child_snapshot";
1403 init_logger();
1404 Builder::<Block>::new()
1405 .mode(Mode::Online(OnlineConfig {
1406 transport_uris: vec![endpoint().clone()],
1407 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1408 pallets: vec!["Crowdloan".to_owned()],
1409 child_trie: true,
1410 ..Default::default()
1411 }))
1412 .build()
1413 .await
1414 .unwrap()
1415 .execute_with(|| {});
1416
1417 let to_delete = std::fs::read_dir(Path::new("."))
1418 .unwrap()
1419 .into_iter()
1420 .map(|d| d.unwrap())
1421 .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
1422 .collect::<Vec<_>>();
1423
1424 assert!(to_delete.len() == 1);
1425 let to_delete = to_delete.first().unwrap();
1426 assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
1427 std::fs::remove_file(to_delete.path()).unwrap();
1428 }
1429
1430 #[tokio::test]
1431 async fn can_build_big_pallet() {
1432 if std::option_env!("TEST_WS").is_none() {
1433 return
1434 }
1435 init_logger();
1436 Builder::<Block>::new()
1437 .mode(Mode::Online(OnlineConfig {
1438 transport_uris: vec![endpoint().clone()],
1439 pallets: vec!["Staking".to_owned()],
1440 child_trie: false,
1441 ..Default::default()
1442 }))
1443 .build()
1444 .await
1445 .unwrap()
1446 .execute_with(|| {});
1447 }
1448
1449 #[tokio::test]
1450 async fn can_fetch_all() {
1451 if std::option_env!("TEST_WS").is_none() {
1452 return
1453 }
1454 init_logger();
1455 Builder::<Block>::new()
1456 .mode(Mode::Online(OnlineConfig {
1457 transport_uris: vec![endpoint().clone()],
1458 ..Default::default()
1459 }))
1460 .build()
1461 .await
1462 .unwrap()
1463 .execute_with(|| {});
1464 }
1465
1466 #[tokio::test]
1467 async fn can_fetch_in_parallel() {
1468 init_logger();
1469
1470 let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
1471 transport_uris: vec![endpoint().clone()],
1472 ..Default::default()
1473 }));
1474 builder.init_remote_client().await.unwrap();
1475
1476 let at = builder.as_online().at.unwrap();
1477
1478 let prefix = StorageKey(vec![13]);
1480 let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
1481 assert!(!para.is_empty(), "Should fetch some keys with prefix");
1482
1483 let prefix = StorageKey(vec![]);
1485 let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
1486 assert!(!para.is_empty(), "Should fetch some keys with empty prefix");
1487 }
1488
1489 #[tokio::test]
1490 #[ignore] async fn bridge_hub_polkadot_storage_root_matches() {
1492 init_logger();
1493
1494 let endpoints = vec![
1496 "wss://bridge-hub-polkadot-rpc.n.dwellir.com",
1497 "wss://sys.ibp.network/bridgehub-polkadot",
1498 "wss://bridgehub-polkadot.api.onfinality.io/public",
1499 "wss://dot-rpc.stakeworld.io/bridgehub",
1500 ];
1501
1502 info!(target: LOG_TARGET, "Connecting to Bridge Hub Polkadot using {} RPC providers", endpoints.len());
1503
1504 let mut ext = Builder::<Block>::new()
1505 .mode(Mode::Online(OnlineConfig {
1506 transport_uris: endpoints.into_iter().map(|e| e.to_owned()).collect(),
1507 child_trie: true,
1508 ..Default::default()
1509 }))
1510 .build()
1511 .await
1512 .expect("Failed to build remote externalities");
1513
1514 let backend = ext.as_backend();
1516 let computed_root = *backend.root();
1517 let expected_root = ext.header.state_root;
1519
1520 info!(
1521 target: LOG_TARGET,
1522 "Computed storage root: {:?}",
1523 computed_root
1524 );
1525 info!(
1526 target: LOG_TARGET,
1527 "Expected storage root (from header): {:?}",
1528 expected_root
1529 );
1530
1531 assert_eq!(
1533 computed_root, expected_root,
1534 "Storage root mismatch! Computed: {:?}, Expected: {:?}. \
1535 This indicates that not all keys were fetched or there were duplicates.",
1536 computed_root, expected_root
1537 );
1538
1539 ext.execute_with(|| {
1541 let key_count = KeyPrefixIterator::<()>::new(vec![], vec![], |_| Ok(())).count();
1542
1543 info!(target: LOG_TARGET, "Total keys in state: {}", key_count);
1544 assert!(key_count > 0, "Should have fetched some keys");
1545 });
1546
1547 info!(
1548 target: LOG_TARGET,
1549 "✅ Storage root verification successful! All keys were fetched correctly."
1550 );
1551 }
1552}