1use codec::{Compact, Decode, Encode};
24use indicatif::{ProgressBar, ProgressStyle};
25use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient};
26use log::*;
27use serde::de::DeserializeOwned;
28use sp_core::{
29 hexdisplay::HexDisplay,
30 storage::{
31 well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
32 ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
33 },
34};
35use sp_runtime::{
36 traits::{Block as BlockT, HashingFor},
37 StateVersion,
38};
39use sp_state_machine::TestExternalities;
40use spinners::{Spinner, Spinners};
41use std::{
42 cmp::{max, min},
43 fs,
44 ops::{Deref, DerefMut},
45 path::{Path, PathBuf},
46 sync::Arc,
47 time::{Duration, Instant},
48};
49use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
50use tokio_retry::{strategy::FixedInterval, Retry};
51
/// A single storage entry: a key together with the data stored under it.
type KeyValue = (StorageKey, StorageData);
/// Key-value pairs of the top-level storage.
type TopKeyValues = Vec<KeyValue>;
/// Key-value pairs of child storages, grouped by the [`ChildInfo`] they belong to.
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
/// Type of the format version that is the first encoded field of a [`Snapshot`].
type SnapshotVersion = Compact<u16>;

/// Log target passed to the logging macros in this file.
const LOG_TARGET: &str = "remote-ext";
/// Endpoint used by `OnlineConfig::default()` when no transport is given.
const DEFAULT_HTTP_ENDPOINT: &str = "https://try-runtime.polkadot.io:443";
/// Snapshot format version written by `Snapshot::new` and required by `Snapshot::load`.
const SNAPSHOT_VERSION: SnapshotVersion = Compact(4);
60
/// The SCALE-encodable snapshot that is written to and read back from disk.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
    /// Format version; `Snapshot::load` rejects anything other than `SNAPSHOT_VERSION`.
    snapshot_version: SnapshotVersion,
    /// State version recorded when the snapshot was created.
    state_version: StateVersion,
    /// Raw storage as produced by `TestExternalities::into_raw_snapshot` and consumed by
    /// `TestExternalities::from_raw_snapshot`.
    // NOTE(review): the `i32` is presumably a reference count — confirm against
    // `sp_state_machine`.
    raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
    /// Storage root returned alongside `raw_storage` by `into_raw_snapshot`.
    storage_root: B::Hash,
    /// Header stored with the snapshot and handed back in `RemoteExternalities::header`.
    header: B::Header,
}
73
74impl<B: BlockT> Snapshot<B> {
75 pub fn new(
76 state_version: StateVersion,
77 raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
78 storage_root: B::Hash,
79 header: B::Header,
80 ) -> Self {
81 Self {
82 snapshot_version: SNAPSHOT_VERSION,
83 state_version,
84 raw_storage,
85 storage_root,
86 header,
87 }
88 }
89
90 fn load(path: &PathBuf) -> Result<Snapshot<B>, &'static str> {
91 let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
92 let snapshot_version = SnapshotVersion::decode(&mut &*bytes)
95 .map_err(|_| "Failed to decode snapshot version")?;
96
97 if snapshot_version != SNAPSHOT_VERSION {
98 return Err("Unsupported snapshot version detected. Please create a new snapshot.")
99 }
100
101 Decode::decode(&mut &*bytes).map_err(|_| "Decode failed")
102 }
103}
104
/// Externalities built by `Builder`, bundled with a block header.
///
/// Dereferences (mutably and immutably) to the wrapped [`TestExternalities`].
pub struct RemoteExternalities<B: BlockT> {
    /// The wrapped externalities holding the loaded state.
    pub inner_ext: TestExternalities<HashingFor<B>>,
    /// Header fetched from the remote node, or read from the snapshot when loading offline.
    pub header: B::Header,
}

impl<B: BlockT> Deref for RemoteExternalities<B> {
    type Target = TestExternalities<HashingFor<B>>;
    /// Borrow the wrapped externalities.
    fn deref(&self) -> &Self::Target {
        &self.inner_ext
    }
}

impl<B: BlockT> DerefMut for RemoteExternalities<B> {
    /// Mutably borrow the wrapped externalities.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner_ext
    }
}
126
/// Where the builder takes its state from.
#[derive(Clone)]
pub enum Mode<H> {
    /// Scrape the state from a remote node.
    Online(OnlineConfig<H>),
    /// Load the state from a snapshot file.
    Offline(OfflineConfig),
    /// Try the snapshot file first; if loading it fails, scrape from the remote node instead.
    OfflineOrElseOnline(OfflineConfig, OnlineConfig<H>),
}
137
138impl<H> Default for Mode<H> {
139 fn default() -> Self {
140 Mode::Online(OnlineConfig::default())
141 }
142}
143
/// Configuration of the offline mode.
#[derive(Clone)]
pub struct OfflineConfig {
    /// Location of the snapshot file to load the state from.
    pub state_snapshot: SnapshotConfig,
}
152
/// How the remote node is reached.
#[derive(Debug, Clone)]
pub enum Transport {
    /// A URI from which an HTTP client is built by `Transport::init`.
    Uri(String),
    /// An already constructed HTTP client.
    RemoteClient(HttpClient),
}
161
162impl Transport {
163 fn as_client(&self) -> Option<&HttpClient> {
164 match self {
165 Self::RemoteClient(client) => Some(client),
166 _ => None,
167 }
168 }
169
170 async fn init(&mut self) -> Result<(), &'static str> {
172 if let Self::Uri(uri) = self {
173 log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri);
174
175 let uri = if uri.starts_with("ws://") {
180 let uri = uri.replace("ws://", "http://");
181 log::info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
182 uri
183 } else if uri.starts_with("wss://") {
184 let uri = uri.replace("wss://", "https://");
185 log::info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
186 uri
187 } else {
188 uri.clone()
189 };
190 let http_client = HttpClient::builder()
191 .max_request_size(u32::MAX)
192 .max_response_size(u32::MAX)
193 .request_timeout(std::time::Duration::from_secs(60 * 5))
194 .build(uri)
195 .map_err(|e| {
196 log::error!(target: LOG_TARGET, "error: {:?}", e);
197 "failed to build http client"
198 })?;
199
200 *self = Self::RemoteClient(http_client)
201 }
202
203 Ok(())
204 }
205}
206
207impl From<String> for Transport {
208 fn from(uri: String) -> Self {
209 Transport::Uri(uri)
210 }
211}
212
213impl From<HttpClient> for Transport {
214 fn from(client: HttpClient) -> Self {
215 Transport::RemoteClient(client)
216 }
217}
218
/// Configuration of the online mode.
#[derive(Clone)]
pub struct OnlineConfig<H> {
    /// Block hash to scrape the state at. When `None`, `init_remote_client` sets it to the
    /// latest finalized head.
    pub at: Option<H>,
    /// When `Some`, the scraped state is also written as a snapshot to this location.
    pub state_snapshot: Option<SnapshotConfig>,
    /// Pallet names; `init_remote_client` hashes each with `twox_128` and appends the result to
    /// `hashed_prefixes`.
    pub pallets: Vec<String>,
    /// How the remote node is reached.
    pub transport: Transport,
    /// When `true`, `init_remote_client` appends `DEFAULT_CHILD_STORAGE_KEY_PREFIX` to
    /// `hashed_prefixes`.
    pub child_trie: bool,
    /// Raw key prefixes; all keys under each prefix are downloaded.
    pub hashed_prefixes: Vec<Vec<u8>>,
    /// Raw keys that are downloaded individually.
    pub hashed_keys: Vec<Vec<u8>>,
}
241
242impl<H: Clone> OnlineConfig<H> {
243 fn rpc_client(&self) -> &HttpClient {
245 self.transport
246 .as_client()
247 .expect("http client must have been initialized by now; qed.")
248 }
249
250 fn at_expected(&self) -> H {
251 self.at.clone().expect("block at must be initialized; qed")
252 }
253}
254
255impl<H> Default for OnlineConfig<H> {
256 fn default() -> Self {
257 Self {
258 transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
259 child_trie: true,
260 at: None,
261 state_snapshot: None,
262 pallets: Default::default(),
263 hashed_keys: Default::default(),
264 hashed_prefixes: Default::default(),
265 }
266 }
267}
268
269impl<H> From<String> for OnlineConfig<H> {
270 fn from(t: String) -> Self {
271 Self { transport: t.into(), ..Default::default() }
272 }
273}
274
/// Location of a snapshot file.
#[derive(Clone)]
pub struct SnapshotConfig {
    /// Path of the snapshot file.
    pub path: PathBuf,
}

impl SnapshotConfig {
    /// Create a configuration pointing at `path`.
    pub fn new<P: Into<PathBuf>>(path: P) -> Self {
        let path: PathBuf = path.into();
        Self { path }
    }
}

impl From<String> for SnapshotConfig {
    /// Interpret the string as the snapshot path.
    fn from(s: String) -> Self {
        Self { path: PathBuf::from(s) }
    }
}

impl Default for SnapshotConfig {
    /// Use the relative path `SNAPSHOT`.
    fn default() -> Self {
        Self { path: Path::new("SNAPSHOT").to_path_buf() }
    }
}
299
/// Builder for [`RemoteExternalities`].
#[derive(Clone)]
pub struct Builder<B: BlockT> {
    /// Key-values inserted into the externalities after the state has been loaded.
    hashed_key_values: Vec<KeyValue>,
    /// Keys cleared from the externalities after the state has been loaded.
    hashed_blacklist: Vec<Vec<u8>>,
    /// Where the state is taken from.
    mode: Mode<B::Hash>,
    /// When `Some`, used instead of the state version reported by the remote node or stored in
    /// the snapshot.
    overwrite_state_version: Option<StateVersion>,
}
316
317impl<B: BlockT> Default for Builder<B> {
318 fn default() -> Self {
319 Self {
320 mode: Default::default(),
321 hashed_key_values: Default::default(),
322 hashed_blacklist: Default::default(),
323 overwrite_state_version: None,
324 }
325 }
326}
327
328impl<B: BlockT> Builder<B> {
330 fn as_online(&self) -> &OnlineConfig<B::Hash> {
331 match &self.mode {
332 Mode::Online(config) => config,
333 Mode::OfflineOrElseOnline(_, config) => config,
334 _ => panic!("Unexpected mode: Online"),
335 }
336 }
337
338 fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
339 match &mut self.mode {
340 Mode::Online(config) => config,
341 Mode::OfflineOrElseOnline(_, config) => config,
342 _ => panic!("Unexpected mode: Online"),
343 }
344 }
345}
346
347impl<B: BlockT> Builder<B>
349where
350 B::Hash: DeserializeOwned,
351 B::Header: DeserializeOwned,
352{
    /// Degree of parallelism used by `rpc_get_pairs` for scraping keys and for splitting the
    /// value downloads into chunks.
    const PARALLEL_REQUESTS: usize = 4;
    /// Factor by which the batch size grows after a request that finished within the target
    /// duration.
    const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
    /// Factor by which the batch size shrinks after a slow or failed request.
    const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
    /// Request duration above which the batch size is decreased.
    const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
    /// Batch size of the first value-download request.
    const INITIAL_BATCH_SIZE: usize = 10;
    /// Page size passed to the paged key-listing RPCs.
    const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
    /// Bound on retry attempts, used by the retry strategies and the batch-download loop.
    const MAX_RETRIES: usize = 12;
    /// Fixed delay between attempts of the retry strategies.
    const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);
362
363 async fn rpc_get_storage(
364 &self,
365 key: StorageKey,
366 maybe_at: Option<B::Hash>,
367 ) -> Result<Option<StorageData>, &'static str> {
368 trace!(target: LOG_TARGET, "rpc: get_storage");
369 self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| {
370 error!(target: LOG_TARGET, "Error = {:?}", e);
371 "rpc get_storage failed."
372 })
373 }
374
375 async fn rpc_get_head(&self) -> Result<B::Hash, &'static str> {
377 trace!(target: LOG_TARGET, "rpc: finalized_head");
378
379 ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
381 .await
382 .map_err(|e| {
383 error!(target: LOG_TARGET, "Error = {:?}", e);
384 "rpc finalized_head failed."
385 })
386 }
387
388 async fn get_keys_single_page(
389 &self,
390 prefix: Option<StorageKey>,
391 start_key: Option<StorageKey>,
392 at: B::Hash,
393 ) -> Result<Vec<StorageKey>, &'static str> {
394 self.as_online()
395 .rpc_client()
396 .storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at))
397 .await
398 .map_err(|e| {
399 error!(target: LOG_TARGET, "Error = {:?}", e);
400 "rpc get_keys failed"
401 })
402 }
403
404 async fn rpc_get_keys_parallel(
406 &self,
407 prefix: &StorageKey,
408 block: B::Hash,
409 parallel: usize,
410 ) -> Result<Vec<StorageKey>, &'static str> {
411 fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
414 let mut prefix = prefix.as_ref().to_vec();
415 let scale = 32usize.saturating_sub(prefix.len());
416
417 if scale < 9 {
419 prefix.extend(vec![0; scale]);
420 return vec![StorageKey(prefix)]
421 }
422
423 let chunks = 16;
424 let step = 0x10000 / chunks;
425 let ext = scale - 2;
426
427 (0..chunks)
428 .map(|i| {
429 let mut key = prefix.clone();
430 let start = i * step;
431 key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
432 key.extend(vec![0; ext]);
433 StorageKey(key)
434 })
435 .collect()
436 }
437
438 let start_keys = gen_start_keys(&prefix);
439 let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
440 let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
441 end_keys.push(None);
442
443 let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
445 let builder = Arc::new(self.clone());
446 let mut handles = vec![];
447
448 for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
449 let permit = parallel
450 .clone()
451 .acquire_owned()
452 .await
453 .expect("semaphore is not closed until the end of loop");
454
455 let builder = builder.clone();
456 let prefix = prefix.clone();
457 let start_key = start_key.cloned();
458 let end_key = end_key.cloned();
459
460 let handle = tokio::spawn(async move {
461 let res = builder
462 .rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
463 .await;
464 drop(permit);
465 res
466 });
467
468 handles.push(handle);
469 }
470
471 parallel.close();
472
473 let keys = futures::future::join_all(handles)
474 .await
475 .into_iter()
476 .filter_map(|res| match res {
477 Ok(Ok(keys)) => Some(keys),
478 _ => None,
479 })
480 .flatten()
481 .collect::<Vec<StorageKey>>();
482
483 Ok(keys)
484 }
485
    /// Collect the keys under `prefix` at `block` by repeatedly fetching pages with
    /// `get_keys_single_page`.
    ///
    /// `start_key` is passed as the start position of the first page; every later page starts
    /// from the last key collected so far. When `end_key` is given, keys that are not strictly
    /// smaller than it are dropped from a page. Paging stops as soon as a page, after that
    /// trimming, holds fewer than `DEFAULT_KEY_DOWNLOAD_PAGE` keys.
    ///
    /// Each page request is retried at fixed intervals, bounded by `MAX_RETRIES`; if it still
    /// fails, the error is returned.
    async fn rpc_get_keys_in_range(
        &self,
        prefix: &StorageKey,
        block: B::Hash,
        start_key: Option<&StorageKey>,
        end_key: Option<&StorageKey>,
    ) -> Result<Vec<StorageKey>, &'static str> {
        let mut last_key: Option<&StorageKey> = start_key;
        let mut keys: Vec<StorageKey> = vec![];

        loop {
            // A fresh strategy per page, so every page gets the full retry budget.
            let retry_strategy =
                FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
            let get_page_closure =
                || self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
            let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;

            // Trim the page when it reaches into the range starting at `end_key`.
            if let (Some(last), Some(end)) = (page.last(), end_key) {
                if last >= end {
                    page.retain(|key| key < end);
                }
            }

            let page_len = page.len();
            keys.extend(page);
            // Re-borrow from `keys` only after the mutation above has finished.
            last_key = keys.last();

            // A short page means the end of the range (or of the storage) has been reached.
            if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
                log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
                break
            }

            log::debug!(
                target: LOG_TARGET,
                "new total = {}, full page received: {}",
                keys.len(),
                HexDisplay::from(last_key.expect("full page received, cannot be None"))
            );
        }

        Ok(keys)
    }
535
536 async fn get_storage_data_dynamic_batch_size(
581 client: &HttpClient,
582 payloads: Vec<(String, ArrayParams)>,
583 bar: &ProgressBar,
584 ) -> Result<Vec<Option<StorageData>>, String> {
585 let mut all_data: Vec<Option<StorageData>> = vec![];
586 let mut start_index = 0;
587 let mut retries = 0usize;
588 let mut batch_size = Self::INITIAL_BATCH_SIZE;
589 let total_payloads = payloads.len();
590
591 while start_index < total_payloads {
592 log::debug!(
593 target: LOG_TARGET,
594 "Remaining payloads: {} Batch request size: {}",
595 total_payloads - start_index,
596 batch_size,
597 );
598
599 let end_index = usize::min(start_index + batch_size, total_payloads);
600 let page = &payloads[start_index..end_index];
601
602 let mut batch = BatchRequestBuilder::new();
604 for (method, params) in page.iter() {
605 batch
606 .insert(method, params.clone())
607 .map_err(|_| "Invalid batch method and/or params")?;
608 }
609
610 let request_started = Instant::now();
611 let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
612 Ok(batch_response) => {
613 retries = 0;
614 batch_response
615 },
616 Err(e) => {
617 if retries > Self::MAX_RETRIES {
618 return Err(e.to_string())
619 }
620
621 retries += 1;
622 let failure_log = format!(
623 "Batch request failed ({}/{} retries). Error: {}",
624 retries,
625 Self::MAX_RETRIES,
626 e
627 );
628 if retries >= 2 {
631 log::warn!("{}", failure_log);
632 batch_size = 1;
633 } else {
634 log::debug!("{}", failure_log);
635 batch_size =
637 (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize;
638 }
639 continue
640 },
641 };
642
643 let request_duration = request_started.elapsed();
644 batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
645 max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
647 } else {
648 min(
650 total_payloads - start_index,
651 max(
652 batch_size + 1,
653 (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
654 ),
655 )
656 };
657
658 log::debug!(
659 target: LOG_TARGET,
660 "Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}",
661 request_duration,
662 Self::REQUEST_DURATION_TARGET,
663 end_index - start_index,
664 batch_size
665 );
666
667 let batch_response_len = batch_response.len();
668 for item in batch_response.into_iter() {
669 match item {
670 Ok(x) => all_data.push(x),
671 Err(e) => return Err(e.message().to_string()),
672 }
673 }
674 bar.inc(batch_response_len as u64);
675
676 start_index = end_index;
678 }
679
680 Ok(all_data)
681 }
682
683 pub(crate) async fn rpc_get_pairs(
688 &self,
689 prefix: StorageKey,
690 at: B::Hash,
691 pending_ext: &mut TestExternalities<HashingFor<B>>,
692 ) -> Result<Vec<KeyValue>, &'static str> {
693 let start = Instant::now();
694 let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
695 let keys = self
698 .rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
699 .await?
700 .into_iter()
701 .collect::<Vec<_>>();
702 sp.stop_with_message(format!(
703 "✅ Found {} keys ({:.2}s)",
704 keys.len(),
705 start.elapsed().as_secs_f32()
706 ));
707 if keys.is_empty() {
708 return Ok(Default::default())
709 }
710
711 let client = self.as_online().rpc_client();
712 let payloads = keys
713 .iter()
714 .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
715 .collect::<Vec<_>>();
716
717 let bar = ProgressBar::new(payloads.len() as u64);
718 bar.enable_steady_tick(Duration::from_secs(1));
719 bar.set_message("Downloading key values".to_string());
720 bar.set_style(
721 ProgressStyle::with_template(
722 "[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
723 )
724 .unwrap()
725 .progress_chars("=>-"),
726 );
727 let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
728 let requests = payloads_chunked.map(|payload_chunk| {
729 Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
730 });
731 let storage_data_result: Result<Vec<_>, _> =
733 futures::future::join_all(requests).await.into_iter().collect();
734 let storage_data = match storage_data_result {
736 Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
737 Err(e) => {
738 log::error!(target: LOG_TARGET, "Error while getting storage data: {}", e);
739 return Err("Error while getting storage data")
740 },
741 };
742 bar.finish_with_message("✅ Downloaded key values");
743 println!();
744
745 assert_eq!(keys.len(), storage_data.len());
747
748 let key_values = keys
749 .iter()
750 .zip(storage_data)
751 .map(|(key, maybe_value)| match maybe_value {
752 Some(data) => (key.clone(), data),
753 None => {
754 log::warn!(target: LOG_TARGET, "key {:?} had none corresponding value.", &key);
755 let data = StorageData(vec![]);
756 (key.clone(), data)
757 },
758 })
759 .collect::<Vec<_>>();
760
761 let mut sp = Spinner::with_timer(Spinners::Dots, "Inserting keys into DB...".into());
762 let start = Instant::now();
763 pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
764 match is_default_child_storage_key(&k.0) {
767 true => None,
768 false => Some((k.0, v.0)),
769 }
770 }));
771 sp.stop_with_message(format!(
772 "✅ Inserted keys into DB ({:.2}s)",
773 start.elapsed().as_secs_f32()
774 ));
775 Ok(key_values)
776 }
777
778 pub(crate) async fn rpc_child_get_storage_paged(
780 client: &HttpClient,
781 prefixed_top_key: &StorageKey,
782 child_keys: Vec<StorageKey>,
783 at: B::Hash,
784 ) -> Result<Vec<KeyValue>, &'static str> {
785 let child_keys_len = child_keys.len();
786
787 let payloads = child_keys
788 .iter()
789 .map(|key| {
790 (
791 "childstate_getStorage".to_string(),
792 rpc_params![
793 PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
794 key,
795 at
796 ],
797 )
798 })
799 .collect::<Vec<_>>();
800
801 let bar = ProgressBar::new(payloads.len() as u64);
802 let storage_data =
803 match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await {
804 Ok(storage_data) => storage_data,
805 Err(e) => {
806 log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
807 return Err("batch processing failed")
808 },
809 };
810
811 assert_eq!(child_keys_len, storage_data.len());
812
813 Ok(child_keys
814 .iter()
815 .zip(storage_data)
816 .map(|(key, maybe_value)| match maybe_value {
817 Some(v) => (key.clone(), v),
818 None => {
819 log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
820 (key.clone(), StorageData(vec![]))
821 },
822 })
823 .collect::<Vec<_>>())
824 }
825
826 pub(crate) async fn rpc_child_get_keys(
827 client: &HttpClient,
828 prefixed_top_key: &StorageKey,
829 child_prefix: StorageKey,
830 at: B::Hash,
831 ) -> Result<Vec<StorageKey>, &'static str> {
832 let retry_strategy =
833 FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
834 let mut all_child_keys = Vec::new();
835 let mut start_key = None;
836
837 loop {
838 let get_child_keys_closure = || {
839 let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
840 substrate_rpc_client::ChildStateApi::storage_keys_paged(
841 client,
842 top_key,
843 Some(child_prefix.clone()),
844 Self::DEFAULT_KEY_DOWNLOAD_PAGE,
845 start_key.clone(),
846 Some(at),
847 )
848 };
849
850 let child_keys = Retry::spawn(retry_strategy.clone(), get_child_keys_closure)
851 .await
852 .map_err(|e| {
853 error!(target: LOG_TARGET, "Error = {:?}", e);
854 "rpc child_get_keys failed."
855 })?;
856
857 let keys_count = child_keys.len();
858 if keys_count == 0 {
859 break;
860 }
861
862 start_key = child_keys.last().cloned();
863 all_child_keys.extend(child_keys);
864
865 if keys_count < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
866 break;
867 }
868 }
869
870 debug!(
871 target: LOG_TARGET,
872 "[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
873 std::thread::current().id(),
874 all_child_keys.len(),
875 HexDisplay::from(prefixed_top_key)
876 );
877
878 Ok(all_child_keys)
879 }
880}
881
882impl<B: BlockT> Builder<B>
883where
884 B::Hash: DeserializeOwned,
885 B::Header: DeserializeOwned,
886{
887 async fn load_child_remote(
896 &self,
897 top_kv: &[KeyValue],
898 pending_ext: &mut TestExternalities<HashingFor<B>>,
899 ) -> Result<ChildKeyValues, &'static str> {
900 let child_roots = top_kv
901 .iter()
902 .filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
903 .map(|(k, _)| k.clone())
904 .collect::<Vec<_>>();
905
906 if child_roots.is_empty() {
907 info!(target: LOG_TARGET, "👩👦 no child roots found to scrape",);
908 return Ok(Default::default())
909 }
910
911 info!(
912 target: LOG_TARGET,
913 "👩👦 scraping child-tree data from {} top keys",
914 child_roots.len(),
915 );
916
917 let at = self.as_online().at_expected();
918
919 let client = self.as_online().rpc_client();
920 let mut child_kv = vec![];
921 for prefixed_top_key in child_roots {
922 let child_keys =
923 Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;
924
925 let child_kv_inner =
926 Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
927 .await?;
928
929 let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
930 let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
931 Some((ChildType::ParentKeyId, storage_key)) => storage_key,
932 None => {
933 log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
934 return Err("Invalid child key")
935 },
936 };
937
938 let info = ChildInfo::new_default(un_prefixed);
939 let key_values =
940 child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
941 child_kv.push((info.clone(), child_kv_inner));
942 for (k, v) in key_values {
943 pending_ext.insert_child(info.clone(), k, v);
944 }
945 }
946
947 Ok(child_kv)
948 }
949
950 async fn load_top_remote(
955 &self,
956 pending_ext: &mut TestExternalities<HashingFor<B>>,
957 ) -> Result<TopKeyValues, &'static str> {
958 let config = self.as_online();
959 let at = self
960 .as_online()
961 .at
962 .expect("online config must be initialized by this point; qed.");
963 log::info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {:?}", at);
964
965 let mut keys_and_values = Vec::new();
966 for prefix in &config.hashed_prefixes {
967 let now = std::time::Instant::now();
968 let additional_key_values =
969 self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
970 let elapsed = now.elapsed();
971 log::info!(
972 target: LOG_TARGET,
973 "adding data for hashed prefix: {:?}, took {:.2}s",
974 HexDisplay::from(prefix),
975 elapsed.as_secs_f32()
976 );
977 keys_and_values.extend(additional_key_values);
978 }
979
980 for key in &config.hashed_keys {
981 let key = StorageKey(key.to_vec());
982 log::info!(
983 target: LOG_TARGET,
984 "adding data for hashed key: {:?}",
985 HexDisplay::from(&key)
986 );
987 match self.rpc_get_storage(key.clone(), Some(at)).await? {
988 Some(value) => {
989 pending_ext.insert(key.clone().0, value.clone().0);
990 keys_and_values.push((key, value));
991 },
992 None => {
993 log::warn!(
994 target: LOG_TARGET,
995 "no data found for hashed key: {:?}",
996 HexDisplay::from(&key)
997 );
998 },
999 }
1000 }
1001
1002 Ok(keys_and_values)
1003 }
1004
1005 async fn init_remote_client(&mut self) -> Result<(), &'static str> {
1009 self.as_online_mut().transport.init().await?;
1011
1012 if self.as_online().at.is_none() {
1014 let at = self.rpc_get_head().await?;
1015 log::info!(
1016 target: LOG_TARGET,
1017 "since no at is provided, setting it to latest finalized head, {:?}",
1018 at
1019 );
1020 self.as_online_mut().at = Some(at);
1021 }
1022
1023 let online_config = self.as_online_mut();
1025 online_config.pallets.iter().for_each(|p| {
1026 online_config
1027 .hashed_prefixes
1028 .push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
1029 });
1030
1031 if online_config.child_trie {
1032 online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
1033 }
1034
1035 if online_config
1038 .hashed_prefixes
1039 .iter()
1040 .filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
1041 .count() == 0
1042 {
1043 log::info!(
1044 target: LOG_TARGET,
1045 "since no prefix is filtered, the data for all pallets will be downloaded"
1046 );
1047 online_config.hashed_prefixes.push(vec![]);
1048 }
1049
1050 Ok(())
1051 }
1052
1053 async fn load_header(&self) -> Result<B::Header, &'static str> {
1054 let retry_strategy =
1055 FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
1056 let get_header_closure = || {
1057 ChainApi::<(), _, B::Header, ()>::header(
1058 self.as_online().rpc_client(),
1059 Some(self.as_online().at_expected()),
1060 )
1061 };
1062 Retry::spawn(retry_strategy, get_header_closure)
1063 .await
1064 .map_err(|_| "Failed to fetch header for block from network")?
1065 .ok_or("Network returned None block header")
1066 }
1067
1068 async fn load_remote_and_maybe_save(
1073 &mut self,
1074 ) -> Result<TestExternalities<HashingFor<B>>, &'static str> {
1075 let state_version =
1076 StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
1077 .await
1078 .map_err(|e| {
1079 error!(target: LOG_TARGET, "Error = {:?}", e);
1080 "rpc runtime_version failed."
1081 })
1082 .map(|v| v.state_version())?;
1083 let mut pending_ext = TestExternalities::new_with_code_and_state(
1084 Default::default(),
1085 Default::default(),
1086 self.overwrite_state_version.unwrap_or(state_version),
1087 );
1088
1089 let top_kv = self.load_top_remote(&mut pending_ext).await?;
1091 self.load_child_remote(&top_kv, &mut pending_ext).await?;
1092
1093 if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
1095 let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
1096 let snapshot = Snapshot::<B>::new(
1097 state_version,
1098 raw_storage.clone(),
1099 storage_root,
1100 self.load_header().await?,
1101 );
1102 let encoded = snapshot.encode();
1103 log::info!(
1104 target: LOG_TARGET,
1105 "writing snapshot of {} bytes to {:?}",
1106 encoded.len(),
1107 path
1108 );
1109 std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
1110
1111 return Ok(TestExternalities::from_raw_snapshot(
1113 raw_storage,
1114 storage_root,
1115 self.overwrite_state_version.unwrap_or(state_version),
1116 ))
1117 }
1118
1119 Ok(pending_ext)
1120 }
1121
1122 async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>, &'static str> {
1123 self.init_remote_client().await?;
1124 let inner_ext = self.load_remote_and_maybe_save().await?;
1125 Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
1126 }
1127
1128 fn do_load_offline(
1129 &mut self,
1130 config: OfflineConfig,
1131 ) -> Result<RemoteExternalities<B>, &'static str> {
1132 let mut sp = Spinner::with_timer(Spinners::Dots, "Loading snapshot...".into());
1133 let start = Instant::now();
1134 info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);
1135 let Snapshot { snapshot_version: _, header, state_version, raw_storage, storage_root } =
1136 Snapshot::<B>::load(&config.state_snapshot.path)?;
1137
1138 let inner_ext = TestExternalities::from_raw_snapshot(
1139 raw_storage,
1140 storage_root,
1141 self.overwrite_state_version.unwrap_or(state_version),
1142 );
1143 sp.stop_with_message(format!("✅ Loaded snapshot ({:.2}s)", start.elapsed().as_secs_f32()));
1144
1145 Ok(RemoteExternalities { inner_ext, header })
1146 }
1147
1148 pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>, &'static str> {
1149 let mut ext = match self.mode.clone() {
1150 Mode::Offline(config) => self.do_load_offline(config)?,
1151 Mode::Online(_) => self.do_load_remote().await?,
1152 Mode::OfflineOrElseOnline(offline_config, _) => {
1153 match self.do_load_offline(offline_config) {
1154 Ok(x) => x,
1155 Err(_) => self.do_load_remote().await?,
1156 }
1157 },
1158 };
1159
1160 if !self.hashed_key_values.is_empty() {
1162 log::info!(
1163 target: LOG_TARGET,
1164 "extending externalities with {} manually injected key-values",
1165 self.hashed_key_values.len()
1166 );
1167 ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
1168 }
1169
1170 if !self.hashed_blacklist.is_empty() {
1172 log::info!(
1173 target: LOG_TARGET,
1174 "excluding externalities from {} keys",
1175 self.hashed_blacklist.len()
1176 );
1177 for k in self.hashed_blacklist {
1178 ext.execute_with(|| sp_io::storage::clear(&k));
1179 }
1180 }
1181
1182 Ok(ext)
1183 }
1184}
1185
1186impl<B: BlockT> Builder<B>
1188where
1189 B::Hash: DeserializeOwned,
1190 B::Header: DeserializeOwned,
1191{
1192 pub fn new() -> Self {
1194 Default::default()
1195 }
1196
1197 pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
1199 for i in injections {
1200 self.hashed_key_values.push(i.clone());
1201 }
1202 self
1203 }
1204
1205 pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
1208 self.hashed_blacklist.push(hashed.to_vec());
1209 self
1210 }
1211
1212 pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
1214 self.mode = mode;
1215 self
1216 }
1217
1218 pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
1220 self.overwrite_state_version = Some(version);
1221 self
1222 }
1223
1224 pub async fn build(self) -> Result<RemoteExternalities<B>, &'static str> {
1225 let mut ext = self.pre_build().await?;
1226 ext.commit_all().unwrap();
1227
1228 info!(
1229 target: LOG_TARGET,
1230 "initialized state externalities with storage root {:?} and state_version {:?}",
1231 ext.as_backend().root(),
1232 ext.state_version
1233 );
1234
1235 Ok(ext)
1236 }
1237}
1238
/// Items shared by the test modules of this file.
#[cfg(test)]
mod test_prelude {
    pub(crate) use super::*;
    pub(crate) use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper, H256 as Hash};
    /// Block type used by the tests.
    pub(crate) type Block = RawBlock<ExtrinsicWrapper<Hash>>;

    /// Set up logging for a test; delegates to `sp_tracing::try_init_simple`.
    pub(crate) fn init_logger() {
        sp_tracing::try_init_simple();
    }
}
1249
#[cfg(test)]
mod tests {
    use super::test_prelude::*;

    /// Snapshot file used by the offline tests.
    const SNAPSHOT_PATH: &str = "test_data/test.snap";

    /// A builder in offline mode reading `SNAPSHOT_PATH`.
    fn offline_builder() -> Builder<Block> {
        let config = OfflineConfig { state_snapshot: SnapshotConfig::new(SNAPSHOT_PATH) };
        Builder::<Block>::new().mode(Mode::Offline(config))
    }

    /// The snapshot can be loaded and executed against.
    #[tokio::test]
    async fn can_load_state_snapshot() {
        init_logger();
        let mut ext = offline_builder().build().await.unwrap();
        ext.execute_with(|| {});
    }

    /// A blacklisted key that exists in the snapshot is absent from the built externalities.
    #[tokio::test]
    async fn can_exclude_from_snapshot() {
        init_logger();

        // Pick the first key of the snapshot and make sure it holds a value.
        let mut full_ext =
            offline_builder().build().await.expect("Can't read state snapshot file");
        let some_key = full_ext.execute_with(|| {
            let key = sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
            assert!(sp_io::storage::get(&key).is_some());
            key
        });

        // The same key must be gone once it is blacklisted.
        let mut filtered_ext = offline_builder()
            .blacklist_hashed_key(&some_key)
            .build()
            .await
            .expect("Can't read state snapshot file");
        filtered_ext.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
    }
}
1297
#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
    use super::test_prelude::*;
    use std::env;

    /// Environment variable that overrides the node endpoint these tests talk to.
    const ENDPOINT_ENV: &str = "TEST_WS";

    /// Returns the endpoint to test against: the value of `TEST_WS` when it is set, otherwise
    /// `DEFAULT_HTTP_ENDPOINT`.
    fn endpoint() -> String {
        env::var(ENDPOINT_ENV).unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
    }

    /// Whether a custom endpoint was supplied through `TEST_WS`.
    ///
    /// This is evaluated at runtime (not via `option_env!`, which is expanded at compile time)
    /// so that it always agrees with what [`endpoint`] resolves to.
    fn has_custom_endpoint() -> bool {
        env::var(ENDPOINT_ENV).is_ok()
    }

    /// Asserts that the snapshot file `name` exists in the current directory, deletes it and
    /// returns the size in bytes it had on disk.
    ///
    /// # Panics
    ///
    /// Panics if the file does not exist or cannot be removed.
    fn remove_snapshot(name: &str) -> u64 {
        // `Metadata::len` is the portable counterpart of the Unix-only `MetadataExt::size`.
        let size = std::fs::metadata(name).expect("snapshot file must have been created").len();
        std::fs::remove_file(name).unwrap();
        size
    }

    /// The state version survives a snapshot round trip and can be overridden when loading.
    #[tokio::test]
    async fn state_version_is_kept_and_can_be_altered() {
        const CACHE: &str = "state_version_is_kept_and_can_be_altered";
        init_logger();

        // Build from the remote node, writing a snapshot to `CACHE`.
        let ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        // Build again from that snapshot only.
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .build()
            .await
            .unwrap();

        assert_eq!(ext.state_version, cached_ext.state_version);

        // Pick whichever state version the snapshot does NOT use and force it on load.
        let other = match ext.state_version {
            StateVersion::V0 => StateVersion::V1,
            StateVersion::V1 => StateVersion::V0,
        };
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .overwrite_state_version(other)
            .build()
            .await
            .unwrap();

        assert_eq!(cached_ext.state_version, other);
    }

    /// The header hash of externalities loaded from a snapshot matches the one scraped online.
    #[tokio::test]
    async fn snapshot_block_hash_works() {
        const CACHE: &str = "snapshot_block_hash_works";
        init_logger();

        // Build from the remote node, writing a snapshot to `CACHE`.
        let ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        // Build again from that snapshot only.
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .build()
            .await
            .unwrap();

        assert_eq!(ext.header.hash(), cached_ext.header.hash());
    }

    /// Building with `child_trie: true` yields strictly more backend keys than building the same
    /// pallet with `child_trie: false`.
    #[tokio::test]
    async fn child_keys_are_loaded() {
        const CACHE: &str = "snapshot_retains_storage";
        init_logger();

        let mut child_ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: true,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        let mut ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        assert!(
            child_ext.as_backend().backend_storage().keys().len() >
                ext.as_backend().backend_storage().keys().len()
        );
    }

    /// `Mode::OfflineOrElseOnline` goes online when no snapshot exists yet and afterwards builds
    /// successfully even when the online endpoint is unreachable.
    #[tokio::test]
    async fn offline_else_online_works() {
        const CACHE: &str = "offline_else_online_works_data";
        init_logger();

        // First build: the online config points at a real endpoint and writes `CACHE`.
        Builder::<Block>::new()
            .mode(Mode::OfflineOrElseOnline(
                OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
                OnlineConfig {
                    transport: endpoint().into(),
                    pallets: vec!["Proxy".to_owned()],
                    child_trie: false,
                    state_snapshot: Some(SnapshotConfig::new(CACHE)),
                    ..Default::default()
                },
            ))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});

        // Second build: the online endpoint is bogus, so success here means the snapshot was
        // used.
        Builder::<Block>::new()
            .mode(Mode::OfflineOrElseOnline(
                OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
                OnlineConfig {
                    transport: "ws://non-existent:666".to_owned().into(),
                    ..Default::default()
                },
            ))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});

        remove_snapshot(CACHE);
    }

    /// A single small pallet can be scraped from the remote node.
    #[tokio::test]
    async fn can_build_one_small_pallet() {
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }

    /// Several pallets can be scraped from the remote node in one build.
    #[tokio::test]
    async fn can_build_few_pallet() {
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }

    /// An online build with `state_snapshot` set leaves a non-trivial snapshot file on disk.
    #[tokio::test(flavor = "multi_thread")]
    async fn can_create_snapshot() {
        const CACHE: &str = "can_create_snapshot";
        init_logger();

        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});

        assert!(remove_snapshot(CACHE) > 1);
    }

    /// Same as `can_create_snapshot`, but for a pallet scraped with `child_trie: true`.
    #[tokio::test]
    async fn can_create_child_snapshot() {
        const CACHE: &str = "can_create_child_snapshot";
        init_logger();

        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                pallets: vec!["Crowdloan".to_owned()],
                child_trie: true,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});

        assert!(remove_snapshot(CACHE) > 1);
    }

    /// Scrapes a large pallet. Only runs when a custom endpoint is supplied via `TEST_WS`.
    #[tokio::test]
    async fn can_build_big_pallet() {
        if !has_custom_endpoint() {
            return;
        }
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Staking".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }

    /// Scrapes with the default (unrestricted) pallet selection. Only runs when a custom
    /// endpoint is supplied via `TEST_WS`.
    #[tokio::test]
    async fn can_fetch_all() {
        if !has_custom_endpoint() {
            return;
        }
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }

    /// `rpc_get_keys_parallel` returns exactly the same keys as `rpc_get_keys_in_range`, both
    /// for a one-byte prefix and for the empty prefix.
    #[tokio::test]
    async fn can_fetch_in_parallel() {
        init_logger();

        let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
            transport: endpoint().into(),
            ..Default::default()
        }));
        builder.init_remote_client().await.unwrap();

        let at = builder.as_online().at.unwrap();

        let prefix = StorageKey(vec![13]);
        let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
        let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
        assert_eq!(paged, para);

        let prefix = StorageKey(vec![]);
        let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
        let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
        assert_eq!(paged, para);
    }
}