use async_recursion::async_recursion;
use codec::{Compact, Decode, Encode};
use indicatif::{ProgressBar, ProgressStyle};
use jsonrpsee::{
core::params::ArrayParams,
http_client::{HttpClient, HttpClientBuilder},
};
use log::*;
use serde::de::DeserializeOwned;
use sp_core::{
hashing::twox_128,
hexdisplay::HexDisplay,
storage::{
well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
},
};
use sp_runtime::{
traits::{Block as BlockT, HashingFor},
StateVersion,
};
use sp_state_machine::TestExternalities;
use spinners::{Spinner, Spinners};
use std::{
cmp::max,
fs,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
time::{Duration, Instant},
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
use tokio_retry::{strategy::FixedInterval, Retry};
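// Raw key-value types as fetched from the remote node, and the SCALE-compact
// version tag written at the head of every snapshot file.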
type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
type SnapshotVersion = Compact<u16>;
const LOG_TARGET: &str = "remote-ext";
const DEFAULT_HTTP_ENDPOINT: &str = "https://rpc.polkadot.io:443";
const SNAPSHOT_VERSION: SnapshotVersion = Compact(3);
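/// A state snapshot as written to disk: a version tag, the state version, the
/// hash of the block the state was taken at, the raw backend storage, and its
/// storage root.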
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
snapshot_version: SnapshotVersion,
state_version: StateVersion,
block_hash: B::Hash,
raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
storage_root: B::Hash,
}
impl<B: BlockT> Snapshot<B> {
pub fn new(
state_version: StateVersion,
block_hash: B::Hash,
raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
storage_root: B::Hash,
) -> Self {
Self {
snapshot_version: SNAPSHOT_VERSION,
state_version,
block_hash,
raw_storage,
storage_root,
}
}
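	// Decode and check the version tag before paying for a full decode of the
	// snapshot. This works because `snapshot_version` is the first field of
	// `Snapshot` and therefore sits at the very start of the encoded bytes.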
	fn load(path: &Path) -> Result<Snapshot<B>, &'static str> {
let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
let snapshot_version = SnapshotVersion::decode(&mut &*bytes)
.map_err(|_| "Failed to decode snapshot version")?;
if snapshot_version != SNAPSHOT_VERSION {
return Err("Unsupported snapshot version detected. Please create a new snapshot.")
}
Decode::decode(&mut &*bytes).map_err(|_| "Decode failed")
}
}
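/// A remote externalities environment: a `TestExternalities` populated with
/// state scraped from a remote node (or loaded from a snapshot of one), plus
/// the hash of the block the state was taken at. It derefs to
/// `TestExternalities`, so it can be used directly, e.g.
/// `ext.execute_with(|| { .. })`.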
pub struct RemoteExternalities<B: BlockT> {
pub inner_ext: TestExternalities<HashingFor<B>>,
pub block_hash: B::Hash,
}
impl<B: BlockT> Deref for RemoteExternalities<B> {
type Target = TestExternalities<HashingFor<B>>;
fn deref(&self) -> &Self::Target {
&self.inner_ext
}
}
impl<B: BlockT> DerefMut for RemoteExternalities<B> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner_ext
}
}
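/// The builder's mode of operation: scrape a remote node (`Online`), load a
/// snapshot from disk (`Offline`), or try the snapshot first and fall back to
/// the remote node (`OfflineOrElseOnline`).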
#[derive(Clone)]
pub enum Mode<B: BlockT> {
Online(OnlineConfig<B>),
Offline(OfflineConfig),
OfflineOrElseOnline(OfflineConfig, OnlineConfig<B>),
}
impl<B: BlockT> Default for Mode<B> {
fn default() -> Self {
Mode::Online(OnlineConfig::default())
}
}
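/// Configuration of the offline mode: the state snapshot to load.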
#[derive(Clone)]
pub struct OfflineConfig {
pub state_snapshot: SnapshotConfig,
}
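/// The transport to the remote node: either a URI that still has to be turned
/// into a client, or an already-initialized `HttpClient`.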
#[derive(Debug, Clone)]
pub enum Transport {
Uri(String),
RemoteClient(HttpClient),
}
impl Transport {
fn as_client(&self) -> Option<&HttpClient> {
match self {
Self::RemoteClient(client) => Some(client),
_ => None,
}
}
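	// Build an `HttpClient` out of the URI, if one has not been built yet. Any
	// ws/wss URI is rewritten to its http/https equivalent first, since ws is
	// currently unstable for fetching remote storage (see
	// https://github.com/paritytech/jsonrpsee/issues/1086).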
async fn init(&mut self) -> Result<(), &'static str> {
if let Self::Uri(uri) = self {
log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri);
let uri = if uri.starts_with("ws://") {
let uri = uri.replace("ws://", "http://");
log::info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
uri
} else if uri.starts_with("wss://") {
let uri = uri.replace("wss://", "https://");
log::info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
uri
} else {
uri.clone()
};
let http_client = HttpClientBuilder::default()
.max_request_body_size(u32::MAX)
.request_timeout(std::time::Duration::from_secs(60 * 5))
.build(uri)
.map_err(|e| {
log::error!(target: LOG_TARGET, "error: {:?}", e);
"failed to build http client"
})?;
*self = Self::RemoteClient(http_client)
}
Ok(())
}
}
impl From<String> for Transport {
fn from(uri: String) -> Self {
Transport::Uri(uri)
}
}
impl From<HttpClient> for Transport {
fn from(client: HttpClient) -> Self {
Transport::RemoteClient(client)
}
}
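/// Configuration of the online mode: the block hash to scrape at (the latest
/// finalized head, if `None`), an optional snapshot file to also write the
/// downloaded state to, the pallets whose prefixes should be scraped
/// (everything, if empty), the transport, whether to scrape child tries, and
/// any additional hashed prefixes and keys to fetch.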
#[derive(Clone)]
pub struct OnlineConfig<B: BlockT> {
pub at: Option<B::Hash>,
pub state_snapshot: Option<SnapshotConfig>,
pub pallets: Vec<String>,
pub transport: Transport,
pub child_trie: bool,
pub hashed_prefixes: Vec<Vec<u8>>,
pub hashed_keys: Vec<Vec<u8>>,
}
impl<B: BlockT> OnlineConfig<B> {
fn rpc_client(&self) -> &HttpClient {
self.transport
.as_client()
.expect("http client must have been initialized by now; qed.")
}
fn at_expected(&self) -> B::Hash {
self.at.expect("block at must be initialized; qed")
}
}
impl<B: BlockT> Default for OnlineConfig<B> {
fn default() -> Self {
Self {
transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
child_trie: true,
at: None,
state_snapshot: None,
pallets: Default::default(),
hashed_keys: Default::default(),
hashed_prefixes: Default::default(),
}
}
}
impl<B: BlockT> From<String> for OnlineConfig<B> {
fn from(t: String) -> Self {
Self { transport: t.into(), ..Default::default() }
}
}
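/// Configuration of a state snapshot: the path it lives at on disk.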
#[derive(Clone)]
pub struct SnapshotConfig {
pub path: PathBuf,
}
impl SnapshotConfig {
pub fn new<P: Into<PathBuf>>(path: P) -> Self {
Self { path: path.into() }
}
}
impl From<String> for SnapshotConfig {
fn from(s: String) -> Self {
Self::new(s)
}
}
impl Default for SnapshotConfig {
fn default() -> Self {
Self { path: Path::new("SNAPSHOT").into() }
}
}
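/// Builder for a `RemoteExternalities` environment: configure it with `mode`,
/// `inject_hashed_key_value`, `blacklist_hashed_key` and
/// `overwrite_state_version`, then finalize it with `build`.
///
/// A minimal usage sketch, assuming a suitable `Block` type is in scope (the
/// tests below use `sp_runtime::testing::Block`):
///
/// ```ignore
/// let ext = Builder::<Block>::new()
///     .mode(Mode::Online(OnlineConfig {
///         pallets: vec!["Proxy".to_owned()],
///         ..Default::default()
///     }))
///     .build()
///     .await
///     .unwrap();
/// ```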
pub struct Builder<B: BlockT> {
hashed_key_values: Vec<KeyValue>,
hashed_blacklist: Vec<Vec<u8>>,
mode: Mode<B>,
overwrite_state_version: Option<StateVersion>,
}
impl<B: BlockT> Default for Builder<B> {
fn default() -> Self {
Self {
mode: Default::default(),
hashed_key_values: Default::default(),
hashed_blacklist: Default::default(),
overwrite_state_version: None,
}
}
}
impl<B: BlockT> Builder<B> {
fn as_online(&self) -> &OnlineConfig<B> {
match &self.mode {
Mode::Online(config) => config,
Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("expected `Mode::Online` or `Mode::OfflineOrElseOnline`"),
}
}
fn as_online_mut(&mut self) -> &mut OnlineConfig<B> {
match &mut self.mode {
Mode::Online(config) => config,
Mode::OfflineOrElseOnline(_, config) => config,
			_ => panic!("expected `Mode::Online` or `Mode::OfflineOrElseOnline`"),
}
}
}
impl<B: BlockT> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
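	// Tuning knobs for the scraping process: how many batch-request streams run
	// in parallel, how the dynamic batch size grows after a success and shrinks
	// after a failure, the page size for key downloads, and the retry policy
	// for fetching a page of keys.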
const PARALLEL_REQUESTS: usize = 4;
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const INITIAL_BATCH_SIZE: usize = 5000;
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
const KEYS_PAGE_MAX_RETRIES: usize = 12;
const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);
async fn rpc_get_storage(
&self,
key: StorageKey,
maybe_at: Option<B::Hash>,
) -> Result<Option<StorageData>, &'static str> {
trace!(target: LOG_TARGET, "rpc: get_storage");
self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_storage failed."
})
}
async fn rpc_get_head(&self) -> Result<B::Hash, &'static str> {
trace!(target: LOG_TARGET, "rpc: finalized_head");
ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc finalized_head failed."
})
}
async fn get_keys_single_page(
&self,
prefix: Option<StorageKey>,
start_key: Option<StorageKey>,
at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
self.as_online()
.rpc_client()
.storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at))
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_keys failed"
})
}
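	/// Get all keys under `prefix` at `at`, paging through the node's key API
	/// until a short page signals the end. Each page is retried up to
	/// `KEYS_PAGE_MAX_RETRIES` times, `KEYS_PAGE_RETRY_INTERVAL` apart.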
async fn rpc_get_keys_paged(
&self,
prefix: StorageKey,
at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
let mut last_key: Option<StorageKey> = None;
let mut all_keys: Vec<StorageKey> = vec![];
let keys = loop {
let retry_strategy = FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL)
.take(Self::KEYS_PAGE_MAX_RETRIES);
let get_page_closure =
|| self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at);
let page = Retry::spawn(retry_strategy, get_page_closure).await?;
let page_len = page.len();
all_keys.extend(page);
if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
break all_keys
} else {
let new_last_key =
all_keys.last().expect("all_keys is populated; has .last(); qed");
log::debug!(
target: LOG_TARGET,
"new total = {}, full page received: {}",
all_keys.len(),
HexDisplay::from(new_last_key)
);
last_key = Some(new_last_key.clone());
};
};
Ok(keys)
}
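	/// Fetch the storage values for all `payloads` via batch requests, adapting
	/// the batch size as it goes: a failed batch is retried with the size
	/// scaled down by `BATCH_SIZE_DECREASE_FACTOR` (giving up once it can no
	/// longer shrink), while after each success the size is scaled up by
	/// `BATCH_SIZE_INCREASE_FACTOR`. Recurses until no payloads remain.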
#[async_recursion]
async fn get_storage_data_dynamic_batch_size(
client: &HttpClient,
payloads: Vec<(String, ArrayParams)>,
batch_size: usize,
bar: &ProgressBar,
) -> Result<Vec<Option<StorageData>>, String> {
if payloads.is_empty() {
return Ok(vec![])
};
log::debug!(
target: LOG_TARGET,
"Remaining payloads: {} Batch request size: {}",
payloads.len(),
batch_size,
);
let page = payloads.iter().take(batch_size).cloned().collect::<Vec<_>>();
let mut batch = BatchRequestBuilder::new();
for (method, params) in page.iter() {
batch
.insert(method, params.clone())
.map_err(|_| "Invalid batch method and/or params")?
}
let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
Ok(batch_response) => batch_response,
Err(e) => {
if batch_size < 2 {
return Err(e.to_string())
}
log::debug!(
target: LOG_TARGET,
"Batch request failed, trying again with smaller batch size. {}",
e.to_string()
);
return Self::get_storage_data_dynamic_batch_size(
client,
payloads,
max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize),
bar,
)
.await
},
};
let mut data: Vec<Option<StorageData>> = vec![];
let batch_response_len = batch_response.len();
for item in batch_response.into_iter() {
match item {
Ok(x) => data.push(x),
Err(e) => return Err(e.message().to_string()),
}
}
bar.inc(batch_response_len as u64);
let remaining_payloads = payloads.iter().skip(batch_size).cloned().collect::<Vec<_>>();
let mut rest = Self::get_storage_data_dynamic_batch_size(
client,
remaining_payloads,
max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize),
bar,
)
.await?;
data.append(&mut rest);
Ok(data)
}
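	/// Fetch all key-value pairs under `prefix` at `at`: scrape the keys first,
	/// then download the corresponding values in parallel batch requests,
	/// inserting everything except child-storage roots into `pending_ext`
	/// along the way. Keys with no value on the remote are paired with empty
	/// `StorageData`.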
pub(crate) async fn rpc_get_pairs_paged(
&self,
prefix: StorageKey,
at: B::Hash,
pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<Vec<KeyValue>, &'static str> {
let start = Instant::now();
let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
let keys = self
.rpc_get_keys_paged(prefix.clone(), at)
.await?
.into_iter()
.collect::<Vec<_>>();
sp.stop_with_message(format!(
"✅ Found {} keys ({:.2}s)",
keys.len(),
start.elapsed().as_secs_f32()
));
if keys.is_empty() {
return Ok(Default::default())
}
let client = self.as_online().rpc_client();
let payloads = keys
.iter()
.map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
.collect::<Vec<_>>();
let bar = ProgressBar::new(payloads.len() as u64);
bar.enable_steady_tick(Duration::from_secs(1));
bar.set_message("Downloading key values".to_string());
bar.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
)
.unwrap()
.progress_chars("=>-"),
);
		let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
let requests = payloads_chunked.map(|payload_chunk| {
Self::get_storage_data_dynamic_batch_size(
&client,
payload_chunk.to_vec(),
Self::INITIAL_BATCH_SIZE,
&bar,
)
});
let storage_data_result: Result<Vec<_>, _> =
futures::future::join_all(requests).await.into_iter().collect();
let storage_data = match storage_data_result {
Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
Err(e) => {
log::error!(target: LOG_TARGET, "Error while getting storage data: {}", e);
return Err("Error while getting storage data")
},
};
bar.finish_with_message("✅ Downloaded key values");
		println!();
assert_eq!(keys.len(), storage_data.len());
let key_values = keys
.iter()
.zip(storage_data)
.map(|(key, maybe_value)| match maybe_value {
Some(data) => (key.clone(), data),
None => {
					log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
let data = StorageData(vec![]);
(key.clone(), data)
},
})
.collect::<Vec<_>>();
let mut sp = Spinner::with_timer(Spinners::Dots, "Inserting keys into DB...".into());
let start = Instant::now();
		pending_ext.batch_insert(key_values.iter().filter_map(|(k, v)| {
			if is_default_child_storage_key(&k.0) {
				None
			} else {
				Some((k.0.clone(), v.0.clone()))
			}
		}));
sp.stop_with_message(format!(
"✅ Inserted keys into DB ({:.2}s)",
start.elapsed().as_secs_f32()
));
Ok(key_values)
}
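	/// Fetch the values of the given `child_keys` in the child trie rooted at
	/// `prefixed_top_key`.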
pub(crate) async fn rpc_child_get_storage_paged(
client: &HttpClient,
prefixed_top_key: &StorageKey,
child_keys: Vec<StorageKey>,
at: B::Hash,
) -> Result<Vec<KeyValue>, &'static str> {
let child_keys_len = child_keys.len();
let payloads = child_keys
.iter()
.map(|key| {
(
"childstate_getStorage".to_string(),
rpc_params![
PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
key,
at
],
)
})
.collect::<Vec<_>>();
let bar = ProgressBar::new(payloads.len() as u64);
let storage_data = match Self::get_storage_data_dynamic_batch_size(
client,
payloads,
Self::INITIAL_BATCH_SIZE,
&bar,
)
.await
{
Ok(storage_data) => storage_data,
Err(e) => {
log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
return Err("batch processing failed")
},
};
assert_eq!(child_keys_len, storage_data.len());
Ok(child_keys
.iter()
.zip(storage_data)
.map(|(key, maybe_value)| match maybe_value {
Some(v) => (key.clone(), v),
None => {
log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
(key.clone(), StorageData(vec![]))
},
})
.collect::<Vec<_>>())
}
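	/// Get all child keys under `child_prefix` in the child trie rooted at
	/// `prefixed_top_key`, at block `at`.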
pub(crate) async fn rpc_child_get_keys(
client: &HttpClient,
prefixed_top_key: &StorageKey,
child_prefix: StorageKey,
at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
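		// The blanket allow below is presumably here to silence the
		// deprecation warning emitted by `ChildStateApi::storage_keys`.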
#[allow(warnings)]
let child_keys = substrate_rpc_client::ChildStateApi::storage_keys(
client,
PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
child_prefix,
Some(at),
)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc child_get_keys failed."
})?;
debug!(
target: LOG_TARGET,
"[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
std::thread::current().id(),
child_keys.len(),
HexDisplay::from(prefixed_top_key)
);
Ok(child_keys)
}
}
impl<B: BlockT + DeserializeOwned> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
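	/// Scrape all child tries whose roots appear among the top key-value pairs
	/// in `top_kv`: for each default child root, fetch its keys and values,
	/// insert them into `pending_ext`, and return the data grouped per child
	/// trie.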
async fn load_child_remote(
&self,
top_kv: &[KeyValue],
pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<ChildKeyValues, &'static str> {
let child_roots = top_kv
			.iter()
.filter_map(|(k, _)| is_default_child_storage_key(k.as_ref()).then(|| k.clone()))
.collect::<Vec<_>>();
if child_roots.is_empty() {
info!(target: LOG_TARGET, "👩👦 no child roots found to scrape",);
return Ok(Default::default())
}
info!(
target: LOG_TARGET,
"👩👦 scraping child-tree data from {} top keys",
child_roots.len(),
);
let at = self.as_online().at_expected();
let client = self.as_online().rpc_client();
let mut child_kv = vec![];
for prefixed_top_key in child_roots {
let child_keys =
Self::rpc_child_get_keys(&client, &prefixed_top_key, StorageKey(vec![]), at)
.await?;
let child_kv_inner =
Self::rpc_child_get_storage_paged(&client, &prefixed_top_key, child_keys, at)
.await?;
			let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.0);
let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
Some((ChildType::ParentKeyId, storage_key)) => storage_key,
None => {
log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
return Err("Invalid child key")
},
};
let info = ChildInfo::new_default(un_prefixed);
let key_values =
child_kv_inner.iter().cloned().map(|(k, v)| (k.0, v.0)).collect::<Vec<_>>();
child_kv.push((info.clone(), child_kv_inner));
for (k, v) in key_values {
pending_ext.insert_child(info.clone(), k, v);
}
}
Ok(child_kv)
}
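	/// Scrape the top trie from the remote node at the configured block: every
	/// configured hashed prefix (which includes the pallet prefixes), plus
	/// every individually configured hashed key.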
async fn load_top_remote(
&self,
pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<TopKeyValues, &'static str> {
let config = self.as_online();
let at = self
.as_online()
.at
.expect("online config must be initialized by this point; qed.");
log::info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {:?}", at);
let mut keys_and_values = Vec::new();
for prefix in &config.hashed_prefixes {
let now = std::time::Instant::now();
let additional_key_values =
self.rpc_get_pairs_paged(StorageKey(prefix.to_vec()), at, pending_ext).await?;
let elapsed = now.elapsed();
log::info!(
target: LOG_TARGET,
"adding data for hashed prefix: {:?}, took {:.2}s",
HexDisplay::from(prefix),
elapsed.as_secs_f32()
);
keys_and_values.extend(additional_key_values);
}
for key in &config.hashed_keys {
let key = StorageKey(key.to_vec());
log::info!(
target: LOG_TARGET,
"adding data for hashed key: {:?}",
HexDisplay::from(&key)
);
match self.rpc_get_storage(key.clone(), Some(at)).await? {
Some(value) => {
pending_ext.insert(key.clone().0, value.clone().0);
keys_and_values.push((key, value));
},
None => {
log::warn!(
target: LOG_TARGET,
"no data found for hashed key: {:?}",
HexDisplay::from(&key)
);
},
}
}
Ok(keys_and_values)
}
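	/// Initialize the remote client: build the transport, pin `at` to the
	/// latest finalized head if it was not set, translate pallet names into
	/// their `twox_128` prefixes, add the child-trie prefix if requested, and
	/// fall back to scraping everything when no other prefix is configured.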
async fn init_remote_client(&mut self) -> Result<(), &'static str> {
self.as_online_mut().transport.init().await?;
if self.as_online().at.is_none() {
let at = self.rpc_get_head().await?;
log::info!(
target: LOG_TARGET,
"since no at is provided, setting it to latest finalized head, {:?}",
at
);
self.as_online_mut().at = Some(at);
}
let online_config = self.as_online_mut();
online_config
.pallets
.iter()
.for_each(|p| online_config.hashed_prefixes.push(twox_128(p.as_bytes()).to_vec()));
if online_config.child_trie {
online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
}
if online_config
.hashed_prefixes
.iter()
.filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
.count() == 0
{
log::info!(
target: LOG_TARGET,
"since no prefix is filtered, the data for all pallets will be downloaded"
);
online_config.hashed_prefixes.push(vec![]);
}
Ok(())
}
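	/// Load the state from the remote node into fresh externalities, writing a
	/// snapshot of it to disk if the online config asks for one.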
async fn load_remote_and_maybe_save(
&mut self,
) -> Result<TestExternalities<HashingFor<B>>, &'static str> {
let state_version =
StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc runtime_version failed."
})
.map(|v| v.state_version())?;
let mut pending_ext = TestExternalities::new_with_code_and_state(
Default::default(),
Default::default(),
self.overwrite_state_version.unwrap_or(state_version),
);
let top_kv = self.load_top_remote(&mut pending_ext).await?;
self.load_child_remote(&top_kv, &mut pending_ext).await?;
if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
let snapshot = Snapshot::<B>::new(
state_version,
self.as_online()
.at
.expect("set to `Some` in `init_remote_client`; must be called before; qed"),
raw_storage.clone(),
storage_root,
);
let encoded = snapshot.encode();
log::info!(
target: LOG_TARGET,
"writing snapshot of {} bytes to {:?}",
encoded.len(),
path
);
std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
return Ok(TestExternalities::from_raw_snapshot(
raw_storage,
storage_root,
self.overwrite_state_version.unwrap_or(state_version),
))
}
Ok(pending_ext)
}
async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>, &'static str> {
self.init_remote_client().await?;
let block_hash = self.as_online().at_expected();
let inner_ext = self.load_remote_and_maybe_save().await?;
Ok(RemoteExternalities { block_hash, inner_ext })
}
fn do_load_offline(
&mut self,
config: OfflineConfig,
) -> Result<RemoteExternalities<B>, &'static str> {
let mut sp = Spinner::with_timer(Spinners::Dots, "Loading snapshot...".into());
let start = Instant::now();
info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);
let Snapshot { snapshot_version: _, block_hash, state_version, raw_storage, storage_root } =
Snapshot::<B>::load(&config.state_snapshot.path)?;
let inner_ext = TestExternalities::from_raw_snapshot(
raw_storage,
storage_root,
self.overwrite_state_version.unwrap_or(state_version),
);
sp.stop_with_message(format!("✅ Loaded snapshot ({:.2}s)", start.elapsed().as_secs_f32()));
Ok(RemoteExternalities { inner_ext, block_hash })
}
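	/// Build the externalities according to the configured mode, then extend
	/// them with the manually injected key-values and clear the blacklisted
	/// keys.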
pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>, &'static str> {
let mut ext = match self.mode.clone() {
Mode::Offline(config) => self.do_load_offline(config)?,
Mode::Online(_) => self.do_load_remote().await?,
Mode::OfflineOrElseOnline(offline_config, _) => {
match self.do_load_offline(offline_config) {
Ok(x) => x,
Err(_) => self.do_load_remote().await?,
}
},
};
if !self.hashed_key_values.is_empty() {
log::info!(
target: LOG_TARGET,
"extending externalities with {} manually injected key-values",
self.hashed_key_values.len()
);
ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
}
if !self.hashed_blacklist.is_empty() {
log::info!(
target: LOG_TARGET,
"excluding externalities from {} keys",
self.hashed_blacklist.len()
);
for k in self.hashed_blacklist {
ext.execute_with(|| sp_io::storage::clear(&k));
}
}
Ok(ext)
}
}
impl<B: BlockT + DeserializeOwned> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
pub fn new() -> Self {
Default::default()
}
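	/// Inject already-hashed key-value pairs into the final externalities.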
	pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
		self.hashed_key_values.extend(injections);
		self
	}
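	/// Exclude the given already-hashed key from the final externalities.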
pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
self.hashed_blacklist.push(hashed.to_vec());
self
}
pub fn mode(mut self, mode: Mode<B>) -> Self {
self.mode = mode;
self
}
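	/// Overwrite the state version, which is otherwise read from the remote
	/// node or the snapshot.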
pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
self.overwrite_state_version = Some(version);
self
}
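	/// Build the `RemoteExternalities`, committing all pending changes to its
	/// backend.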
pub async fn build(self) -> Result<RemoteExternalities<B>, &'static str> {
let mut ext = self.pre_build().await?;
ext.commit_all().unwrap();
info!(
target: LOG_TARGET,
"initialized state externalities with storage root {:?} and state_version {:?}",
ext.as_backend().root(),
ext.state_version
);
Ok(ext)
}
}
#[cfg(test)]
mod test_prelude {
pub(crate) use super::*;
pub(crate) use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper, H256 as Hash};
pub(crate) type Block = RawBlock<ExtrinsicWrapper<Hash>>;
pub(crate) fn init_logger() {
let _ = sp_tracing::try_init_simple();
}
}
#[cfg(test)]
mod tests {
use super::test_prelude::*;
#[tokio::test]
async fn can_load_state_snapshot() {
init_logger();
Builder::<Block>::new()
.mode(Mode::Offline(OfflineConfig {
state_snapshot: SnapshotConfig::new("test_data/proxy_test"),
}))
.build()
.await
.unwrap()
.execute_with(|| {});
}
#[tokio::test]
async fn can_exclude_from_snapshot() {
init_logger();
let some_key = Builder::<Block>::new()
.mode(Mode::Offline(OfflineConfig {
state_snapshot: SnapshotConfig::new("test_data/proxy_test"),
}))
.build()
.await
.expect("Can't read state snapshot file")
.execute_with(|| {
let key =
sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
assert!(sp_io::storage::get(&key).is_some());
key
});
Builder::<Block>::new()
.mode(Mode::Offline(OfflineConfig {
state_snapshot: SnapshotConfig::new("test_data/proxy_test"),
}))
.blacklist_hashed_key(&some_key)
.build()
.await
.expect("Can't read state snapshot file")
.execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
}
}
#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
use super::test_prelude::*;
use std::os::unix::fs::MetadataExt;
#[tokio::test]
async fn state_version_is_kept_and_can_be_altered() {
const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
init_logger();
let ext = Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
pallets: vec!["Proxy".to_owned()],
child_trie: false,
state_snapshot: Some(SnapshotConfig::new(CACHE)),
..Default::default()
}))
.build()
.await
.unwrap();
let cached_ext = Builder::<Block>::new()
.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
.build()
.await
.unwrap();
assert_eq!(ext.state_version, cached_ext.state_version);
let other = match ext.state_version {
StateVersion::V0 => StateVersion::V1,
StateVersion::V1 => StateVersion::V0,
};
let cached_ext = Builder::<Block>::new()
.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
.overwrite_state_version(other)
.build()
.await
.unwrap();
assert_eq!(cached_ext.state_version, other);
}
#[tokio::test]
async fn snapshot_block_hash_works() {
const CACHE: &'static str = "snapshot_block_hash_works";
init_logger();
let ext = Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
pallets: vec!["Proxy".to_owned()],
child_trie: false,
state_snapshot: Some(SnapshotConfig::new(CACHE)),
..Default::default()
}))
.build()
.await
.unwrap();
let cached_ext = Builder::<Block>::new()
.mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
.build()
.await
.unwrap();
assert_eq!(ext.block_hash, cached_ext.block_hash);
}
#[tokio::test]
async fn child_keys_are_loaded() {
const CACHE: &'static str = "snapshot_retains_storage";
init_logger();
let child_ext = Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
pallets: vec!["Proxy".to_owned()],
child_trie: true,
state_snapshot: Some(SnapshotConfig::new(CACHE)),
..Default::default()
}))
.build()
.await
.unwrap();
let ext = Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
pallets: vec!["Proxy".to_owned()],
child_trie: false,
state_snapshot: Some(SnapshotConfig::new(CACHE)),
..Default::default()
}))
.build()
.await
.unwrap();
assert!(
child_ext.as_backend().backend_storage().keys().len() >
ext.as_backend().backend_storage().keys().len()
);
}
#[tokio::test]
async fn offline_else_online_works() {
const CACHE: &'static str = "offline_else_online_works_data";
init_logger();
Builder::<Block>::new()
.mode(Mode::OfflineOrElseOnline(
OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
OnlineConfig {
pallets: vec!["Proxy".to_owned()],
child_trie: false,
state_snapshot: Some(SnapshotConfig::new(CACHE)),
..Default::default()
},
))
.build()
.await
.unwrap()
.execute_with(|| {});
Builder::<Block>::new()
.mode(Mode::OfflineOrElseOnline(
OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
OnlineConfig {
transport: "ws://non-existent:666".to_owned().into(),
..Default::default()
},
))
.build()
.await
.unwrap()
.execute_with(|| {});
let to_delete = std::fs::read_dir(Path::new("."))
.unwrap()
.into_iter()
.map(|d| d.unwrap())
.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
.collect::<Vec<_>>();
assert!(to_delete.len() == 1);
std::fs::remove_file(to_delete[0].path()).unwrap();
}
#[tokio::test]
async fn can_build_one_small_pallet() {
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
pallets: vec!["Proxy".to_owned()],
child_trie: false,
..Default::default()
}))
.build()
.await
.unwrap()
.execute_with(|| {});
}
#[tokio::test]
async fn can_build_few_pallet() {
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
child_trie: false,
..Default::default()
}))
.build()
.await
.unwrap()
.execute_with(|| {});
}
#[tokio::test(flavor = "multi_thread")]
async fn can_create_snapshot() {
const CACHE: &'static str = "can_create_snapshot";
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
state_snapshot: Some(SnapshotConfig::new(CACHE)),
pallets: vec!["Proxy".to_owned()],
child_trie: false,
..Default::default()
}))
.build()
.await
.unwrap()
.execute_with(|| {});
let to_delete = std::fs::read_dir(Path::new("."))
.unwrap()
.into_iter()
.map(|d| d.unwrap())
.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
.collect::<Vec<_>>();
assert!(to_delete.len() == 1);
let to_delete = to_delete.first().unwrap();
assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
std::fs::remove_file(to_delete.path()).unwrap();
}
#[tokio::test]
async fn can_create_child_snapshot() {
const CACHE: &'static str = "can_create_child_snapshot";
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
state_snapshot: Some(SnapshotConfig::new(CACHE)),
pallets: vec!["Crowdloan".to_owned()],
child_trie: true,
..Default::default()
}))
.build()
.await
.unwrap()
.execute_with(|| {});
let to_delete = std::fs::read_dir(Path::new("."))
.unwrap()
.into_iter()
.map(|d| d.unwrap())
.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
.collect::<Vec<_>>();
assert!(to_delete.len() == 1);
let to_delete = to_delete.first().unwrap();
assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
std::fs::remove_file(to_delete.path()).unwrap();
}
#[tokio::test]
async fn can_build_big_pallet() {
if std::option_env!("TEST_WS").is_none() {
return
}
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
transport: std::option_env!("TEST_WS").unwrap().to_owned().into(),
pallets: vec!["Staking".to_owned()],
child_trie: false,
..Default::default()
}))
.build()
.await
.unwrap()
.execute_with(|| {});
}
#[tokio::test]
async fn can_fetch_all() {
if std::option_env!("TEST_WS").is_none() {
return
}
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
transport: std::option_env!("TEST_WS").unwrap().to_owned().into(),
..Default::default()
}))
.build()
.await
.unwrap()
.execute_with(|| {});
}
}