1mod logging;
24
25use codec::{Compact, Decode, Encode};
26use indicatif::{ProgressBar, ProgressStyle};
27use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient};
28use log::*;
29use serde::de::DeserializeOwned;
30use sp_core::{
31 hexdisplay::HexDisplay,
32 storage::{
33 well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
34 ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
35 },
36};
37use sp_runtime::{
38 traits::{Block as BlockT, HashingFor},
39 StateVersion,
40};
41use sp_state_machine::TestExternalities;
42use std::{
43 cmp::{max, min},
44 fs,
45 ops::{Deref, DerefMut},
46 path::{Path, PathBuf},
47 sync::Arc,
48 time::{Duration, Instant},
49};
50use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
51use tokio_retry::{strategy::FixedInterval, Retry};
52
/// Crate-local result alias; the error type defaults to a static string message.
type Result<T, E = &'static str> = std::result::Result<T, E>;

/// A single storage entry: a key together with its value.
type KeyValue = (StorageKey, StorageData);
/// A list of top-level storage entries (the return type of `load_top_remote`).
type TopKeyValues = Vec<KeyValue>;
/// Storage entries grouped per child trie (the return type of `load_child_remote`).
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
/// Type of the version tag stored as the first field of a `Snapshot`.
type SnapshotVersion = Compact<u16>;

/// Log target used by the logging macros in this file.
const LOG_TARGET: &str = "remote-ext";
/// Endpoint used by `OnlineConfig::default()`.
const DEFAULT_HTTP_ENDPOINT: &str = "https://try-runtime.polkadot.io:443";
/// Version written into new snapshots by `Snapshot::new` and required by `Snapshot::load`.
const SNAPSHOT_VERSION: SnapshotVersion = Compact(4);
63
/// The snapshot that is encoded to, and decoded from, a file on disk.
///
/// `Snapshot::load` reads a `SnapshotVersion` from the very start of the file before decoding
/// the whole struct, so `snapshot_version` must stay the first field.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
    /// Format version; compared against `SNAPSHOT_VERSION` when loading.
    snapshot_version: SnapshotVersion,
    /// State version used to rebuild the externalities (unless the builder overwrites it).
    state_version: StateVersion,
    /// Raw storage as returned by `TestExternalities::into_raw_snapshot`.
    // NOTE(review): the `i32` is presumably a reference count — confirm against sp-state-machine.
    raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
    /// Storage root returned alongside `raw_storage` by `into_raw_snapshot`.
    storage_root: B::Hash,
    /// Block header stored with the state; returned to the user via `RemoteExternalities`.
    header: B::Header,
}
76
77impl<B: BlockT> Snapshot<B> {
78 pub fn new(
79 state_version: StateVersion,
80 raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
81 storage_root: B::Hash,
82 header: B::Header,
83 ) -> Self {
84 Self {
85 snapshot_version: SNAPSHOT_VERSION,
86 state_version,
87 raw_storage,
88 storage_root,
89 header,
90 }
91 }
92
93 fn load(path: &PathBuf) -> Result<Snapshot<B>> {
94 let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
95 let snapshot_version = SnapshotVersion::decode(&mut &*bytes)
98 .map_err(|_| "Failed to decode snapshot version")?;
99
100 if snapshot_version != SNAPSHOT_VERSION {
101 return Err("Unsupported snapshot version detected. Please create a new snapshot.")
102 }
103
104 Decode::decode(&mut &*bytes).map_err(|_| "Decode failed")
105 }
106}
107
/// Externalities built from a remote node or a snapshot, together with the block header.
///
/// Dereferences (mutably and immutably) to `inner_ext`.
pub struct RemoteExternalities<B: BlockT> {
    /// The wrapped test externalities holding the loaded state.
    pub inner_ext: TestExternalities<HashingFor<B>>,
    /// The header that was fetched from the node or read from the snapshot.
    pub header: B::Header,
}
116
117impl<B: BlockT> Deref for RemoteExternalities<B> {
118 type Target = TestExternalities<HashingFor<B>>;
119 fn deref(&self) -> &Self::Target {
120 &self.inner_ext
121 }
122}
123
124impl<B: BlockT> DerefMut for RemoteExternalities<B> {
125 fn deref_mut(&mut self) -> &mut Self::Target {
126 &mut self.inner_ext
127 }
128}
129
/// Where the builder obtains its state from.
#[derive(Clone)]
pub enum Mode<H> {
    /// Scrape the state from a remote node.
    Online(OnlineConfig<H>),
    /// Load the state from a snapshot file.
    Offline(OfflineConfig),
    /// Try the snapshot file first; if loading it fails, fall back to the remote node.
    OfflineOrElseOnline(OfflineConfig, OnlineConfig<H>),
}
140
141impl<H> Default for Mode<H> {
142 fn default() -> Self {
143 Mode::Online(OnlineConfig::default())
144 }
145}
146
/// Configuration of the offline mode.
#[derive(Clone)]
pub struct OfflineConfig {
    /// Location of the snapshot file to load the state from.
    pub state_snapshot: SnapshotConfig,
}
155
/// How to reach the remote node.
#[derive(Debug, Clone)]
pub enum Transport {
    /// A URI that has not been turned into a client yet; `Transport::init` replaces it with
    /// `RemoteClient`.
    Uri(String),
    /// A ready-to-use HTTP client.
    RemoteClient(HttpClient),
}
164
165impl Transport {
166 fn as_client(&self) -> Option<&HttpClient> {
167 match self {
168 Self::RemoteClient(client) => Some(client),
169 _ => None,
170 }
171 }
172
173 async fn init(&mut self) -> Result<()> {
175 if let Self::Uri(uri) = self {
176 debug!(target: LOG_TARGET, "initializing remote client to {uri:?}");
177
178 let http_client = HttpClient::builder()
179 .max_request_size(u32::MAX)
180 .max_response_size(u32::MAX)
181 .request_timeout(std::time::Duration::from_secs(60 * 5))
182 .build(uri)
183 .map_err(|e| {
184 error!(target: LOG_TARGET, "error: {e:?}");
185 "failed to build http client"
186 })?;
187
188 *self = Self::RemoteClient(http_client)
189 }
190
191 Ok(())
192 }
193}
194
195impl From<String> for Transport {
196 fn from(uri: String) -> Self {
197 Transport::Uri(uri)
198 }
199}
200
201impl From<HttpClient> for Transport {
202 fn from(client: HttpClient) -> Self {
203 Transport::RemoteClient(client)
204 }
205}
206
/// Configuration of the online mode.
#[derive(Clone)]
pub struct OnlineConfig<H> {
    /// Block hash to scrape the state at. When `None`, `init_remote_client` sets it to the
    /// latest finalized head.
    pub at: Option<H>,
    /// When set, the scraped state is also written as a snapshot to this location.
    pub state_snapshot: Option<SnapshotConfig>,
    /// Pallet names to scrape; each one is hashed with `twox_128` and added to
    /// `hashed_prefixes` during initialisation.
    pub pallets: Vec<String>,
    /// How to reach the remote node.
    pub transport: Transport,
    /// When `true`, the default child-storage key prefix is added to `hashed_prefixes`.
    pub child_trie: bool,
    /// Storage key prefixes to scrape. If none other than the child-storage prefix is present
    /// after initialisation, the empty prefix (i.e. everything) is added.
    pub hashed_prefixes: Vec<Vec<u8>>,
    /// Individual storage keys to scrape.
    pub hashed_keys: Vec<Vec<u8>>,
}
229
230impl<H: Clone> OnlineConfig<H> {
231 fn rpc_client(&self) -> &HttpClient {
233 self.transport
234 .as_client()
235 .expect("http client must have been initialized by now; qed.")
236 }
237
238 fn at_expected(&self) -> H {
239 self.at.clone().expect("block at must be initialized; qed")
240 }
241}
242
243impl<H> Default for OnlineConfig<H> {
244 fn default() -> Self {
245 Self {
246 transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
247 child_trie: true,
248 at: None,
249 state_snapshot: None,
250 pallets: Default::default(),
251 hashed_keys: Default::default(),
252 hashed_prefixes: Default::default(),
253 }
254 }
255}
256
257impl<H> From<String> for OnlineConfig<H> {
258 fn from(t: String) -> Self {
259 Self { transport: t.into(), ..Default::default() }
260 }
261}
262
/// Configuration of a state snapshot.
#[derive(Clone)]
pub struct SnapshotConfig {
    /// Path of the snapshot file.
    pub path: PathBuf,
}
269
270impl SnapshotConfig {
271 pub fn new<P: Into<PathBuf>>(path: P) -> Self {
272 Self { path: path.into() }
273 }
274}
275
276impl From<String> for SnapshotConfig {
277 fn from(s: String) -> Self {
278 Self::new(s)
279 }
280}
281
282impl Default for SnapshotConfig {
283 fn default() -> Self {
284 Self { path: Path::new("SNAPSHOT").into() }
285 }
286}
287
/// Builder for `RemoteExternalities`.
#[derive(Clone)]
pub struct Builder<B: BlockT> {
    /// Key-values inserted into the externalities after the state has been loaded.
    hashed_key_values: Vec<KeyValue>,
    /// Storage keys cleared from the externalities after the state has been loaded (and after
    /// `hashed_key_values` were inserted).
    hashed_blacklist: Vec<Vec<u8>>,
    /// Where the state is loaded from.
    mode: Mode<B::Hash>,
    /// When set, used instead of the state version reported by the node or stored in the
    /// snapshot when constructing the externalities.
    overwrite_state_version: Option<StateVersion>,
}
304
305impl<B: BlockT> Default for Builder<B> {
306 fn default() -> Self {
307 Self {
308 mode: Default::default(),
309 hashed_key_values: Default::default(),
310 hashed_blacklist: Default::default(),
311 overwrite_state_version: None,
312 }
313 }
314}
315
316impl<B: BlockT> Builder<B> {
318 fn as_online(&self) -> &OnlineConfig<B::Hash> {
319 match &self.mode {
320 Mode::Online(config) => config,
321 Mode::OfflineOrElseOnline(_, config) => config,
322 _ => panic!("Unexpected mode: Online"),
323 }
324 }
325
326 fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
327 match &mut self.mode {
328 Mode::Online(config) => config,
329 Mode::OfflineOrElseOnline(_, config) => config,
330 _ => panic!("Unexpected mode: Online"),
331 }
332 }
333}
334
335impl<B: BlockT> Builder<B>
337where
338 B::Hash: DeserializeOwned,
339 B::Header: DeserializeOwned,
340{
    /// Number of concurrent key-scraping tasks, and the number of chunks the value download is
    /// split into.
    const PARALLEL_REQUESTS: usize = 4;
    /// Factor applied to the batch size after a request that finished within the target time.
    const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
    /// Factor applied to the batch size after a slow or (first) failed request.
    const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
    /// A batch request taking longer than this causes the next batch to shrink.
    const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
    /// Batch size used for the first storage-value request.
    const INITIAL_BATCH_SIZE: usize = 10;
    /// Page size requested from `storage_keys_paged`; a shorter page is treated as the last one.
    const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
    /// Retry budget for key pages, headers and batch requests.
    const MAX_RETRIES: usize = 12;
    /// Fixed delay between retries of key-page and header requests.
    const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);
350
351 async fn rpc_get_storage(
352 &self,
353 key: StorageKey,
354 maybe_at: Option<B::Hash>,
355 ) -> Result<Option<StorageData>> {
356 trace!(target: LOG_TARGET, "rpc: get_storage");
357 self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| {
358 error!(target: LOG_TARGET, "Error = {e:?}");
359 "rpc get_storage failed."
360 })
361 }
362
363 async fn rpc_get_head(&self) -> Result<B::Hash> {
365 trace!(target: LOG_TARGET, "rpc: finalized_head");
366
367 ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
369 .await
370 .map_err(|e| {
371 error!(target: LOG_TARGET, "Error = {e:?}");
372 "rpc finalized_head failed."
373 })
374 }
375
376 async fn get_keys_single_page(
377 &self,
378 prefix: Option<StorageKey>,
379 start_key: Option<StorageKey>,
380 at: B::Hash,
381 ) -> Result<Vec<StorageKey>> {
382 self.as_online()
383 .rpc_client()
384 .storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at))
385 .await
386 .map_err(|e| {
387 error!(target: LOG_TARGET, "Error = {e:?}");
388 "rpc get_keys failed"
389 })
390 }
391
392 async fn rpc_get_keys_parallel(
394 &self,
395 prefix: &StorageKey,
396 block: B::Hash,
397 parallel: usize,
398 ) -> Result<Vec<StorageKey>> {
399 fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
402 let mut prefix = prefix.as_ref().to_vec();
403 let scale = 32usize.saturating_sub(prefix.len());
404
405 if scale < 9 {
407 prefix.extend(vec![0; scale]);
408 return vec![StorageKey(prefix)]
409 }
410
411 let chunks = 16;
412 let step = 0x10000 / chunks;
413 let ext = scale - 2;
414
415 (0..chunks)
416 .map(|i| {
417 let mut key = prefix.clone();
418 let start = i * step;
419 key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
420 key.extend(vec![0; ext]);
421 StorageKey(key)
422 })
423 .collect()
424 }
425
426 let start_keys = gen_start_keys(&prefix);
427 let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
428 let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
429 end_keys.push(None);
430
431 let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
433 let builder = Arc::new(self.clone());
434 let mut handles = vec![];
435
436 for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
437 let permit = parallel
438 .clone()
439 .acquire_owned()
440 .await
441 .expect("semaphore is not closed until the end of loop");
442
443 let builder = builder.clone();
444 let prefix = prefix.clone();
445 let start_key = start_key.cloned();
446 let end_key = end_key.cloned();
447
448 let handle = tokio::spawn(async move {
449 let res = builder
450 .rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
451 .await;
452 drop(permit);
453 res
454 });
455
456 handles.push(handle);
457 }
458
459 parallel.close();
460
461 let keys = futures::future::join_all(handles)
462 .await
463 .into_iter()
464 .filter_map(|res| match res {
465 Ok(Ok(keys)) => Some(keys),
466 _ => None,
467 })
468 .flatten()
469 .collect::<Vec<StorageKey>>();
470
471 Ok(keys)
472 }
473
474 async fn rpc_get_keys_in_range(
477 &self,
478 prefix: &StorageKey,
479 block: B::Hash,
480 start_key: Option<&StorageKey>,
481 end_key: Option<&StorageKey>,
482 ) -> Result<Vec<StorageKey>> {
483 let mut last_key: Option<&StorageKey> = start_key;
484 let mut keys: Vec<StorageKey> = vec![];
485
486 loop {
487 let retry_strategy =
490 FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
491 let get_page_closure =
492 || self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
493 let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;
494
495 if let (Some(last), Some(end)) = (page.last(), end_key) {
497 if last >= end {
498 page.retain(|key| key < end);
499 }
500 }
501
502 let page_len = page.len();
503 keys.extend(page);
504 last_key = keys.last();
505
506 if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
509 debug!(target: LOG_TARGET, "last page received: {page_len}");
510 break
511 }
512
513 debug!(
514 target: LOG_TARGET,
515 "new total = {}, full page received: {}",
516 keys.len(),
517 HexDisplay::from(last_key.expect("full page received, cannot be None"))
518 );
519 }
520
521 Ok(keys)
522 }
523
524 async fn get_storage_data_dynamic_batch_size(
569 client: &HttpClient,
570 payloads: Vec<(String, ArrayParams)>,
571 bar: &ProgressBar,
572 ) -> Result<Vec<Option<StorageData>>, String> {
573 let mut all_data: Vec<Option<StorageData>> = vec![];
574 let mut start_index = 0;
575 let mut retries = 0usize;
576 let mut batch_size = Self::INITIAL_BATCH_SIZE;
577 let total_payloads = payloads.len();
578
579 while start_index < total_payloads {
580 debug!(
581 target: LOG_TARGET,
582 "Remaining payloads: {} Batch request size: {batch_size}",
583 total_payloads - start_index,
584 );
585
586 let end_index = usize::min(start_index + batch_size, total_payloads);
587 let page = &payloads[start_index..end_index];
588
589 let mut batch = BatchRequestBuilder::new();
591 for (method, params) in page.iter() {
592 batch
593 .insert(method, params.clone())
594 .map_err(|_| "Invalid batch method and/or params")?;
595 }
596
597 let request_started = Instant::now();
598 let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
599 Ok(batch_response) => {
600 retries = 0;
601 batch_response
602 },
603 Err(e) => {
604 if retries > Self::MAX_RETRIES {
605 return Err(e.to_string())
606 }
607
608 retries += 1;
609 let failure_log = format!(
610 "Batch request failed ({retries}/{} retries). Error: {e}",
611 Self::MAX_RETRIES
612 );
613 if retries >= 2 {
616 warn!("{failure_log}");
617 batch_size = 1;
618 } else {
619 debug!("{failure_log}");
620 batch_size =
622 (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize;
623 }
624 continue
625 },
626 };
627
628 let request_duration = request_started.elapsed();
629 batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
630 max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
632 } else {
633 min(
635 total_payloads - start_index,
636 max(
637 batch_size + 1,
638 (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
639 ),
640 )
641 };
642
643 debug!(
644 target: LOG_TARGET,
645 "Request duration: {request_duration:?} Target duration: {:?} Last batch size: {} Next batch size: {batch_size}",
646 Self::REQUEST_DURATION_TARGET,
647 end_index - start_index,
648 );
649
650 let batch_response_len = batch_response.len();
651 for item in batch_response.into_iter() {
652 match item {
653 Ok(x) => all_data.push(x),
654 Err(e) => return Err(e.message().to_string()),
655 }
656 }
657 bar.inc(batch_response_len as u64);
658
659 start_index = end_index;
661 }
662
663 Ok(all_data)
664 }
665
666 pub(crate) async fn rpc_get_pairs(
671 &self,
672 prefix: StorageKey,
673 at: B::Hash,
674 pending_ext: &mut TestExternalities<HashingFor<B>>,
675 ) -> Result<Vec<KeyValue>> {
676 let keys = logging::with_elapsed_async(
677 || async {
678 let keys = self
681 .rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
682 .await?
683 .into_iter()
684 .collect::<Vec<_>>();
685
686 Ok(keys)
687 },
688 "Scraping keys...",
689 |keys| format!("Found {} keys", keys.len()),
690 )
691 .await?;
692
693 if keys.is_empty() {
694 return Ok(Default::default())
695 }
696
697 let client = self.as_online().rpc_client();
698 let payloads = keys
699 .iter()
700 .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
701 .collect::<Vec<_>>();
702
703 let bar = ProgressBar::new(payloads.len() as u64);
704 bar.enable_steady_tick(Duration::from_secs(1));
705 bar.set_message("Downloading key values".to_string());
706 bar.set_style(
707 ProgressStyle::with_template(
708 "[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
709 )
710 .unwrap()
711 .progress_chars("=>-"),
712 );
713 let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
714 let requests = payloads_chunked.map(|payload_chunk| {
715 Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
716 });
717 let storage_data_result: Result<Vec<_>, _> =
719 futures::future::join_all(requests).await.into_iter().collect();
720 let storage_data = match storage_data_result {
722 Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
723 Err(e) => {
724 error!(target: LOG_TARGET, "Error while getting storage data: {e}");
725 return Err("Error while getting storage data")
726 },
727 };
728 bar.finish_with_message("โ
Downloaded key values");
729 println!();
730
731 assert_eq!(keys.len(), storage_data.len());
733
734 let key_values = keys
735 .iter()
736 .zip(storage_data)
737 .map(|(key, maybe_value)| match maybe_value {
738 Some(data) => (key.clone(), data),
739 None => {
740 warn!(target: LOG_TARGET, "key {key:?} had none corresponding value.");
741 let data = StorageData(vec![]);
742 (key.clone(), data)
743 },
744 })
745 .collect::<Vec<_>>();
746
747 logging::with_elapsed(
748 || {
749 pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
750 match is_default_child_storage_key(&k.0) {
753 true => None,
754 false => Some((k.0, v.0)),
755 }
756 }));
757
758 Ok(())
759 },
760 "Inserting keys into DB...",
761 |_| "Inserted keys into DB".into(),
762 )
763 .expect("must succeed; qed");
764
765 Ok(key_values)
766 }
767
768 pub(crate) async fn rpc_child_get_storage_paged(
770 client: &HttpClient,
771 prefixed_top_key: &StorageKey,
772 child_keys: Vec<StorageKey>,
773 at: B::Hash,
774 ) -> Result<Vec<KeyValue>> {
775 let child_keys_len = child_keys.len();
776
777 let payloads = child_keys
778 .iter()
779 .map(|key| {
780 (
781 "childstate_getStorage".to_string(),
782 rpc_params![
783 PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
784 key,
785 at
786 ],
787 )
788 })
789 .collect::<Vec<_>>();
790
791 let bar = ProgressBar::new(payloads.len() as u64);
792 let storage_data =
793 match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await {
794 Ok(storage_data) => storage_data,
795 Err(e) => {
796 error!(target: LOG_TARGET, "batch processing failed: {e:?}");
797 return Err("batch processing failed")
798 },
799 };
800
801 assert_eq!(child_keys_len, storage_data.len());
802
803 Ok(child_keys
804 .iter()
805 .zip(storage_data)
806 .map(|(key, maybe_value)| match maybe_value {
807 Some(v) => (key.clone(), v),
808 None => {
809 warn!(target: LOG_TARGET, "key {key:?} had no corresponding value.");
810 (key.clone(), StorageData(vec![]))
811 },
812 })
813 .collect::<Vec<_>>())
814 }
815
816 pub(crate) async fn rpc_child_get_keys(
817 client: &HttpClient,
818 prefixed_top_key: &StorageKey,
819 child_prefix: StorageKey,
820 at: B::Hash,
821 ) -> Result<Vec<StorageKey>> {
822 let retry_strategy =
823 FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
824 let mut all_child_keys = Vec::new();
825 let mut start_key = None;
826
827 loop {
828 let get_child_keys_closure = || {
829 let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
830 substrate_rpc_client::ChildStateApi::storage_keys_paged(
831 client,
832 top_key,
833 Some(child_prefix.clone()),
834 Self::DEFAULT_KEY_DOWNLOAD_PAGE,
835 start_key.clone(),
836 Some(at),
837 )
838 };
839
840 let child_keys = Retry::spawn(retry_strategy.clone(), get_child_keys_closure)
841 .await
842 .map_err(|e| {
843 error!(target: LOG_TARGET, "Error = {e:?}");
844 "rpc child_get_keys failed."
845 })?;
846
847 let keys_count = child_keys.len();
848 if keys_count == 0 {
849 break;
850 }
851
852 start_key = child_keys.last().cloned();
853 all_child_keys.extend(child_keys);
854
855 if keys_count < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
856 break;
857 }
858 }
859
860 debug!(
861 target: LOG_TARGET,
862 "[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
863 std::thread::current().id(),
864 all_child_keys.len(),
865 HexDisplay::from(prefixed_top_key)
866 );
867
868 Ok(all_child_keys)
869 }
870}
871
872impl<B: BlockT> Builder<B>
873where
874 B::Hash: DeserializeOwned,
875 B::Header: DeserializeOwned,
876{
877 async fn load_child_remote(
886 &self,
887 top_kv: &[KeyValue],
888 pending_ext: &mut TestExternalities<HashingFor<B>>,
889 ) -> Result<ChildKeyValues> {
890 let child_roots = top_kv
891 .iter()
892 .filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
893 .map(|(k, _)| k.clone())
894 .collect::<Vec<_>>();
895
896 if child_roots.is_empty() {
897 info!(target: LOG_TARGET, "๐ฉโ๐ฆ no child roots found to scrape");
898 return Ok(Default::default())
899 }
900
901 info!(
902 target: LOG_TARGET,
903 "๐ฉโ๐ฆ scraping child-tree data from {} top keys",
904 child_roots.len(),
905 );
906
907 let at = self.as_online().at_expected();
908
909 let client = self.as_online().rpc_client();
910 let mut child_kv = vec![];
911 for prefixed_top_key in child_roots {
912 let child_keys =
913 Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;
914
915 let child_kv_inner =
916 Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
917 .await?;
918
919 let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
920 let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
921 Some((ChildType::ParentKeyId, storage_key)) => storage_key,
922 None => {
923 error!(target: LOG_TARGET, "invalid key: {prefixed_top_key:?}");
924 return Err("Invalid child key")
925 },
926 };
927
928 let info = ChildInfo::new_default(un_prefixed);
929 let key_values =
930 child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
931 child_kv.push((info.clone(), child_kv_inner));
932 for (k, v) in key_values {
933 pending_ext.insert_child(info.clone(), k, v);
934 }
935 }
936
937 Ok(child_kv)
938 }
939
940 async fn load_top_remote(
945 &self,
946 pending_ext: &mut TestExternalities<HashingFor<B>>,
947 ) -> Result<TopKeyValues> {
948 let config = self.as_online();
949 let at = self
950 .as_online()
951 .at
952 .expect("online config must be initialized by this point; qed.");
953 info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {at:?}");
954
955 let mut keys_and_values = Vec::new();
956 for prefix in &config.hashed_prefixes {
957 let now = std::time::Instant::now();
958 let additional_key_values =
959 self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
960 let elapsed = now.elapsed();
961 info!(
962 target: LOG_TARGET,
963 "adding data for hashed prefix: {:?}, took {:.2}s",
964 HexDisplay::from(prefix),
965 elapsed.as_secs_f32()
966 );
967 keys_and_values.extend(additional_key_values);
968 }
969
970 for key in &config.hashed_keys {
971 let key = StorageKey(key.to_vec());
972 info!(
973 target: LOG_TARGET,
974 "adding data for hashed key: {:?}",
975 HexDisplay::from(&key)
976 );
977 match self.rpc_get_storage(key.clone(), Some(at)).await? {
978 Some(value) => {
979 pending_ext.insert(key.clone().0, value.clone().0);
980 keys_and_values.push((key, value));
981 },
982 None => {
983 warn!(
984 target: LOG_TARGET,
985 "no data found for hashed key: {:?}",
986 HexDisplay::from(&key)
987 );
988 },
989 }
990 }
991
992 Ok(keys_and_values)
993 }
994
995 async fn init_remote_client(&mut self) -> Result<()> {
999 self.as_online_mut().transport.init().await?;
1001
1002 if self.as_online().at.is_none() {
1004 let at = self.rpc_get_head().await?;
1005 info!(
1006 target: LOG_TARGET,
1007 "since no at is provided, setting it to latest finalized head, {at:?}",
1008 );
1009 self.as_online_mut().at = Some(at);
1010 }
1011
1012 let online_config = self.as_online_mut();
1014 online_config.pallets.iter().for_each(|p| {
1015 online_config
1016 .hashed_prefixes
1017 .push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
1018 });
1019
1020 if online_config.child_trie {
1021 online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
1022 }
1023
1024 if online_config
1027 .hashed_prefixes
1028 .iter()
1029 .filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
1030 .count() == 0
1031 {
1032 info!(
1033 target: LOG_TARGET,
1034 "since no prefix is filtered, the data for all pallets will be downloaded"
1035 );
1036 online_config.hashed_prefixes.push(vec![]);
1037 }
1038
1039 Ok(())
1040 }
1041
1042 async fn load_header(&self) -> Result<B::Header> {
1043 let retry_strategy =
1044 FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
1045 let get_header_closure = || {
1046 ChainApi::<(), _, B::Header, ()>::header(
1047 self.as_online().rpc_client(),
1048 Some(self.as_online().at_expected()),
1049 )
1050 };
1051 Retry::spawn(retry_strategy, get_header_closure)
1052 .await
1053 .map_err(|_| "Failed to fetch header for block from network")?
1054 .ok_or("Network returned None block header")
1055 }
1056
1057 async fn load_remote_and_maybe_save(&mut self) -> Result<TestExternalities<HashingFor<B>>> {
1062 let state_version =
1063 StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
1064 .await
1065 .map_err(|e| {
1066 error!(target: LOG_TARGET, "Error = {e:?}");
1067 "rpc runtime_version failed."
1068 })
1069 .map(|v| v.state_version())?;
1070 let mut pending_ext = TestExternalities::new_with_code_and_state(
1071 Default::default(),
1072 Default::default(),
1073 self.overwrite_state_version.unwrap_or(state_version),
1074 );
1075
1076 let top_kv = self.load_top_remote(&mut pending_ext).await?;
1078 self.load_child_remote(&top_kv, &mut pending_ext).await?;
1079
1080 if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
1082 let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
1083 let snapshot = Snapshot::<B>::new(
1084 state_version,
1085 raw_storage.clone(),
1086 storage_root,
1087 self.load_header().await?,
1088 );
1089 let encoded = snapshot.encode();
1090 info!(
1091 target: LOG_TARGET,
1092 "writing snapshot of {} bytes to {path:?}",
1093 encoded.len(),
1094 );
1095 std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
1096
1097 return Ok(TestExternalities::from_raw_snapshot(
1099 raw_storage,
1100 storage_root,
1101 self.overwrite_state_version.unwrap_or(state_version),
1102 ))
1103 }
1104
1105 Ok(pending_ext)
1106 }
1107
1108 async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>> {
1109 self.init_remote_client().await?;
1110 let inner_ext = self.load_remote_and_maybe_save().await?;
1111 Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
1112 }
1113
1114 fn do_load_offline(&mut self, config: OfflineConfig) -> Result<RemoteExternalities<B>> {
1115 let (header, inner_ext) = logging::with_elapsed(
1116 || {
1117 info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);
1118
1119 let Snapshot { header, state_version, raw_storage, storage_root, .. } =
1120 Snapshot::<B>::load(&config.state_snapshot.path)?;
1121 let inner_ext = TestExternalities::from_raw_snapshot(
1122 raw_storage,
1123 storage_root,
1124 self.overwrite_state_version.unwrap_or(state_version),
1125 );
1126
1127 Ok((header, inner_ext))
1128 },
1129 "Loading snapshot...",
1130 |_| "Loaded snapshot".into(),
1131 )?;
1132
1133 Ok(RemoteExternalities { inner_ext, header })
1134 }
1135
1136 pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>> {
1137 let mut ext = match self.mode.clone() {
1138 Mode::Offline(config) => self.do_load_offline(config)?,
1139 Mode::Online(_) => self.do_load_remote().await?,
1140 Mode::OfflineOrElseOnline(offline_config, _) => {
1141 match self.do_load_offline(offline_config) {
1142 Ok(x) => x,
1143 Err(_) => self.do_load_remote().await?,
1144 }
1145 },
1146 };
1147
1148 if !self.hashed_key_values.is_empty() {
1150 info!(
1151 target: LOG_TARGET,
1152 "extending externalities with {} manually injected key-values",
1153 self.hashed_key_values.len()
1154 );
1155 ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
1156 }
1157
1158 if !self.hashed_blacklist.is_empty() {
1160 info!(
1161 target: LOG_TARGET,
1162 "excluding externalities from {} keys",
1163 self.hashed_blacklist.len()
1164 );
1165 for k in self.hashed_blacklist {
1166 ext.execute_with(|| sp_io::storage::clear(&k));
1167 }
1168 }
1169
1170 Ok(ext)
1171 }
1172}
1173
1174impl<B: BlockT> Builder<B>
1176where
1177 B::Hash: DeserializeOwned,
1178 B::Header: DeserializeOwned,
1179{
1180 pub fn new() -> Self {
1182 Default::default()
1183 }
1184
1185 pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
1187 for i in injections {
1188 self.hashed_key_values.push(i.clone());
1189 }
1190 self
1191 }
1192
1193 pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
1196 self.hashed_blacklist.push(hashed.to_vec());
1197 self
1198 }
1199
1200 pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
1202 self.mode = mode;
1203 self
1204 }
1205
1206 pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
1208 self.overwrite_state_version = Some(version);
1209 self
1210 }
1211
1212 pub async fn build(self) -> Result<RemoteExternalities<B>> {
1213 let mut ext = self.pre_build().await?;
1214 ext.commit_all().unwrap();
1215
1216 info!(
1217 target: LOG_TARGET,
1218 "initialized state externalities with storage root {:?} and state_version {:?}",
1219 ext.as_backend().root(),
1220 ext.state_version
1221 );
1222
1223 Ok(ext)
1224 }
1225}
1226
/// Items shared by the test modules of this file.
#[cfg(test)]
mod test_prelude {
    pub(crate) use super::*;
    pub(crate) use sp_runtime::testing::{Block as RawBlock, MockCallU64};
    /// Extrinsic type of the test block.
    pub(crate) type UncheckedXt = sp_runtime::testing::TestXt<MockCallU64, ()>;
    /// Block type the tests instantiate the builder with.
    pub(crate) type Block = RawBlock<UncheckedXt>;

    /// Initialise logging for a test via `sp_tracing::try_init_simple`.
    pub(crate) fn init_logger() {
        sp_tracing::try_init_simple();
    }
}
1238
#[cfg(test)]
mod tests {
    use super::test_prelude::*;

    /// A builder configured to load the checked-in test snapshot in offline mode.
    fn offline_builder() -> Builder<Block> {
        let config = OfflineConfig { state_snapshot: SnapshotConfig::new("test_data/test.snap") };
        Builder::<Block>::new().mode(Mode::Offline(config))
    }

    /// The test snapshot can be loaded and executed against.
    #[tokio::test]
    async fn can_load_state_snapshot() {
        init_logger();
        let mut ext = offline_builder().build().await.unwrap();
        ext.execute_with(|| {});
    }

    /// A blacklisted key that exists in the snapshot is absent from the built externalities.
    #[tokio::test]
    async fn can_exclude_from_snapshot() {
        init_logger();

        // Pick some key that is known to have a value in the snapshot.
        let mut full_ext = offline_builder().build().await.expect("Can't read state snapshot file");
        let some_key = full_ext.execute_with(|| {
            let key = sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
            assert!(sp_io::storage::get(&key).is_some());
            key
        });

        // Load again with that key blacklisted.
        let mut filtered_ext = offline_builder()
            .blacklist_hashed_key(&some_key)
            .build()
            .await
            .expect("Can't read state snapshot file");
        filtered_ext.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
    }
}
1286
#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
    //! Tests that build externalities against a live node.
    //!
    //! The node address is read from the `TEST_WS` environment variable when the tests run,
    //! falling back to `DEFAULT_HTTP_ENDPOINT` when the variable cannot be read.

    use super::test_prelude::*;
    use std::env;

    /// Name of the environment variable that overrides the endpoint used by these tests.
    const ENDPOINT_VAR: &str = "TEST_WS";

    /// Returns the endpoint every online test in this module connects to.
    fn endpoint() -> String {
        env::var(ENDPOINT_VAR).unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
    }

    /// Returns `true` when `TEST_WS` can be read from the environment at run time.
    ///
    /// This deliberately mirrors the lookup done by [`endpoint`], so that a test which is only
    /// meant to run against a user-supplied node makes its decision from the same source the
    /// endpoint itself comes from.
    fn has_custom_endpoint() -> bool {
        env::var(ENDPOINT_VAR).is_ok()
    }

    /// Finds the entry called `name` in the current directory and returns its path.
    ///
    /// # Panics
    ///
    /// Panics if the current directory (or one of its entries) cannot be read, or if the number
    /// of entries whose file name equals `name` is not exactly one.
    fn find_snapshot(name: &str) -> std::path::PathBuf {
        let mut found: Vec<_> = std::fs::read_dir(".")
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .filter(|path| path.file_name().unwrap_or_default() == name)
            .collect();
        assert_eq!(found.len(), 1, "expected exactly one entry named {:?}", name);
        found.pop().expect("length was asserted to be one just above; qed")
    }

    /// The state version stored in a snapshot is restored when loading it offline, and
    /// `overwrite_state_version` replaces it.
    ///
    /// NOTE(review): the snapshot file written by this test is not removed afterwards.
    #[tokio::test]
    async fn state_version_is_kept_and_can_be_altered() {
        const CACHE: &str = "state_version_is_kept_and_can_be_altered";
        init_logger();

        // Build from the remote node, writing a snapshot along the way.
        let ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        // Load the snapshot that was just written.
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .build()
            .await
            .unwrap();

        assert_eq!(ext.state_version, cached_ext.state_version);

        // Pick whichever state version the snapshot does *not* use.
        let other = match ext.state_version {
            StateVersion::V0 => StateVersion::V1,
            StateVersion::V1 => StateVersion::V0,
        };

        // Load the same snapshot again, this time forcing the other state version.
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .overwrite_state_version(other)
            .build()
            .await
            .unwrap();

        assert_eq!(cached_ext.state_version, other);
    }

    /// Externalities loaded from a snapshot carry a header with the same hash as the
    /// externalities the snapshot was created from.
    ///
    /// NOTE(review): the snapshot file written by this test is not removed afterwards.
    #[tokio::test]
    async fn snapshot_block_hash_works() {
        const CACHE: &str = "snapshot_block_hash_works";
        init_logger();

        // Build from the remote node, writing a snapshot along the way.
        let ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        // Load the snapshot that was just written.
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .build()
            .await
            .unwrap();

        assert_eq!(ext.header.hash(), cached_ext.header.hash());
    }

    /// Building with `child_trie: true` yields strictly more backend keys than building the
    /// same pallet with `child_trie: false`.
    ///
    /// NOTE(review): both builders write to the same snapshot file, and that file is not
    /// removed afterwards.
    #[tokio::test]
    async fn child_keys_are_loaded() {
        const CACHE: &str = "snapshot_retains_storage";
        init_logger();

        // Externalities that include child tries.
        let mut child_ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: true,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        // Externalities that skip child tries.
        let mut ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();

        assert!(
            child_ext.as_backend().backend_storage().keys().len() >
                ext.as_backend().backend_storage().keys().len()
        );
    }

    /// `Mode::OfflineOrElseOnline` builds successfully both when a reachable endpoint is given
    /// and, once the snapshot exists, when the online endpoint is unreachable.
    #[tokio::test]
    async fn offline_else_online_works() {
        const CACHE: &str = "offline_else_online_works_data";
        init_logger();

        // First run: the online config points at a real endpoint and writes the snapshot.
        Builder::<Block>::new()
            .mode(Mode::OfflineOrElseOnline(
                OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
                OnlineConfig {
                    transport: endpoint().into(),
                    pallets: vec!["Proxy".to_owned()],
                    child_trie: false,
                    state_snapshot: Some(SnapshotConfig::new(CACHE)),
                    ..Default::default()
                },
            ))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});

        // Second run: the online endpoint does not exist, so only the snapshot can succeed.
        Builder::<Block>::new()
            .mode(Mode::OfflineOrElseOnline(
                OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
                OnlineConfig {
                    transport: "ws://non-existent:666".to_owned().into(),
                    ..Default::default()
                },
            ))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});

        // Clean up the snapshot written by the first run.
        std::fs::remove_file(find_snapshot(CACHE)).unwrap();
    }

    /// A single pallet can be fetched from the remote node.
    #[tokio::test]
    async fn can_build_one_small_pallet() {
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }

    /// Several pallets can be fetched from the remote node in one build.
    #[tokio::test]
    async fn can_build_few_pallet() {
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }

    /// An online build with a snapshot config leaves a snapshot file larger than one byte.
    #[tokio::test(flavor = "multi_thread")]
    async fn can_create_snapshot() {
        const CACHE: &str = "can_create_snapshot";
        init_logger();

        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});

        let snapshot = find_snapshot(CACHE);
        // `Metadata::len` is the portable way to read the file size.
        assert!(std::fs::metadata(&snapshot).unwrap().len() > 1);
        std::fs::remove_file(&snapshot).unwrap();
    }

    /// Same as `can_create_snapshot`, but for a build with `child_trie: true`.
    #[tokio::test]
    async fn can_create_child_snapshot() {
        const CACHE: &str = "can_create_child_snapshot";
        init_logger();

        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                pallets: vec!["Crowdloan".to_owned()],
                child_trie: true,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});

        let snapshot = find_snapshot(CACHE);
        // `Metadata::len` is the portable way to read the file size.
        assert!(std::fs::metadata(&snapshot).unwrap().len() > 1);
        std::fs::remove_file(&snapshot).unwrap();
    }

    /// The `Staking` pallet can be fetched from the remote node.
    ///
    /// Only runs when `TEST_WS` is set; presumably this is too heavy for the default endpoint.
    #[tokio::test]
    async fn can_build_big_pallet() {
        if !has_custom_endpoint() {
            return;
        }
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                pallets: vec!["Staking".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }

    /// A build with the default online config (apart from the endpoint) succeeds.
    ///
    /// Only runs when `TEST_WS` is set; presumably this is too heavy for the default endpoint.
    #[tokio::test]
    async fn can_fetch_all() {
        if !has_custom_endpoint() {
            return;
        }
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().into(),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }

    /// `rpc_get_keys_parallel` returns exactly what `rpc_get_keys_in_range` returns, both for
    /// a one-byte prefix and for the empty prefix.
    #[tokio::test]
    async fn can_fetch_in_parallel() {
        init_logger();

        let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
            transport: endpoint().into(),
            ..Default::default()
        }));
        builder.init_remote_client().await.unwrap();

        // `at` is unwrapped here, so it must have been set by the initialisation above.
        let at = builder.as_online().at.unwrap();

        let prefix = StorageKey(vec![13]);
        let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
        let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
        assert_eq!(paged, para);

        let prefix = StorageKey(vec![]);
        let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
        let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
        assert_eq!(paged, para);
    }
}