1mod logging;
24
25use codec::{Compact, Decode, Encode};
26use indicatif::{ProgressBar, ProgressStyle};
27use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient};
28use log::*;
29use serde::de::DeserializeOwned;
30use sp_core::{
31 hexdisplay::HexDisplay,
32 storage::{
33 well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
34 ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
35 },
36};
37use sp_runtime::{
38 traits::{Block as BlockT, HashingFor},
39 StateVersion,
40};
41use sp_state_machine::TestExternalities;
42use std::{
43 cmp::{max, min},
44 fs,
45 ops::{Deref, DerefMut},
46 path::{Path, PathBuf},
47 sync::Arc,
48 time::{Duration, Instant},
49};
50use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
51use tokio_retry::{strategy::FixedInterval, Retry};
52
53type Result<T, E = &'static str> = std::result::Result<T, E>;
54
55type KeyValue = (StorageKey, StorageData);
56type TopKeyValues = Vec<KeyValue>;
57type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
58type SnapshotVersion = Compact<u16>;
59
60const LOG_TARGET: &str = "remote-ext";
61const DEFAULT_HTTP_ENDPOINT: &str = "https://try-runtime.polkadot.io:443";
62const SNAPSHOT_VERSION: SnapshotVersion = Compact(4);
63
64#[derive(Decode, Encode)]
66struct Snapshot<B: BlockT> {
67 snapshot_version: SnapshotVersion,
68 state_version: StateVersion,
69 raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
71 storage_root: B::Hash,
74 header: B::Header,
75}
76
77impl<B: BlockT> Snapshot<B> {
78 pub fn new(
79 state_version: StateVersion,
80 raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
81 storage_root: B::Hash,
82 header: B::Header,
83 ) -> Self {
84 Self {
85 snapshot_version: SNAPSHOT_VERSION,
86 state_version,
87 raw_storage,
88 storage_root,
89 header,
90 }
91 }
92
93 fn load(path: &PathBuf) -> Result<Snapshot<B>> {
94 let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
95 let snapshot_version = SnapshotVersion::decode(&mut &*bytes)
98 .map_err(|_| "Failed to decode snapshot version")?;
99
100 if snapshot_version != SNAPSHOT_VERSION {
101 return Err("Unsupported snapshot version detected. Please create a new snapshot.")
102 }
103
104 Decode::decode(&mut &*bytes).map_err(|_| "Decode failed")
105 }
106}
107
108pub struct RemoteExternalities<B: BlockT> {
111 pub inner_ext: TestExternalities<HashingFor<B>>,
113 pub header: B::Header,
115}
116
117impl<B: BlockT> Deref for RemoteExternalities<B> {
118 type Target = TestExternalities<HashingFor<B>>;
119 fn deref(&self) -> &Self::Target {
120 &self.inner_ext
121 }
122}
123
124impl<B: BlockT> DerefMut for RemoteExternalities<B> {
125 fn deref_mut(&mut self) -> &mut Self::Target {
126 &mut self.inner_ext
127 }
128}
129
130#[derive(Clone)]
132pub enum Mode<H> {
133 Online(OnlineConfig<H>),
135 Offline(OfflineConfig),
137 OfflineOrElseOnline(OfflineConfig, OnlineConfig<H>),
139}
140
141impl<H> Default for Mode<H> {
142 fn default() -> Self {
143 Mode::Online(OnlineConfig::default())
144 }
145}
146
147#[derive(Clone)]
151pub struct OfflineConfig {
152 pub state_snapshot: SnapshotConfig,
154}
155
156#[derive(Debug, Clone)]
158pub enum Transport {
159 Uri(String),
161 RemoteClient(HttpClient),
163}
164
165impl Transport {
166 fn as_client(&self) -> Option<&HttpClient> {
167 match self {
168 Self::RemoteClient(client) => Some(client),
169 _ => None,
170 }
171 }
172
173 async fn init(&mut self) -> Result<()> {
175 if let Self::Uri(uri) = self {
176 debug!(target: LOG_TARGET, "initializing remote client to {uri:?}");
177
178 let uri = if uri.starts_with("ws://") {
183 let uri = uri.replace("ws://", "http://");
184 info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {uri:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)");
185 uri
186 } else if uri.starts_with("wss://") {
187 let uri = uri.replace("wss://", "https://");
188 info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {uri:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)");
189 uri
190 } else {
191 uri.clone()
192 };
193 let http_client = HttpClient::builder()
194 .max_request_size(u32::MAX)
195 .max_response_size(u32::MAX)
196 .request_timeout(std::time::Duration::from_secs(60 * 5))
197 .build(uri)
198 .map_err(|e| {
199 error!(target: LOG_TARGET, "error: {e:?}");
200 "failed to build http client"
201 })?;
202
203 *self = Self::RemoteClient(http_client)
204 }
205
206 Ok(())
207 }
208}
209
210impl From<String> for Transport {
211 fn from(uri: String) -> Self {
212 Transport::Uri(uri)
213 }
214}
215
216impl From<HttpClient> for Transport {
217 fn from(client: HttpClient) -> Self {
218 Transport::RemoteClient(client)
219 }
220}
221
222#[derive(Clone)]
226pub struct OnlineConfig<H> {
227 pub at: Option<H>,
230 pub state_snapshot: Option<SnapshotConfig>,
232 pub pallets: Vec<String>,
234 pub transport: Transport,
236 pub child_trie: bool,
238 pub hashed_prefixes: Vec<Vec<u8>>,
241 pub hashed_keys: Vec<Vec<u8>>,
243}
244
245impl<H: Clone> OnlineConfig<H> {
246 fn rpc_client(&self) -> &HttpClient {
248 self.transport
249 .as_client()
250 .expect("http client must have been initialized by now; qed.")
251 }
252
253 fn at_expected(&self) -> H {
254 self.at.clone().expect("block at must be initialized; qed")
255 }
256}
257
258impl<H> Default for OnlineConfig<H> {
259 fn default() -> Self {
260 Self {
261 transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
262 child_trie: true,
263 at: None,
264 state_snapshot: None,
265 pallets: Default::default(),
266 hashed_keys: Default::default(),
267 hashed_prefixes: Default::default(),
268 }
269 }
270}
271
272impl<H> From<String> for OnlineConfig<H> {
273 fn from(t: String) -> Self {
274 Self { transport: t.into(), ..Default::default() }
275 }
276}
277
/// Location of a state snapshot on disk.
#[derive(Clone)]
pub struct SnapshotConfig {
    /// Path of the snapshot file.
    pub path: PathBuf,
}

impl SnapshotConfig {
    /// Creates a config for the snapshot at `path`.
    pub fn new<P: Into<PathBuf>>(path: P) -> Self {
        let path: PathBuf = path.into();
        Self { path }
    }
}

impl From<String> for SnapshotConfig {
    fn from(path: String) -> Self {
        Self { path: PathBuf::from(path) }
    }
}

impl Default for SnapshotConfig {
    /// A file named `SNAPSHOT`, relative to the working directory.
    fn default() -> Self {
        Self::new("SNAPSHOT")
    }
}
302
303#[derive(Clone)]
305pub struct Builder<B: BlockT> {
306 hashed_key_values: Vec<KeyValue>,
309 hashed_blacklist: Vec<Vec<u8>>,
311 mode: Mode<B::Hash>,
313 overwrite_state_version: Option<StateVersion>,
318}
319
320impl<B: BlockT> Default for Builder<B> {
321 fn default() -> Self {
322 Self {
323 mode: Default::default(),
324 hashed_key_values: Default::default(),
325 hashed_blacklist: Default::default(),
326 overwrite_state_version: None,
327 }
328 }
329}
330
331impl<B: BlockT> Builder<B> {
333 fn as_online(&self) -> &OnlineConfig<B::Hash> {
334 match &self.mode {
335 Mode::Online(config) => config,
336 Mode::OfflineOrElseOnline(_, config) => config,
337 _ => panic!("Unexpected mode: Online"),
338 }
339 }
340
341 fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
342 match &mut self.mode {
343 Mode::Online(config) => config,
344 Mode::OfflineOrElseOnline(_, config) => config,
345 _ => panic!("Unexpected mode: Online"),
346 }
347 }
348}
349
350impl<B: BlockT> Builder<B>
352where
353 B::Hash: DeserializeOwned,
354 B::Header: DeserializeOwned,
355{
/// Number of concurrent workers used when scraping keys and values.
const PARALLEL_REQUESTS: usize = 4;
/// Batch size used for the first batch request.
const INITIAL_BATCH_SIZE: usize = 10;
/// Growth factor applied to the batch size after a request that met the duration target.
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
/// Shrink factor applied to the batch size after a slow or failed request.
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
/// Desired wall-clock duration of one batch request.
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
/// Page size used for paged key queries.
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
/// Retry budget for failing RPC requests.
const MAX_RETRIES: usize = 12;
/// Delay between retries of paged key and header requests.
const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);
365
366 async fn rpc_get_storage(
367 &self,
368 key: StorageKey,
369 maybe_at: Option<B::Hash>,
370 ) -> Result<Option<StorageData>> {
371 trace!(target: LOG_TARGET, "rpc: get_storage");
372 self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| {
373 error!(target: LOG_TARGET, "Error = {e:?}");
374 "rpc get_storage failed."
375 })
376 }
377
378 async fn rpc_get_head(&self) -> Result<B::Hash> {
380 trace!(target: LOG_TARGET, "rpc: finalized_head");
381
382 ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
384 .await
385 .map_err(|e| {
386 error!(target: LOG_TARGET, "Error = {e:?}");
387 "rpc finalized_head failed."
388 })
389 }
390
391 async fn get_keys_single_page(
392 &self,
393 prefix: Option<StorageKey>,
394 start_key: Option<StorageKey>,
395 at: B::Hash,
396 ) -> Result<Vec<StorageKey>> {
397 self.as_online()
398 .rpc_client()
399 .storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at))
400 .await
401 .map_err(|e| {
402 error!(target: LOG_TARGET, "Error = {e:?}");
403 "rpc get_keys failed"
404 })
405 }
406
407 async fn rpc_get_keys_parallel(
409 &self,
410 prefix: &StorageKey,
411 block: B::Hash,
412 parallel: usize,
413 ) -> Result<Vec<StorageKey>> {
414 fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
417 let mut prefix = prefix.as_ref().to_vec();
418 let scale = 32usize.saturating_sub(prefix.len());
419
420 if scale < 9 {
422 prefix.extend(vec![0; scale]);
423 return vec![StorageKey(prefix)]
424 }
425
426 let chunks = 16;
427 let step = 0x10000 / chunks;
428 let ext = scale - 2;
429
430 (0..chunks)
431 .map(|i| {
432 let mut key = prefix.clone();
433 let start = i * step;
434 key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
435 key.extend(vec![0; ext]);
436 StorageKey(key)
437 })
438 .collect()
439 }
440
441 let start_keys = gen_start_keys(&prefix);
442 let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
443 let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
444 end_keys.push(None);
445
446 let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
448 let builder = Arc::new(self.clone());
449 let mut handles = vec![];
450
451 for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
452 let permit = parallel
453 .clone()
454 .acquire_owned()
455 .await
456 .expect("semaphore is not closed until the end of loop");
457
458 let builder = builder.clone();
459 let prefix = prefix.clone();
460 let start_key = start_key.cloned();
461 let end_key = end_key.cloned();
462
463 let handle = tokio::spawn(async move {
464 let res = builder
465 .rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
466 .await;
467 drop(permit);
468 res
469 });
470
471 handles.push(handle);
472 }
473
474 parallel.close();
475
476 let keys = futures::future::join_all(handles)
477 .await
478 .into_iter()
479 .filter_map(|res| match res {
480 Ok(Ok(keys)) => Some(keys),
481 _ => None,
482 })
483 .flatten()
484 .collect::<Vec<StorageKey>>();
485
486 Ok(keys)
487 }
488
489 async fn rpc_get_keys_in_range(
492 &self,
493 prefix: &StorageKey,
494 block: B::Hash,
495 start_key: Option<&StorageKey>,
496 end_key: Option<&StorageKey>,
497 ) -> Result<Vec<StorageKey>> {
498 let mut last_key: Option<&StorageKey> = start_key;
499 let mut keys: Vec<StorageKey> = vec![];
500
501 loop {
502 let retry_strategy =
505 FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
506 let get_page_closure =
507 || self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
508 let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;
509
510 if let (Some(last), Some(end)) = (page.last(), end_key) {
512 if last >= end {
513 page.retain(|key| key < end);
514 }
515 }
516
517 let page_len = page.len();
518 keys.extend(page);
519 last_key = keys.last();
520
521 if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
524 debug!(target: LOG_TARGET, "last page received: {page_len}");
525 break
526 }
527
528 debug!(
529 target: LOG_TARGET,
530 "new total = {}, full page received: {}",
531 keys.len(),
532 HexDisplay::from(last_key.expect("full page received, cannot be None"))
533 );
534 }
535
536 Ok(keys)
537 }
538
539 async fn get_storage_data_dynamic_batch_size(
584 client: &HttpClient,
585 payloads: Vec<(String, ArrayParams)>,
586 bar: &ProgressBar,
587 ) -> Result<Vec<Option<StorageData>>, String> {
588 let mut all_data: Vec<Option<StorageData>> = vec![];
589 let mut start_index = 0;
590 let mut retries = 0usize;
591 let mut batch_size = Self::INITIAL_BATCH_SIZE;
592 let total_payloads = payloads.len();
593
594 while start_index < total_payloads {
595 debug!(
596 target: LOG_TARGET,
597 "Remaining payloads: {} Batch request size: {batch_size}",
598 total_payloads - start_index,
599 );
600
601 let end_index = usize::min(start_index + batch_size, total_payloads);
602 let page = &payloads[start_index..end_index];
603
604 let mut batch = BatchRequestBuilder::new();
606 for (method, params) in page.iter() {
607 batch
608 .insert(method, params.clone())
609 .map_err(|_| "Invalid batch method and/or params")?;
610 }
611
612 let request_started = Instant::now();
613 let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
614 Ok(batch_response) => {
615 retries = 0;
616 batch_response
617 },
618 Err(e) => {
619 if retries > Self::MAX_RETRIES {
620 return Err(e.to_string())
621 }
622
623 retries += 1;
624 let failure_log = format!(
625 "Batch request failed ({retries}/{} retries). Error: {e}",
626 Self::MAX_RETRIES
627 );
628 if retries >= 2 {
631 warn!("{failure_log}");
632 batch_size = 1;
633 } else {
634 debug!("{failure_log}");
635 batch_size =
637 (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize;
638 }
639 continue
640 },
641 };
642
643 let request_duration = request_started.elapsed();
644 batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
645 max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
647 } else {
648 min(
650 total_payloads - start_index,
651 max(
652 batch_size + 1,
653 (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
654 ),
655 )
656 };
657
658 debug!(
659 target: LOG_TARGET,
660 "Request duration: {request_duration:?} Target duration: {:?} Last batch size: {} Next batch size: {batch_size}",
661 Self::REQUEST_DURATION_TARGET,
662 end_index - start_index,
663 );
664
665 let batch_response_len = batch_response.len();
666 for item in batch_response.into_iter() {
667 match item {
668 Ok(x) => all_data.push(x),
669 Err(e) => return Err(e.message().to_string()),
670 }
671 }
672 bar.inc(batch_response_len as u64);
673
674 start_index = end_index;
676 }
677
678 Ok(all_data)
679 }
680
681 pub(crate) async fn rpc_get_pairs(
686 &self,
687 prefix: StorageKey,
688 at: B::Hash,
689 pending_ext: &mut TestExternalities<HashingFor<B>>,
690 ) -> Result<Vec<KeyValue>> {
691 let keys = logging::with_elapsed_async(
692 || async {
693 let keys = self
696 .rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
697 .await?
698 .into_iter()
699 .collect::<Vec<_>>();
700
701 Ok(keys)
702 },
703 "Scraping keys...",
704 |keys| format!("Found {} keys", keys.len()),
705 )
706 .await?;
707
708 if keys.is_empty() {
709 return Ok(Default::default())
710 }
711
712 let client = self.as_online().rpc_client();
713 let payloads = keys
714 .iter()
715 .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
716 .collect::<Vec<_>>();
717
718 let bar = ProgressBar::new(payloads.len() as u64);
719 bar.enable_steady_tick(Duration::from_secs(1));
720 bar.set_message("Downloading key values".to_string());
721 bar.set_style(
722 ProgressStyle::with_template(
723 "[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
724 )
725 .unwrap()
726 .progress_chars("=>-"),
727 );
728 let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
729 let requests = payloads_chunked.map(|payload_chunk| {
730 Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
731 });
732 let storage_data_result: Result<Vec<_>, _> =
734 futures::future::join_all(requests).await.into_iter().collect();
735 let storage_data = match storage_data_result {
737 Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
738 Err(e) => {
739 error!(target: LOG_TARGET, "Error while getting storage data: {e}");
740 return Err("Error while getting storage data")
741 },
742 };
743 bar.finish_with_message("โ
Downloaded key values");
744 println!();
745
746 assert_eq!(keys.len(), storage_data.len());
748
749 let key_values = keys
750 .iter()
751 .zip(storage_data)
752 .map(|(key, maybe_value)| match maybe_value {
753 Some(data) => (key.clone(), data),
754 None => {
755 warn!(target: LOG_TARGET, "key {key:?} had none corresponding value.");
756 let data = StorageData(vec![]);
757 (key.clone(), data)
758 },
759 })
760 .collect::<Vec<_>>();
761
762 logging::with_elapsed(
763 || {
764 pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| {
765 match is_default_child_storage_key(&k.0) {
768 true => None,
769 false => Some((k.0, v.0)),
770 }
771 }));
772
773 Ok(())
774 },
775 "Inserting keys into DB...",
776 |_| "Inserted keys into DB".into(),
777 )
778 .expect("must succeed; qed");
779
780 Ok(key_values)
781 }
782
783 pub(crate) async fn rpc_child_get_storage_paged(
785 client: &HttpClient,
786 prefixed_top_key: &StorageKey,
787 child_keys: Vec<StorageKey>,
788 at: B::Hash,
789 ) -> Result<Vec<KeyValue>> {
790 let child_keys_len = child_keys.len();
791
792 let payloads = child_keys
793 .iter()
794 .map(|key| {
795 (
796 "childstate_getStorage".to_string(),
797 rpc_params![
798 PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
799 key,
800 at
801 ],
802 )
803 })
804 .collect::<Vec<_>>();
805
806 let bar = ProgressBar::new(payloads.len() as u64);
807 let storage_data =
808 match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await {
809 Ok(storage_data) => storage_data,
810 Err(e) => {
811 error!(target: LOG_TARGET, "batch processing failed: {e:?}");
812 return Err("batch processing failed")
813 },
814 };
815
816 assert_eq!(child_keys_len, storage_data.len());
817
818 Ok(child_keys
819 .iter()
820 .zip(storage_data)
821 .map(|(key, maybe_value)| match maybe_value {
822 Some(v) => (key.clone(), v),
823 None => {
824 warn!(target: LOG_TARGET, "key {key:?} had no corresponding value.");
825 (key.clone(), StorageData(vec![]))
826 },
827 })
828 .collect::<Vec<_>>())
829 }
830
831 pub(crate) async fn rpc_child_get_keys(
832 client: &HttpClient,
833 prefixed_top_key: &StorageKey,
834 child_prefix: StorageKey,
835 at: B::Hash,
836 ) -> Result<Vec<StorageKey>> {
837 let retry_strategy =
838 FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
839 let mut all_child_keys = Vec::new();
840 let mut start_key = None;
841
842 loop {
843 let get_child_keys_closure = || {
844 let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
845 substrate_rpc_client::ChildStateApi::storage_keys_paged(
846 client,
847 top_key,
848 Some(child_prefix.clone()),
849 Self::DEFAULT_KEY_DOWNLOAD_PAGE,
850 start_key.clone(),
851 Some(at),
852 )
853 };
854
855 let child_keys = Retry::spawn(retry_strategy.clone(), get_child_keys_closure)
856 .await
857 .map_err(|e| {
858 error!(target: LOG_TARGET, "Error = {e:?}");
859 "rpc child_get_keys failed."
860 })?;
861
862 let keys_count = child_keys.len();
863 if keys_count == 0 {
864 break;
865 }
866
867 start_key = child_keys.last().cloned();
868 all_child_keys.extend(child_keys);
869
870 if keys_count < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
871 break;
872 }
873 }
874
875 debug!(
876 target: LOG_TARGET,
877 "[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
878 std::thread::current().id(),
879 all_child_keys.len(),
880 HexDisplay::from(prefixed_top_key)
881 );
882
883 Ok(all_child_keys)
884 }
885}
886
887impl<B: BlockT> Builder<B>
888where
889 B::Hash: DeserializeOwned,
890 B::Header: DeserializeOwned,
891{
892 async fn load_child_remote(
901 &self,
902 top_kv: &[KeyValue],
903 pending_ext: &mut TestExternalities<HashingFor<B>>,
904 ) -> Result<ChildKeyValues> {
905 let child_roots = top_kv
906 .iter()
907 .filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
908 .map(|(k, _)| k.clone())
909 .collect::<Vec<_>>();
910
911 if child_roots.is_empty() {
912 info!(target: LOG_TARGET, "๐ฉโ๐ฆ no child roots found to scrape");
913 return Ok(Default::default())
914 }
915
916 info!(
917 target: LOG_TARGET,
918 "๐ฉโ๐ฆ scraping child-tree data from {} top keys",
919 child_roots.len(),
920 );
921
922 let at = self.as_online().at_expected();
923
924 let client = self.as_online().rpc_client();
925 let mut child_kv = vec![];
926 for prefixed_top_key in child_roots {
927 let child_keys =
928 Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;
929
930 let child_kv_inner =
931 Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
932 .await?;
933
934 let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
935 let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
936 Some((ChildType::ParentKeyId, storage_key)) => storage_key,
937 None => {
938 error!(target: LOG_TARGET, "invalid key: {prefixed_top_key:?}");
939 return Err("Invalid child key")
940 },
941 };
942
943 let info = ChildInfo::new_default(un_prefixed);
944 let key_values =
945 child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
946 child_kv.push((info.clone(), child_kv_inner));
947 for (k, v) in key_values {
948 pending_ext.insert_child(info.clone(), k, v);
949 }
950 }
951
952 Ok(child_kv)
953 }
954
955 async fn load_top_remote(
960 &self,
961 pending_ext: &mut TestExternalities<HashingFor<B>>,
962 ) -> Result<TopKeyValues> {
963 let config = self.as_online();
964 let at = self
965 .as_online()
966 .at
967 .expect("online config must be initialized by this point; qed.");
968 info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {at:?}");
969
970 let mut keys_and_values = Vec::new();
971 for prefix in &config.hashed_prefixes {
972 let now = std::time::Instant::now();
973 let additional_key_values =
974 self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
975 let elapsed = now.elapsed();
976 info!(
977 target: LOG_TARGET,
978 "adding data for hashed prefix: {:?}, took {:.2}s",
979 HexDisplay::from(prefix),
980 elapsed.as_secs_f32()
981 );
982 keys_and_values.extend(additional_key_values);
983 }
984
985 for key in &config.hashed_keys {
986 let key = StorageKey(key.to_vec());
987 info!(
988 target: LOG_TARGET,
989 "adding data for hashed key: {:?}",
990 HexDisplay::from(&key)
991 );
992 match self.rpc_get_storage(key.clone(), Some(at)).await? {
993 Some(value) => {
994 pending_ext.insert(key.clone().0, value.clone().0);
995 keys_and_values.push((key, value));
996 },
997 None => {
998 warn!(
999 target: LOG_TARGET,
1000 "no data found for hashed key: {:?}",
1001 HexDisplay::from(&key)
1002 );
1003 },
1004 }
1005 }
1006
1007 Ok(keys_and_values)
1008 }
1009
1010 async fn init_remote_client(&mut self) -> Result<()> {
1014 self.as_online_mut().transport.init().await?;
1016
1017 if self.as_online().at.is_none() {
1019 let at = self.rpc_get_head().await?;
1020 info!(
1021 target: LOG_TARGET,
1022 "since no at is provided, setting it to latest finalized head, {at:?}",
1023 );
1024 self.as_online_mut().at = Some(at);
1025 }
1026
1027 let online_config = self.as_online_mut();
1029 online_config.pallets.iter().for_each(|p| {
1030 online_config
1031 .hashed_prefixes
1032 .push(sp_crypto_hashing::twox_128(p.as_bytes()).to_vec())
1033 });
1034
1035 if online_config.child_trie {
1036 online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
1037 }
1038
1039 if online_config
1042 .hashed_prefixes
1043 .iter()
1044 .filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
1045 .count() == 0
1046 {
1047 info!(
1048 target: LOG_TARGET,
1049 "since no prefix is filtered, the data for all pallets will be downloaded"
1050 );
1051 online_config.hashed_prefixes.push(vec![]);
1052 }
1053
1054 Ok(())
1055 }
1056
1057 async fn load_header(&self) -> Result<B::Header> {
1058 let retry_strategy =
1059 FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
1060 let get_header_closure = || {
1061 ChainApi::<(), _, B::Header, ()>::header(
1062 self.as_online().rpc_client(),
1063 Some(self.as_online().at_expected()),
1064 )
1065 };
1066 Retry::spawn(retry_strategy, get_header_closure)
1067 .await
1068 .map_err(|_| "Failed to fetch header for block from network")?
1069 .ok_or("Network returned None block header")
1070 }
1071
1072 async fn load_remote_and_maybe_save(&mut self) -> Result<TestExternalities<HashingFor<B>>> {
1077 let state_version =
1078 StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
1079 .await
1080 .map_err(|e| {
1081 error!(target: LOG_TARGET, "Error = {e:?}");
1082 "rpc runtime_version failed."
1083 })
1084 .map(|v| v.state_version())?;
1085 let mut pending_ext = TestExternalities::new_with_code_and_state(
1086 Default::default(),
1087 Default::default(),
1088 self.overwrite_state_version.unwrap_or(state_version),
1089 );
1090
1091 let top_kv = self.load_top_remote(&mut pending_ext).await?;
1093 self.load_child_remote(&top_kv, &mut pending_ext).await?;
1094
1095 if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
1097 let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
1098 let snapshot = Snapshot::<B>::new(
1099 state_version,
1100 raw_storage.clone(),
1101 storage_root,
1102 self.load_header().await?,
1103 );
1104 let encoded = snapshot.encode();
1105 info!(
1106 target: LOG_TARGET,
1107 "writing snapshot of {} bytes to {path:?}",
1108 encoded.len(),
1109 );
1110 std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
1111
1112 return Ok(TestExternalities::from_raw_snapshot(
1114 raw_storage,
1115 storage_root,
1116 self.overwrite_state_version.unwrap_or(state_version),
1117 ))
1118 }
1119
1120 Ok(pending_ext)
1121 }
1122
1123 async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>> {
1124 self.init_remote_client().await?;
1125 let inner_ext = self.load_remote_and_maybe_save().await?;
1126 Ok(RemoteExternalities { header: self.load_header().await?, inner_ext })
1127 }
1128
1129 fn do_load_offline(&mut self, config: OfflineConfig) -> Result<RemoteExternalities<B>> {
1130 let (header, inner_ext) = logging::with_elapsed(
1131 || {
1132 info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);
1133
1134 let Snapshot { header, state_version, raw_storage, storage_root, .. } =
1135 Snapshot::<B>::load(&config.state_snapshot.path)?;
1136 let inner_ext = TestExternalities::from_raw_snapshot(
1137 raw_storage,
1138 storage_root,
1139 self.overwrite_state_version.unwrap_or(state_version),
1140 );
1141
1142 Ok((header, inner_ext))
1143 },
1144 "Loading snapshot...",
1145 |_| "Loaded snapshot".into(),
1146 )?;
1147
1148 Ok(RemoteExternalities { inner_ext, header })
1149 }
1150
1151 pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>> {
1152 let mut ext = match self.mode.clone() {
1153 Mode::Offline(config) => self.do_load_offline(config)?,
1154 Mode::Online(_) => self.do_load_remote().await?,
1155 Mode::OfflineOrElseOnline(offline_config, _) => {
1156 match self.do_load_offline(offline_config) {
1157 Ok(x) => x,
1158 Err(_) => self.do_load_remote().await?,
1159 }
1160 },
1161 };
1162
1163 if !self.hashed_key_values.is_empty() {
1165 info!(
1166 target: LOG_TARGET,
1167 "extending externalities with {} manually injected key-values",
1168 self.hashed_key_values.len()
1169 );
1170 ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
1171 }
1172
1173 if !self.hashed_blacklist.is_empty() {
1175 info!(
1176 target: LOG_TARGET,
1177 "excluding externalities from {} keys",
1178 self.hashed_blacklist.len()
1179 );
1180 for k in self.hashed_blacklist {
1181 ext.execute_with(|| sp_io::storage::clear(&k));
1182 }
1183 }
1184
1185 Ok(ext)
1186 }
1187}
1188
1189impl<B: BlockT> Builder<B>
1191where
1192 B::Hash: DeserializeOwned,
1193 B::Header: DeserializeOwned,
1194{
1195 pub fn new() -> Self {
1197 Default::default()
1198 }
1199
1200 pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
1202 for i in injections {
1203 self.hashed_key_values.push(i.clone());
1204 }
1205 self
1206 }
1207
1208 pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
1211 self.hashed_blacklist.push(hashed.to_vec());
1212 self
1213 }
1214
1215 pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
1217 self.mode = mode;
1218 self
1219 }
1220
1221 pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
1223 self.overwrite_state_version = Some(version);
1224 self
1225 }
1226
1227 pub async fn build(self) -> Result<RemoteExternalities<B>> {
1228 let mut ext = self.pre_build().await?;
1229 ext.commit_all().unwrap();
1230
1231 info!(
1232 target: LOG_TARGET,
1233 "initialized state externalities with storage root {:?} and state_version {:?}",
1234 ext.as_backend().root(),
1235 ext.state_version
1236 );
1237
1238 Ok(ext)
1239 }
1240}
1241
#[cfg(test)]
mod test_prelude {
    //! Imports and helpers shared by this module's tests.

    pub(crate) use super::*;
    pub(crate) use sp_runtime::testing::{Block as RawBlock, MockCallU64};

    /// Extrinsic type of the mock block.
    pub(crate) type UncheckedXt = sp_runtime::testing::TestXt<MockCallU64, ()>;
    /// Block type used by the tests.
    pub(crate) type Block = RawBlock<UncheckedXt>;

    /// Sets up simple tracing output for tests.
    pub(crate) fn init_logger() {
        sp_tracing::try_init_simple();
    }
}
1253
#[cfg(test)]
mod tests {
    use super::test_prelude::*;

    /// Snapshot fixture used by the offline tests.
    const TEST_SNAPSHOT: &str = "test_data/test.snap";

    /// Builder in offline mode pointing at the fixture snapshot.
    fn offline_builder() -> Builder<Block> {
        Builder::<Block>::new().mode(Mode::Offline(OfflineConfig {
            state_snapshot: SnapshotConfig::new(TEST_SNAPSHOT),
        }))
    }

    #[tokio::test]
    async fn can_load_state_snapshot() {
        init_logger();
        let mut ext = offline_builder().build().await.unwrap();
        ext.execute_with(|| {});
    }

    #[tokio::test]
    async fn can_exclude_from_snapshot() {
        init_logger();

        // Pick the first key of the snapshot and check it has a value.
        let mut ext = offline_builder().build().await.expect("Can't read state snapshot file");
        let some_key = ext.execute_with(|| {
            let key = sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
            assert!(sp_io::storage::get(&key).is_some());
            key
        });

        // Blacklisting that key must remove it.
        let mut filtered = offline_builder()
            .blacklist_hashed_key(&some_key)
            .build()
            .await
            .expect("Can't read state snapshot file");
        filtered.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
    }
}
1301
/// Tests that need a live RPC node; only compiled with the `remote-test` feature.
#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
	use super::test_prelude::*;
	use std::env;

	/// RPC endpoint used by the online tests: the runtime value of `TEST_WS` if set,
	/// otherwise [`DEFAULT_HTTP_ENDPOINT`].
	fn endpoint() -> String {
		env::var("TEST_WS").unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
	}

	/// Whether a custom endpoint was supplied through `TEST_WS`.
	///
	/// Checked at runtime, exactly like [`endpoint`], instead of with `option_env!`. That
	/// macro is resolved when the test binary is compiled and ignores the environment the
	/// binary is actually run in.
	fn has_custom_endpoint() -> bool {
		env::var("TEST_WS").is_ok()
	}

	/// Asserts that the snapshot file `name` exists in the current working directory, is a
	/// regular file and is larger than one byte, then deletes it so the test leaves no
	/// artifacts behind.
	///
	/// # Panics
	///
	/// If the file is missing, is not a regular file, is (near-)empty, or cannot be removed.
	fn remove_snapshot(name: &str) {
		let path = Path::new(name);
		let metadata = std::fs::metadata(path)
			.unwrap_or_else(|e| panic!("snapshot {} should exist: {e}", path.display()));
		assert!(metadata.is_file(), "snapshot {} is not a regular file", path.display());
		assert!(metadata.len() > 1, "snapshot {} is empty", path.display());
		std::fs::remove_file(path).unwrap();
	}

	#[tokio::test]
	async fn state_version_is_kept_and_can_be_altered() {
		const CACHE: &str = "state_version_is_kept_and_can_be_altered";
		init_logger();

		// Fetch the state online and persist it as a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// Loading the snapshot back must preserve the state version...
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.state_version, cached_ext.state_version);

		// ...unless it is explicitly overwritten.
		let other = match ext.state_version {
			StateVersion::V0 => StateVersion::V1,
			StateVersion::V1 => StateVersion::V0,
		};
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.overwrite_state_version(other)
			.build()
			.await
			.unwrap();

		assert_eq!(cached_ext.state_version, other);
		remove_snapshot(CACHE);
	}

	#[tokio::test]
	async fn snapshot_block_hash_works() {
		const CACHE: &str = "snapshot_block_hash_works";
		init_logger();

		// Fetch the state online and persist it as a snapshot.
		let ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// The header restored from the snapshot must be the one that was fetched.
		let cached_ext = Builder::<Block>::new()
			.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
			.build()
			.await
			.unwrap();

		assert_eq!(ext.header.hash(), cached_ext.header.hash());
		remove_snapshot(CACHE);
	}

	#[tokio::test]
	async fn child_keys_are_loaded() {
		const CACHE: &str = "snapshot_retains_storage";
		init_logger();

		// Build an externalities instance that includes child-trie keys.
		let mut child_ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: true,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// Build the same pallet without child-trie keys.
		let mut ext = Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				..Default::default()
			}))
			.build()
			.await
			.unwrap();

		// The instance with child tries must hold strictly more backend keys.
		assert!(
			child_ext.as_backend().backend_storage().keys().len() >
				ext.as_backend().backend_storage().keys().len()
		);
		remove_snapshot(CACHE);
	}

	#[tokio::test]
	async fn offline_else_online_works() {
		const CACHE: &str = "offline_else_online_works_data";
		init_logger();

		// No snapshot exists yet, so this falls back to the online config and writes one.
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport: endpoint().into(),
					pallets: vec!["Proxy".to_owned()],
					child_trie: false,
					state_snapshot: Some(SnapshotConfig::new(CACHE)),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		// The snapshot now exists, so the unreachable online endpoint is never contacted.
		Builder::<Block>::new()
			.mode(Mode::OfflineOrElseOnline(
				OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
				OnlineConfig {
					transport: "ws://non-existent:666".to_owned().into(),
					..Default::default()
				},
			))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		remove_snapshot(CACHE);
	}

	#[tokio::test]
	async fn can_build_one_small_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_build_few_pallet() {
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test(flavor = "multi_thread")]
	async fn can_create_snapshot() {
		const CACHE: &str = "can_create_snapshot";
		init_logger();

		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Proxy".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		remove_snapshot(CACHE);
	}

	#[tokio::test]
	async fn can_create_child_snapshot() {
		const CACHE: &str = "can_create_child_snapshot";
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				state_snapshot: Some(SnapshotConfig::new(CACHE)),
				pallets: vec!["Crowdloan".to_owned()],
				child_trie: true,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});

		remove_snapshot(CACHE);
	}

	#[tokio::test]
	async fn can_build_big_pallet() {
		// Too heavy for the default public endpoint; only run against a custom node.
		if !has_custom_endpoint() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				pallets: vec!["Staking".to_owned()],
				child_trie: false,
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_all() {
		// Too heavy for the default public endpoint; only run against a custom node.
		if !has_custom_endpoint() {
			return
		}
		init_logger();
		Builder::<Block>::new()
			.mode(Mode::Online(OnlineConfig {
				transport: endpoint().into(),
				..Default::default()
			}))
			.build()
			.await
			.unwrap()
			.execute_with(|| {});
	}

	#[tokio::test]
	async fn can_fetch_in_parallel() {
		init_logger();

		let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
			transport: endpoint().into(),
			..Default::default()
		}));
		builder.init_remote_client().await.unwrap();

		let at = builder.as_online().at.unwrap();

		// Paged and parallel key fetching must agree, both for a narrow prefix...
		let prefix = StorageKey(vec![13]);
		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
		let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
		assert_eq!(paged, para);

		// ...and for the empty prefix.
		let prefix = StorageKey(vec![]);
		let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
		let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
		assert_eq!(paged, para);
	}
}