1mod client;
24mod config;
25mod key_range;
26mod logging;
27mod parallel;
28
29pub use config::{Mode, OfflineConfig, OnlineConfig, SnapshotConfig};
30
31use client::{with_timeout, Client, ConnectionManager, RPC_TIMEOUT};
32use codec::Encode;
33use config::Snapshot;
34#[cfg(all(test, feature = "remote-test"))]
35use config::DEFAULT_WS_ENDPOINT;
36use indicatif::{ProgressBar, ProgressStyle};
37use jsonrpsee::core::params::ArrayParams;
38use log::*;
39use parallel::{run_workers, ProcessResult};
40use serde::de::DeserializeOwned;
41use sp_core::{
42 hexdisplay::HexDisplay,
43 storage::{
44 well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
45 ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
46 },
47};
48use sp_runtime::{
49 traits::{Block as BlockT, HashingFor},
50 StateVersion,
51};
52use sp_state_machine::TestExternalities;
53use std::{
54 collections::{BTreeSet, VecDeque},
55 future::Future,
56 ops::{Deref, DerefMut},
57 sync::{
58 atomic::{AtomicUsize, Ordering},
59 Arc, Mutex,
60 },
61 time::Duration,
62};
63use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
64
65use crate::key_range::{initialize_work_queue, subdivide_remaining_range};
66
/// Result type used throughout this module; errors default to static string descriptions.
type Result<T, E = &'static str> = std::result::Result<T, E>;

/// A single storage entry: hashed key and raw value.
type KeyValue = (StorageKey, StorageData);
/// Top-trie entries scraped from the remote node.
type TopKeyValues = Vec<KeyValue>;
/// Child-trie entries, grouped by the child trie they belong to.
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;

/// `log` target used by every message emitted from this module.
const LOG_TARGET: &str = "remote-ext";
74
/// Externalities built from remote (or snapshotted) state, together with the header of the
/// block that state was taken at.
pub struct RemoteExternalities<B: BlockT> {
    /// The in-memory externalities holding the loaded state.
    pub inner_ext: TestExternalities<HashingFor<B>>,
    /// Header of the block whose state `inner_ext` contains.
    pub header: B::Header,
}

// Lets a `RemoteExternalities` be used directly wherever a `TestExternalities` is expected
// (e.g. calling `execute_with` on it).
impl<B: BlockT> Deref for RemoteExternalities<B> {
    type Target = TestExternalities<HashingFor<B>>;
    fn deref(&self) -> &Self::Target {
        &self.inner_ext
    }
}

impl<B: BlockT> DerefMut for RemoteExternalities<B> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner_ext
    }
}
96
/// Builder for [`RemoteExternalities`]: loads state either from a remote node over RPC or from
/// a local snapshot, depending on the configured [`Mode`].
#[derive(Clone)]
pub struct Builder<B: BlockT> {
    /// Extra key/value pairs inserted into the externalities after the state has been loaded.
    hashed_key_values: Vec<KeyValue>,
    /// Hashed keys cleared from the externalities after the state (and any injected key/value
    /// pairs) have been loaded.
    hashed_blacklist: Vec<Vec<u8>>,
    /// Where to load the state from: online, offline, or offline with an online fallback.
    mode: Mode<B::Hash>,
    /// If set, used instead of the state version reported by the node or stored in the snapshot.
    overwrite_state_version: Option<StateVersion>,
    /// RPC connections; `None` until `init_remote_client` has run.
    conn_manager: Option<ConnectionManager>,
}
115
116impl<B: BlockT> Default for Builder<B> {
117 fn default() -> Self {
118 Self {
119 mode: Default::default(),
120 hashed_key_values: Default::default(),
121 hashed_blacklist: Default::default(),
122 overwrite_state_version: None,
123 conn_manager: None,
124 }
125 }
126}
127
128impl<B: BlockT> Builder<B> {
130 fn as_online(&self) -> &OnlineConfig<B::Hash> {
131 match &self.mode {
132 Mode::Online(config) => config,
133 Mode::OfflineOrElseOnline(_, config) => config,
134 _ => panic!("Unexpected mode: Online"),
135 }
136 }
137
138 fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
139 match &mut self.mode {
140 Mode::Online(config) => config,
141 Mode::OfflineOrElseOnline(_, config) => config,
142 _ => panic!("Unexpected mode: Online"),
143 }
144 }
145
146 fn conn_manager(&self) -> Result<&ConnectionManager> {
147 self.conn_manager.as_ref().ok_or("connection manager must be initialized; qed")
148 }
149}
150
151impl<B: BlockT> Builder<B>
153where
154 B::Hash: DeserializeOwned,
155 B::Header: DeserializeOwned,
156{
157 const PARALLEL_REQUESTS_PER_CLIENT: usize = 4;
158
159 fn parallel_requests(&self) -> usize {
160 self.conn_manager
161 .as_ref()
162 .map(|cm| cm.num_clients() * Self::PARALLEL_REQUESTS_PER_CLIENT)
163 .expect("connection manager must be initialized; qed")
164 }
165
166 async fn with_any_client<T, E, F, Fut>(&self, op_name: &'static str, f: F) -> Result<T, ()>
170 where
171 F: Fn(Client) -> Fut,
172 Fut: Future<Output = std::result::Result<T, E>>,
173 E: std::fmt::Debug,
174 {
175 let conn_manager = self.conn_manager().map_err(|_| ())?;
176 let num_clients = conn_manager.num_clients();
177 let start_offset: usize = rand::random();
178 for j in 0..num_clients {
179 let i = (start_offset + j) % num_clients;
180 let client = conn_manager.get(i).await;
181 let result = with_timeout(f(client), RPC_TIMEOUT).await;
182 match result {
183 Ok(Ok(value)) => return Ok(value),
184 Ok(Err(e)) => {
185 debug!(target: LOG_TARGET, "Client {i}: {op_name} RPC error: {e:?}");
186 },
187 Err(()) => {
188 debug!(target: LOG_TARGET, "Client {i}: {op_name} timeout");
189 },
190 }
191 }
192 Err(())
193 }
194
195 async fn rpc_get_storage(
197 &self,
198 key: StorageKey,
199 maybe_at: Option<B::Hash>,
200 ) -> Result<Option<StorageData>> {
201 trace!(target: LOG_TARGET, "rpc: get_storage");
202 self.with_any_client("get_storage", move |client| {
203 let key = key.clone();
204 async move { client.storage(key, maybe_at).await }
205 })
206 .await
207 .map_err(|_| "rpc get_storage failed on all clients")
208 }
209
210 async fn fetch_state_version(&self) -> Result<StateVersion> {
212 let conn_manager = self.conn_manager()?;
213
214 for i in 0..conn_manager.num_clients() {
215 let client = conn_manager.get(i).await;
216 let result = with_timeout(
217 StateApi::<B::Hash>::runtime_version(client.ws_client.as_ref(), None),
218 RPC_TIMEOUT,
219 )
220 .await;
221
222 match result {
223 Ok(Ok(version)) => return Ok(version.state_version()),
224 Ok(Err(e)) => {
225 debug!(target: LOG_TARGET, "Client {i}: runtime_version RPC error: {e:?}");
226 },
227 Err(()) => {
228 debug!(target: LOG_TARGET, "Client {i}: runtime_version timeout");
229 },
230 }
231 }
232
233 Err("rpc runtime_version failed on all clients")
234 }
235
236 async fn rpc_get_head(&self) -> Result<B::Hash> {
238 trace!(target: LOG_TARGET, "rpc: finalized_head");
239 self.with_any_client("finalized_head", |client| async move {
240 ChainApi::<(), _, B::Header, ()>::finalized_head(&*client).await
241 })
242 .await
243 .map_err(|_| "rpc finalized_head failed on all clients")
244 }
245
246 async fn rpc_get_keys_parallel(
248 &self,
249 prefix: &StorageKey,
250 block: B::Hash,
251 parallel: usize,
252 ) -> Result<Vec<StorageKey>> {
253 let work_queue = initialize_work_queue(&[prefix.clone()]);
254 let initial_ranges = work_queue.lock().unwrap().len();
255 info!(target: LOG_TARGET, "🔧 Initialized work queue with {initial_ranges} ranges");
256
257 let conn_manager = self.conn_manager()?;
258 info!(target: LOG_TARGET, "🌐 Using {} RPC provider(s)", conn_manager.num_clients());
259 info!(target: LOG_TARGET, "🚀 Spawning {parallel} parallel workers for key fetching");
260
261 let all_keys: Arc<Mutex<BTreeSet<StorageKey>>> = Arc::new(Mutex::new(BTreeSet::new()));
262 let last_logged_milestone = Arc::new(AtomicUsize::new(0));
263 let initial_work = work_queue.lock().unwrap().drain(..).collect();
264 let all_keys_for_result = all_keys.clone();
265
266 run_workers(initial_work, conn_manager, parallel, move |worker_index, range, client| {
267 let all_keys = all_keys.clone();
268 let last_logged_milestone = last_logged_milestone.clone();
269
270 async move {
271 trace!(
272 target: LOG_TARGET,
273 "Worker {worker_index}: fetching keys starting at {:?} (page_size: {})",
274 HexDisplay::from(&range.start_key.0),
275 range.page_size
276 );
277
278 let rpc_result = with_timeout(
279 client.storage_keys_paged(
280 Some(range.prefix.clone()),
281 range.page_size,
282 Some(range.start_key.clone()),
283 Some(block),
284 ),
285 RPC_TIMEOUT,
286 )
287 .await;
288
289 let page = match rpc_result {
290 Ok(Ok(p)) => p,
291 Ok(Err(e)) => {
292 debug!(target: LOG_TARGET, "Worker {worker_index}: RPC error: {e:?}");
293 return ProcessResult::Retry {
294 work: range.with_halved_page_size(),
295 sleep_duration: Duration::from_secs(15),
296 recreate_client: true,
297 };
298 },
299 Err(()) => {
300 debug!(target: LOG_TARGET, "Worker {worker_index}: timeout");
301 return ProcessResult::Retry {
302 work: range.with_halved_page_size(),
303 sleep_duration: Duration::from_secs(5),
304 recreate_client: true,
305 };
306 },
307 };
308
309 let (page, is_full_batch) = range.filter_keys(page);
311 let last_two_keys = if page.len() >= 2 {
312 Some((page[page.len() - 2].clone(), page[page.len() - 1].clone()))
313 } else {
314 None
315 };
316
317 let total_keys = {
318 let mut keys = all_keys.lock().unwrap();
319 keys.extend(page);
320 keys.len()
321 };
322
323 const LOG_INTERVAL: usize = 10_000;
325 let current_milestone = (total_keys / LOG_INTERVAL) * LOG_INTERVAL;
326 let last_milestone = last_logged_milestone.load(Ordering::Relaxed);
327 if current_milestone > last_milestone && current_milestone > 0 {
328 if last_logged_milestone
329 .compare_exchange(
330 last_milestone,
331 current_milestone,
332 Ordering::SeqCst,
333 Ordering::Relaxed,
334 )
335 .is_ok()
336 {
337 info!(target: LOG_TARGET, "📊 Scraped {total_keys} keys so far...");
338 }
339 }
340
341 let new_work = if is_full_batch {
343 if let Some((second_last, last)) = last_two_keys {
344 subdivide_remaining_range(
345 &second_last,
346 &last,
347 range.end_key.as_ref(),
348 &range.prefix,
349 )
350 } else {
351 vec![]
352 }
353 } else {
354 vec![]
355 };
356
357 ProcessResult::Success { new_work }
358 }
359 })
360 .await;
361
362 let keys: Vec<_> = all_keys_for_result.lock().unwrap().iter().cloned().collect();
363 info!(target: LOG_TARGET, "🎉 Parallel key fetching complete: {} unique keys", keys.len());
364
365 Ok(keys)
366 }
367
368 async fn get_storage_data_dynamic_batch_size(
413 client: &Client,
414 worker_index: usize,
415 payloads: &[(String, ArrayParams)],
416 bar: &ProgressBar,
417 batch_size: usize,
418 ) -> std::result::Result<Vec<Option<StorageData>>, String> {
419 let mut all_data: Vec<Option<StorageData>> = vec![];
420 let mut start_index = 0;
421 let total_payloads = payloads.len();
422
423 while start_index < total_payloads {
424 let end_index = usize::min(start_index + batch_size, total_payloads);
425 let page = &payloads[start_index..end_index];
426
427 trace!(
428 target: LOG_TARGET,
429 "Worker {worker_index}: fetching values {start_index}..{end_index} of {total_payloads}",
430 );
431
432 let mut batch = BatchRequestBuilder::new();
434 for (method, params) in page.iter() {
435 if batch.insert(method, params.clone()).is_err() {
436 panic!("Invalid batch method and/or params; qed");
437 }
438 }
439
440 let rpc_result = with_timeout(
441 client.ws_client.batch_request::<Option<StorageData>>(batch),
442 RPC_TIMEOUT,
443 )
444 .await;
445
446 let batch_response = match rpc_result {
447 Ok(Ok(r)) => r,
448 Ok(Err(e)) => return Err(format!("RPC error: {e:?}")),
449 Err(()) => return Err("timeout".to_string()),
450 };
451
452 let batch_response_len = batch_response.len();
453 for item in batch_response.into_iter() {
454 match item {
455 Ok(x) => all_data.push(x),
456 Err(e) => {
457 warn!(target: LOG_TARGET, "Value worker {worker_index}: batch item error: {}", e.message());
458 all_data.push(None);
459 },
460 }
461 }
462 bar.inc(batch_response_len as u64);
463
464 start_index = end_index;
465 }
466
467 Ok(all_data)
468 }
469
470 pub(crate) async fn rpc_get_pairs(
475 &self,
476 prefix: StorageKey,
477 at: B::Hash,
478 pending_ext: &mut TestExternalities<HashingFor<B>>,
479 ) -> Result<Vec<KeyValue>> {
480 let parallel = self.parallel_requests();
481 let keys = logging::with_elapsed_async(
482 || async { self.rpc_get_keys_parallel(&prefix, at, parallel).await },
483 "Scraping keys...",
484 |keys| format!("Found {} keys", keys.len()),
485 )
486 .await?;
487
488 if keys.is_empty() {
489 return Ok(Default::default());
490 }
491
492 let conn_manager = self.conn_manager()?;
493
494 let payloads = keys
495 .iter()
496 .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
497 .collect::<Vec<_>>();
498
499 let bar = ProgressBar::new(payloads.len() as u64);
500 bar.enable_steady_tick(Duration::from_secs(1));
501 bar.set_message("Downloading key values".to_string());
502 bar.set_style(
503 ProgressStyle::with_template(
504 "[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
505 )
506 .unwrap()
507 .progress_chars("=>-"),
508 );
509
510 const BATCH_SIZE: usize = 1000;
513 let batches: VecDeque<_> = payloads
514 .chunks(BATCH_SIZE)
515 .enumerate()
516 .map(|(i, chunk)| (i * BATCH_SIZE, chunk.to_vec(), BATCH_SIZE))
517 .collect();
518
519 info!(target: LOG_TARGET, "🔧 Initialized {} batches for value fetching", batches.len());
520 info!(target: LOG_TARGET, "🚀 Spawning {parallel} parallel workers for value fetching");
521
522 let results: Arc<Mutex<Vec<Option<StorageData>>>> =
523 Arc::new(Mutex::new(vec![None; payloads.len()]));
524 let results_for_extraction = results.clone();
525 let bar_for_finish = bar.clone();
526
527 run_workers(
528 batches,
529 conn_manager,
530 parallel,
531 move |worker_index, (start_index, batch, batch_size), client| {
532 let results = results.clone();
533 let bar = bar.clone();
534
535 async move {
536 debug!(
537 target: LOG_TARGET,
538 "Value worker {worker_index}: Processing batch at {start_index} with {} payloads",
539 batch.len()
540 );
541
542 match Self::get_storage_data_dynamic_batch_size(
543 &client,
544 worker_index,
545 &batch,
546 &bar,
547 batch_size,
548 )
549 .await
550 {
551 Ok(batch_results) => {
552 let mut results_lock = results.lock().unwrap();
553 for (offset, result) in batch_results.into_iter().enumerate() {
554 results_lock[start_index + offset] = result;
555 }
556 ProcessResult::Success { new_work: vec![] }
557 },
558 Err(e) => {
559 debug!(target: LOG_TARGET, "Value worker {worker_index}: failed: {e:?}");
560 let new_batch_size = (batch_size / 2).max(10);
561 ProcessResult::Retry {
562 work: (start_index, batch, new_batch_size),
563 sleep_duration: Duration::from_secs(15),
564 recreate_client: true,
565 }
566 },
567 }
568 }
569 },
570 )
571 .await;
572
573 let storage_data = results_for_extraction.lock().unwrap().clone();
574
575 bar_for_finish.finish_with_message("✅ Downloaded key values");
576 println!();
577
578 assert_eq!(keys.len(), storage_data.len());
580
581 let key_values: Vec<_> = keys
584 .iter()
585 .zip(storage_data)
586 .filter_map(|(key, maybe_value)| maybe_value.map(|data| (key.clone(), data)))
587 .collect();
588
589 logging::with_elapsed(
590 || {
591 pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
592 match is_default_child_storage_key(&k.0) {
595 true => None,
596 false => Some((k.0, v.0)),
597 }
598 }));
599
600 Ok(())
601 },
602 "Inserting keys into DB...",
603 |_| "Inserted keys into DB".into(),
604 )
605 .expect("must succeed; qed");
606
607 Ok(key_values)
608 }
609
610 pub(crate) async fn rpc_child_get_storage_paged(
612 client: &Client,
613 prefixed_top_key: &StorageKey,
614 child_keys: Vec<StorageKey>,
615 at: B::Hash,
616 ) -> Result<Vec<KeyValue>> {
617 let payloads: Vec<_> = child_keys
618 .iter()
619 .map(|key| {
620 (
621 "childstate_getStorage".to_string(),
622 rpc_params![
623 PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
624 key,
625 at
626 ],
627 )
628 })
629 .collect();
630
631 let bar = ProgressBar::new(payloads.len() as u64);
632 let storage_data =
633 Self::get_storage_data_dynamic_batch_size(client, 0, &payloads, &bar, 1000)
634 .await
635 .map_err(|_| "rpc child_get_storage failed")?;
636
637 Ok(child_keys
639 .into_iter()
640 .zip(storage_data)
641 .filter_map(|(key, maybe_value)| maybe_value.map(|v| (key, v)))
642 .collect())
643 }
644}
645
646impl<B: BlockT> Builder<B>
647where
648 B::Hash: DeserializeOwned,
649 B::Header: DeserializeOwned,
650{
651 async fn fetch_single_child_trie(
653 client: &Client,
654 prefixed_top_key: &StorageKey,
655 at: B::Hash,
656 ) -> Result<(ChildInfo, Vec<KeyValue>)> {
657 let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
658 let page_size = 1000u32;
659
660 trace!(
661 target: LOG_TARGET,
662 "Fetching child trie keys for {:?}",
663 HexDisplay::from(&prefixed_top_key.0)
664 );
665
666 let mut child_keys = Vec::new();
668 let mut start_key: Option<StorageKey> = None;
669
670 loop {
671 let rpc_result = with_timeout(
672 substrate_rpc_client::ChildStateApi::storage_keys_paged(
673 client.ws_client.as_ref(),
674 top_key.clone(),
675 Some(StorageKey(vec![])),
676 page_size,
677 start_key.clone(),
678 Some(at),
679 ),
680 RPC_TIMEOUT,
681 )
682 .await;
683
684 let page = match rpc_result {
685 Ok(Ok(p)) => p,
686 Ok(Err(e)) => {
687 debug!(target: LOG_TARGET, "Child trie RPC error: {e:?}");
688 return Err("rpc child_get_keys failed");
689 },
690 Err(()) => {
691 debug!(target: LOG_TARGET, "Child trie RPC timeout");
692 return Err("rpc child_get_keys timeout");
693 },
694 };
695
696 let is_full_batch = page.len() == page_size as usize;
697 start_key = page.last().cloned();
698 child_keys.extend(page);
699
700 if !is_full_batch {
701 break;
702 }
703 }
704
705 let child_kv =
707 Self::rpc_child_get_storage_paged(client, prefixed_top_key, child_keys, at).await?;
708
709 let un_prefixed = match ChildType::from_prefixed_key(&top_key) {
711 Some((ChildType::ParentKeyId, storage_key)) => storage_key,
712 None => return Err("invalid child key"),
713 };
714
715 Ok((ChildInfo::new_default(un_prefixed), child_kv))
716 }
717
718 async fn load_child_remote(
726 &self,
727 top_kv: &[KeyValue],
728 pending_ext: &mut TestExternalities<HashingFor<B>>,
729 ) -> Result<ChildKeyValues> {
730 let child_roots: VecDeque<StorageKey> = top_kv
731 .iter()
732 .filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
733 .map(|(k, _)| k.clone())
734 .collect();
735
736 if child_roots.is_empty() {
737 info!(target: LOG_TARGET, "👩👦 no child roots found to scrape");
738 return Ok(Default::default());
739 }
740
741 let total_count = child_roots.len();
742 info!(
743 target: LOG_TARGET,
744 "👩👦 scraping child-tree data from {} child tries",
745 total_count,
746 );
747
748 let at = self.as_online().at_expected();
749 let conn_manager = self.conn_manager()?;
750 let parallel = self.parallel_requests();
751
752 let results: Arc<Mutex<Vec<(ChildInfo, Vec<KeyValue>)>>> = Arc::new(Mutex::new(Vec::new()));
753 let results_for_extraction = results.clone();
754 let completed_count = Arc::new(AtomicUsize::new(0));
755
756 run_workers(
757 child_roots,
758 conn_manager,
759 parallel,
760 move |worker_index, prefixed_top_key, client| {
761 let results = results.clone();
762 let completed_count = completed_count.clone();
763
764 async move {
765 match Self::fetch_single_child_trie(&client, &prefixed_top_key, at).await {
766 Ok((info, child_kv_inner)) => {
767 results.lock().unwrap().push((info, child_kv_inner));
768
769 let done = completed_count.fetch_add(1, Ordering::SeqCst) + 1;
770 if done.is_multiple_of(100) || done == total_count {
771 info!(
772 target: LOG_TARGET,
773 "👩👦 Child tries progress: {}/{} completed",
774 done,
775 total_count
776 );
777 }
778
779 ProcessResult::Success { new_work: vec![] }
780 },
781 Err(e) => {
782 error!(target: LOG_TARGET, "Worker {worker_index}: Failed: {e:?}");
783 ProcessResult::Retry {
784 work: prefixed_top_key,
785 sleep_duration: Duration::from_secs(5),
786 recreate_client: true,
787 }
788 },
789 }
790 }
791 },
792 )
793 .await;
794
795 let child_kv_results = results_for_extraction.lock().unwrap().clone();
797
798 let mut child_kv = Vec::new();
799 for (info, kv_inner) in child_kv_results {
800 let key_values: Vec<(Vec<u8>, Vec<u8>)> =
801 kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect();
802 for (k, v) in key_values {
803 pending_ext.insert_child(info.clone(), k, v);
804 }
805 child_kv.push((info, kv_inner));
806 }
807
808 info!(
809 target: LOG_TARGET,
810 "👩👦 Completed scraping {} child tries",
811 child_kv.len()
812 );
813
814 Ok(child_kv)
815 }
816
817 async fn load_top_remote(
822 &self,
823 pending_ext: &mut TestExternalities<HashingFor<B>>,
824 ) -> Result<TopKeyValues> {
825 let config = self.as_online();
826 let at = self
827 .as_online()
828 .at
829 .expect("online config must be initialized by this point; qed.");
830 info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {at:?}");
831
832 let mut keys_and_values = Vec::new();
833 for prefix in &config.hashed_prefixes {
834 let now = std::time::Instant::now();
835 let additional_key_values =
836 self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
837 let elapsed = now.elapsed();
838 info!(
839 target: LOG_TARGET,
840 "adding data for hashed prefix: {:?}, took {:.2}s",
841 HexDisplay::from(prefix),
842 elapsed.as_secs_f32()
843 );
844 keys_and_values.extend(additional_key_values);
845 }
846
847 for key in &config.hashed_keys {
848 let key = StorageKey(key.to_vec());
849 info!(
850 target: LOG_TARGET,
851 "adding data for hashed key: {:?}",
852 HexDisplay::from(&key)
853 );
854 match self.rpc_get_storage(key.clone(), Some(at)).await? {
855 Some(value) => {
856 pending_ext.insert(key.clone().0, value.clone().0);
857 keys_and_values.push((key, value));
858 },
859 None => {
860 warn!(
861 target: LOG_TARGET,
862 "no data found for hashed key: {:?}",
863 HexDisplay::from(&key)
864 );
865 },
866 }
867 }
868
869 Ok(keys_and_values)
870 }
871
872 async fn init_remote_client(&mut self) -> Result<()> {
876 let online_config = self.as_online();
878 let mut clients = Vec::new();
879 for uri in &online_config.transport_uris {
880 if let Some(client) = Client::new(uri.clone()).await {
881 clients.push(Arc::new(tokio::sync::Mutex::new(client)));
882 }
883 }
884 self.conn_manager = Some(ConnectionManager::new(clients)?);
885
886 if self.as_online().at.is_none() {
888 let at = self.rpc_get_head().await?;
889 info!(
890 target: LOG_TARGET,
891 "since no at is provided, setting it to latest finalized head, {at:?}",
892 );
893 self.as_online_mut().at = Some(at);
894 }
895
896 let online_config = self.as_online_mut();
898 online_config.pallets.iter().for_each(|p| {
899 online_config
900 .hashed_prefixes
901 .push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
902 });
903
904 if online_config.child_trie {
905 online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
906 }
907
908 if online_config
911 .hashed_prefixes
912 .iter()
913 .filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
914 .count() == 0
915 {
916 info!(
917 target: LOG_TARGET,
918 "since no prefix is filtered, the data for all pallets will be downloaded"
919 );
920 online_config.hashed_prefixes.push(vec![]);
921 }
922
923 Ok(())
924 }
925
926 async fn load_header(&self) -> Result<B::Header> {
928 let conn_manager = self.conn_manager()?;
929 let at = self.as_online().at_expected();
930
931 for i in 0..conn_manager.num_clients() {
932 let client = conn_manager.get(i).await;
933 let result = with_timeout(
934 ChainApi::<(), _, B::Header, ()>::header(client.ws_client.as_ref(), Some(at)),
935 RPC_TIMEOUT,
936 )
937 .await;
938
939 match result {
940 Ok(Ok(Some(header))) => return Ok(header),
941 Ok(Ok(None)) => {
942 debug!(target: LOG_TARGET, "Client {i}: header returned None");
943 },
944 Ok(Err(e)) => {
945 debug!(target: LOG_TARGET, "Client {i}: header RPC error: {e:?}");
946 },
947 Err(()) => {
948 debug!(target: LOG_TARGET, "Client {i}: header timeout");
949 },
950 }
951 }
952
953 Err("rpc header failed on all clients")
954 }
955
956 async fn load_remote_and_maybe_save(&mut self) -> Result<TestExternalities<HashingFor<B>>> {
961 let state_version = self.fetch_state_version().await?;
962 let mut pending_ext = TestExternalities::new_with_code_and_state(
963 Default::default(),
964 Default::default(),
965 self.overwrite_state_version.unwrap_or(state_version),
966 );
967
968 let top_kv = self.load_top_remote(&mut pending_ext).await?;
970 self.load_child_remote(&top_kv, &mut pending_ext).await?;
971
972 let header = self.load_header().await?;
973 let (raw_storage, computed_root) = pending_ext.into_raw_snapshot();
974
975 if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
977 let snapshot =
978 Snapshot::<B>::new(state_version, raw_storage.clone(), computed_root, header);
979 let encoded = snapshot.encode();
980 info!(
981 target: LOG_TARGET,
982 "writing snapshot of {} bytes to {path:?}",
983 encoded.len(),
984 );
985 std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
986 }
987
988 Ok(TestExternalities::from_raw_snapshot(
990 raw_storage,
991 computed_root,
992 self.overwrite_state_version.unwrap_or(state_version),
993 ))
994 }
995
996 async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>> {
997 self.init_remote_client().await?;
998 let inner_ext = self.load_remote_and_maybe_save().await?;
999 Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
1000 }
1001
1002 fn do_load_offline(&mut self, config: OfflineConfig) -> Result<RemoteExternalities<B>> {
1003 let (header, inner_ext) = logging::with_elapsed(
1004 || {
1005 info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);
1006
1007 let Snapshot { header, state_version, raw_storage, storage_root, .. } =
1008 Snapshot::<B>::load(&config.state_snapshot.path)?;
1009 let inner_ext = TestExternalities::from_raw_snapshot(
1010 raw_storage,
1011 storage_root,
1012 self.overwrite_state_version.unwrap_or(state_version),
1013 );
1014
1015 Ok((header, inner_ext))
1016 },
1017 "Loading snapshot...",
1018 |_| "Loaded snapshot".into(),
1019 )?;
1020
1021 Ok(RemoteExternalities { inner_ext, header })
1022 }
1023
1024 pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>> {
1025 let mut ext = match self.mode.clone() {
1026 Mode::Offline(config) => self.do_load_offline(config)?,
1027 Mode::Online(_) => self.do_load_remote().await?,
1028 Mode::OfflineOrElseOnline(offline_config, _) => {
1029 match self.do_load_offline(offline_config) {
1030 Ok(x) => x,
1031 Err(_) => self.do_load_remote().await?,
1032 }
1033 },
1034 };
1035
1036 if !self.hashed_key_values.is_empty() {
1038 info!(
1039 target: LOG_TARGET,
1040 "extending externalities with {} manually injected key-values",
1041 self.hashed_key_values.len()
1042 );
1043 ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
1044 }
1045
1046 if !self.hashed_blacklist.is_empty() {
1048 info!(
1049 target: LOG_TARGET,
1050 "excluding externalities from {} keys",
1051 self.hashed_blacklist.len()
1052 );
1053 for k in self.hashed_blacklist {
1054 ext.execute_with(|| sp_io::storage::clear(&k));
1055 }
1056 }
1057
1058 Ok(ext)
1059 }
1060}
1061
1062impl<B: BlockT> Builder<B>
1064where
1065 B::Hash: DeserializeOwned,
1066 B::Header: DeserializeOwned,
1067{
1068 pub fn new() -> Self {
1070 Default::default()
1071 }
1072
1073 pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
1075 self.hashed_key_values.extend(injections);
1076 self
1077 }
1078
1079 pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
1082 self.hashed_blacklist.push(hashed.to_vec());
1083 self
1084 }
1085
1086 pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
1088 self.mode = mode;
1089 self
1090 }
1091
1092 pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
1094 self.overwrite_state_version = Some(version);
1095 self
1096 }
1097
1098 pub async fn build(self) -> Result<RemoteExternalities<B>> {
1099 let mut ext = self.pre_build().await?;
1100 ext.commit_all().unwrap();
1101
1102 info!(
1103 target: LOG_TARGET,
1104 "initialized state externalities with storage root {:?} and state_version {:?}",
1105 ext.as_backend().root(),
1106 ext.state_version
1107 );
1108
1109 Ok(ext)
1110 }
1111}
1112
// Shared imports, concrete types and helpers for this module's test suites.
#[cfg(test)]
mod test_prelude {
    pub(crate) use super::*;
    pub(crate) use sp_runtime::testing::{Block as RawBlock, MockCallU64};
    // Extrinsic type of the mock block used by the tests.
    pub(crate) type UncheckedXt = sp_runtime::testing::TestXt<MockCallU64, ()>;
    // Concrete block type the tests instantiate `Builder` with.
    pub(crate) type Block = RawBlock<UncheckedXt>;

    // Initialize logging for tests via `sp_tracing::try_init_simple`; presumably a no-op when a
    // subscriber is already installed (the `try_` variant) — confirm against sp_tracing docs.
    pub(crate) fn init_logger() {
        sp_tracing::try_init_simple();
    }
}
1124
#[cfg(test)]
mod tests {
    use super::test_prelude::*;

    /// Snapshot fixture shared by the offline tests in this module.
    const TEST_SNAPSHOT: &str = "test_data/test.snap";

    /// A builder configured to load [`TEST_SNAPSHOT`] in offline mode.
    fn offline_snapshot_builder() -> Builder<Block> {
        let config = OfflineConfig { state_snapshot: SnapshotConfig::new(TEST_SNAPSHOT) };
        Builder::<Block>::new().mode(Mode::Offline(config))
    }

    #[tokio::test]
    async fn can_load_state_snapshot() {
        init_logger();
        let mut ext = offline_snapshot_builder().build().await.unwrap();
        ext.execute_with(|| {});
    }

    #[tokio::test]
    async fn can_exclude_from_snapshot() {
        init_logger();

        // First pass: pick any key that exists in the snapshot.
        let mut full_ext =
            offline_snapshot_builder().build().await.expect("Can't read state snapshot file");
        let some_key = full_ext.execute_with(|| {
            let key = sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
            assert!(sp_io::storage::get(&key).is_some());
            key
        });

        // Second pass: blacklist that key and verify it is gone.
        let mut filtered_ext = offline_snapshot_builder()
            .blacklist_hashed_key(&some_key)
            .build()
            .await
            .expect("Can't read state snapshot file");
        filtered_ext.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
    }
}
1172
1173#[cfg(all(test, feature = "remote-test"))]
1174mod remote_tests {
1175 use super::test_prelude::*;
1176 use frame_support::storage::KeyPrefixIterator;
1177 use std::{env, os::unix::fs::MetadataExt, path::Path};
1178
1179 fn endpoint() -> String {
1180 env::var("TEST_WS").unwrap_or_else(|_| DEFAULT_WS_ENDPOINT.to_string())
1181 }
1182
    // Downloads a small amount of live state into a snapshot, reloads it offline, and checks
    // that the state version survives the round-trip and can be overridden on load.
    #[tokio::test]
    async fn state_version_is_kept_and_can_be_altered() {
        // Snapshot file (relative to the working directory) shared by the builds below.
        const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
        init_logger();

        // Online build that also writes the snapshot to CACHE.
        let ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport_uris: vec![endpoint().clone()],
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        // Offline build from the snapshot just written.
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .build()
            .await
            .unwrap();

        assert_eq!(ext.state_version, cached_ext.state_version);

        // Pick the opposite version and force it via `overwrite_state_version`.
        let other = match ext.state_version {
            StateVersion::V0 => StateVersion::V1,
            StateVersion::V1 => StateVersion::V0,
        };
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .overwrite_state_version(other)
            .build()
            .await
            .unwrap();

        assert_eq!(cached_ext.state_version, other);
    }
1224
    // Checks that the header stored in a snapshot matches the header of the online build that
    // produced it.
    #[tokio::test]
    async fn snapshot_block_hash_works() {
        const CACHE: &'static str = "snapshot_block_hash_works";
        init_logger();

        // Online build that also writes the snapshot to CACHE.
        let ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport_uris: vec![endpoint().clone()],
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        // Offline build from that snapshot.
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .build()
            .await
            .unwrap();

        assert_eq!(ext.header.hash(), cached_ext.header.hash());
    }
1252
    // Builds one externalities with child-trie scraping enabled and one without, then inserts a
    // child key into the first and checks that it is visible there, absent from the second,
    // and grows the backend's key count.
    // NOTE(review): the child key is inserted manually, so this exercises `insert_child` on top
    // of loaded state rather than asserting that remote child keys were downloaded — confirm
    // the test name matches the intent.
    #[tokio::test]
    async fn child_keys_are_loaded() {
        // Both builds below write the same snapshot file; the second overwrites the first.
        const CACHE: &'static str = "snapshot_retains_storage";
        init_logger();

        use sp_state_machine::Backend;

        let mut child_ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport_uris: vec![endpoint().clone()],
                pallets: vec!["Proxy".to_owned()],
                child_trie: true,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        let mut ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport_uris: vec![endpoint().clone()],
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        let child_info = sp_core::storage::ChildInfo::new_default(b"test_child");
        let child_key: Vec<u8> = b"k1".to_vec();
        let child_value: Vec<u8> = b"v1".to_vec();

        // Backend key count before the manual insertion, for comparison below.
        let child_db_keys_before = child_ext.as_backend().backend_storage().keys().len();

        child_ext.insert_child(child_info.clone(), child_key.clone(), child_value.clone());

        let child_backend = child_ext.as_backend();
        let backend = ext.as_backend();
        assert_eq!(
            child_backend.child_storage(&child_info, &child_key).unwrap(),
            Some(child_value)
        );
        assert_eq!(backend.child_storage(&child_info, &child_key).unwrap(), None);

        let child_db_keys_after = child_backend.backend_storage().keys().len();
        assert!(child_db_keys_after > child_db_keys_before);
    }
1312
1313 #[tokio::test]
1314 async fn offline_else_online_works() {
1315 const CACHE: &'static str = "offline_else_online_works_data";
1316 init_logger();
1317 Builder::<Block>::new()
1319 .mode(Mode::OfflineOrElseOnline(
1320 OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
1321 OnlineConfig {
1322 transport_uris: vec![endpoint().clone()],
1323 pallets: vec!["Proxy".to_owned()],
1324 child_trie: false,
1325 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1326 ..Default::default()
1327 },
1328 ))
1329 .build()
1330 .await
1331 .unwrap()
1332 .execute_with(|| {});
1333
1334 Builder::<Block>::new()
1336 .mode(Mode::OfflineOrElseOnline(
1337 OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
1338 OnlineConfig {
1339 transport_uris: vec!["ws://non-existent:666".to_owned()],
1340 ..Default::default()
1341 },
1342 ))
1343 .build()
1344 .await
1345 .unwrap()
1346 .execute_with(|| {});
1347
1348 let to_delete = std::fs::read_dir(Path::new("."))
1349 .unwrap()
1350 .into_iter()
1351 .map(|d| d.unwrap())
1352 .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
1353 .collect::<Vec<_>>();
1354
1355 assert!(to_delete.len() == 1);
1356 std::fs::remove_file(to_delete[0].path()).unwrap();
1357 }
1358
1359 #[tokio::test]
1360 async fn can_build_one_small_pallet() {
1361 init_logger();
1362 Builder::<Block>::new()
1363 .mode(Mode::Online(OnlineConfig {
1364 transport_uris: vec![endpoint().clone()],
1365 pallets: vec!["Proxy".to_owned()],
1366 child_trie: false,
1367 ..Default::default()
1368 }))
1369 .build()
1370 .await
1371 .unwrap()
1372 .execute_with(|| {});
1373 }
1374
1375 #[tokio::test]
1376 async fn can_build_few_pallet() {
1377 init_logger();
1378 Builder::<Block>::new()
1379 .mode(Mode::Online(OnlineConfig {
1380 transport_uris: vec![endpoint().clone()],
1381 pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
1382 child_trie: false,
1383 ..Default::default()
1384 }))
1385 .build()
1386 .await
1387 .unwrap()
1388 .execute_with(|| {});
1389 }
1390
1391 #[tokio::test(flavor = "multi_thread")]
1392 async fn can_create_snapshot() {
1393 const CACHE: &'static str = "can_create_snapshot";
1394 init_logger();
1395
1396 Builder::<Block>::new()
1397 .mode(Mode::Online(OnlineConfig {
1398 transport_uris: vec![endpoint().clone()],
1399 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1400 pallets: vec!["Proxy".to_owned()],
1401 child_trie: false,
1402 ..Default::default()
1403 }))
1404 .build()
1405 .await
1406 .unwrap()
1407 .execute_with(|| {});
1408
1409 let to_delete = std::fs::read_dir(Path::new("."))
1410 .unwrap()
1411 .into_iter()
1412 .map(|d| d.unwrap())
1413 .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
1414 .collect::<Vec<_>>();
1415
1416 assert!(to_delete.len() == 1);
1417 let to_delete = to_delete.first().unwrap();
1418 assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
1419 std::fs::remove_file(to_delete.path()).unwrap();
1420 }
1421
1422 #[tokio::test]
1423 async fn can_create_child_snapshot() {
1424 const CACHE: &'static str = "can_create_child_snapshot";
1425 init_logger();
1426 Builder::<Block>::new()
1427 .mode(Mode::Online(OnlineConfig {
1428 transport_uris: vec![endpoint().clone()],
1429 state_snapshot: Some(SnapshotConfig::new(CACHE)),
1430 pallets: vec!["Crowdloan".to_owned()],
1431 child_trie: true,
1432 ..Default::default()
1433 }))
1434 .build()
1435 .await
1436 .unwrap()
1437 .execute_with(|| {});
1438
1439 let to_delete = std::fs::read_dir(Path::new("."))
1440 .unwrap()
1441 .into_iter()
1442 .map(|d| d.unwrap())
1443 .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
1444 .collect::<Vec<_>>();
1445
1446 assert!(to_delete.len() == 1);
1447 let to_delete = to_delete.first().unwrap();
1448 assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
1449 std::fs::remove_file(to_delete.path()).unwrap();
1450 }
1451
1452 #[tokio::test]
1453 async fn can_build_big_pallet() {
1454 if std::option_env!("TEST_WS").is_none() {
1455 return;
1456 }
1457 init_logger();
1458 Builder::<Block>::new()
1459 .mode(Mode::Online(OnlineConfig {
1460 transport_uris: vec![endpoint().clone()],
1461 pallets: vec!["Staking".to_owned()],
1462 child_trie: false,
1463 ..Default::default()
1464 }))
1465 .build()
1466 .await
1467 .unwrap()
1468 .execute_with(|| {});
1469 }
1470
1471 #[tokio::test]
1472 async fn can_fetch_all() {
1473 if std::option_env!("TEST_WS").is_none() {
1474 return;
1475 }
1476 init_logger();
1477 Builder::<Block>::new()
1478 .mode(Mode::Online(OnlineConfig {
1479 transport_uris: vec![endpoint().clone()],
1480 ..Default::default()
1481 }))
1482 .build()
1483 .await
1484 .unwrap()
1485 .execute_with(|| {});
1486 }
1487
1488 #[tokio::test]
1489 async fn can_fetch_in_parallel() {
1490 init_logger();
1491
1492 let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
1493 transport_uris: vec![endpoint().clone()],
1494 ..Default::default()
1495 }));
1496 builder.init_remote_client().await.unwrap();
1497
1498 let at = builder.as_online().at.unwrap();
1499
1500 let prefix = StorageKey(vec![13]);
1502 let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
1503 assert!(!para.is_empty(), "Should fetch some keys with prefix");
1504
1505 let prefix = StorageKey(vec![]);
1507 let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
1508 assert!(!para.is_empty(), "Should fetch some keys with empty prefix");
1509 }
1510
1511 #[tokio::test]
1512 #[ignore] async fn bridge_hub_polkadot_storage_root_matches() {
1514 init_logger();
1515
1516 let endpoints = vec![
1518 "wss://bridge-hub-polkadot-rpc.n.dwellir.com",
1519 "wss://sys.ibp.network/bridgehub-polkadot",
1520 "wss://bridgehub-polkadot.api.onfinality.io/public",
1521 "wss://dot-rpc.stakeworld.io/bridgehub",
1522 ];
1523
1524 info!(target: LOG_TARGET, "Connecting to Bridge Hub Polkadot using {} RPC providers", endpoints.len());
1525
1526 let mut ext = Builder::<Block>::new()
1527 .mode(Mode::Online(OnlineConfig {
1528 transport_uris: endpoints.into_iter().map(|e| e.to_owned()).collect(),
1529 child_trie: true,
1530 ..Default::default()
1531 }))
1532 .build()
1533 .await
1534 .expect("Failed to build remote externalities");
1535
1536 let backend = ext.as_backend();
1538 let computed_root = *backend.root();
1539 let expected_root = ext.header.state_root;
1541
1542 info!(
1543 target: LOG_TARGET,
1544 "Computed storage root: {:?}",
1545 computed_root
1546 );
1547 info!(
1548 target: LOG_TARGET,
1549 "Expected storage root (from header): {:?}",
1550 expected_root
1551 );
1552
1553 assert_eq!(
1555 computed_root, expected_root,
1556 "Storage root mismatch! Computed: {:?}, Expected: {:?}. \
1557 This indicates that not all keys were fetched or there were duplicates.",
1558 computed_root, expected_root
1559 );
1560
1561 ext.execute_with(|| {
1563 let key_count = KeyPrefixIterator::<()>::new(vec![], vec![], |_| Ok(())).count();
1564
1565 info!(target: LOG_TARGET, "Total keys in state: {}", key_count);
1566 assert!(key_count > 0, "Should have fetched some keys");
1567 });
1568
1569 info!(
1570 target: LOG_TARGET,
1571 "✅ Storage root verification successful! All keys were fetched correctly."
1572 );
1573 }
1574
1575 #[tokio::test]
1576 async fn builder_fails_with_invalid_transport_uris() {
1577 init_logger();
1578
1579 let result = Builder::<Block>::new()
1581 .mode(Mode::Online(OnlineConfig {
1582 transport_uris: vec!["http://try-runtime.polkadot.io:443".to_string()],
1583 pallets: vec!["Proxy".to_owned()],
1584 ..Default::default()
1585 }))
1586 .build()
1587 .await;
1588
1589 match result {
1590 Err(e) => assert_eq!(e, "At least one client must be provided"),
1591 Ok(_) => panic!("Expected error but got success"),
1592 }
1593
1594 let result = Builder::<Block>::new()
1596 .mode(Mode::Online(OnlineConfig {
1597 transport_uris: vec![
1598 "http://try-runtime.polkadot.io:443".to_string(),
1599 "https://try-runtime.polkadot.io:443".to_string(),
1600 "garbage".to_string(),
1601 ],
1602 pallets: vec!["Proxy".to_owned()],
1603 ..Default::default()
1604 }))
1605 .build()
1606 .await;
1607
1608 match result {
1609 Err(e) => assert_eq!(e, "At least one client must be provided"),
1610 Ok(_) => panic!("Expected error but got success"),
1611 }
1612 }
1613}