use codec::{Compact, Decode, Encode};
use indicatif::{ProgressBar, ProgressStyle};
use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient};
use log::*;
use serde::de::DeserializeOwned;
use sp_core::{
hexdisplay::HexDisplay,
storage::{
well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
},
};
use sp_runtime::{
traits::{Block as BlockT, HashingFor},
StateVersion,
};
use sp_state_machine::TestExternalities;
use spinners::{Spinner, Spinners};
use std::{
cmp::{max, min},
fs,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
use tokio_retry::{strategy::FixedInterval, Retry};
/// A single storage entry: a key paired with its value.
type KeyValue = (StorageKey, StorageData);
/// Key-value pairs scraped from the top-level storage.
type TopKeyValues = Vec<KeyValue>;
/// Key-value pairs of child storages, grouped per child and tagged with its `ChildInfo`.
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
/// Type of the version tag that is the first encoded field of a `Snapshot`.
type SnapshotVersion = Compact<u16>;
/// Log target used by the log statements of this module.
const LOG_TARGET: &str = "remote-ext";
/// Endpoint used by `OnlineConfig::default()`.
const DEFAULT_HTTP_ENDPOINT: &str = "https://try-runtime.polkadot.io:443";
/// Version written into new snapshots; `Snapshot::load` rejects any other version.
const SNAPSHOT_VERSION: SnapshotVersion = Compact(4);
/// On-disk representation of a scraped state, encoded/decoded with `codec`.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
    /// Format version; encoded first so it can be checked before decoding the rest.
    snapshot_version: SnapshotVersion,
    /// State version recorded when the snapshot was created.
    state_version: StateVersion,
    /// Raw storage as produced by `TestExternalities::into_raw_snapshot`.
    // NOTE(review): the `i32` is presumably a reference count — confirm against
    // `into_raw_snapshot`.
    raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
    /// Storage root as produced by `TestExternalities::into_raw_snapshot`.
    storage_root: B::Hash,
    /// Header of the block the state was scraped at.
    header: B::Header,
}
impl<B: BlockT> Snapshot<B> {
pub fn new(
state_version: StateVersion,
raw_storage: Vec<(Vec<u8>, (Vec<u8>, i32))>,
storage_root: B::Hash,
header: B::Header,
) -> Self {
Self {
snapshot_version: SNAPSHOT_VERSION,
state_version,
raw_storage,
storage_root,
header,
}
}
fn load(path: &PathBuf) -> Result<Snapshot<B>, &'static str> {
let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
let snapshot_version = SnapshotVersion::decode(&mut &*bytes)
.map_err(|_| "Failed to decode snapshot version")?;
if snapshot_version != SNAPSHOT_VERSION {
return Err("Unsupported snapshot version detected. Please create a new snapshot.")
}
Decode::decode(&mut &*bytes).map_err(|_| "Decode failed")
}
}
/// Externalities built from remote or snapshot state, together with the header of the
/// block that state belongs to.
///
/// Dereferences to the inner `TestExternalities`.
pub struct RemoteExternalities<B: BlockT> {
    /// The externalities holding the loaded state.
    pub inner_ext: TestExternalities<HashingFor<B>>,
    /// Header fetched from the remote node or read from the snapshot.
    pub header: B::Header,
}
impl<B: BlockT> Deref for RemoteExternalities<B> {
    type Target = TestExternalities<HashingFor<B>>;
    /// Borrows the inner externalities.
    fn deref(&self) -> &Self::Target {
        &self.inner_ext
    }
}
impl<B: BlockT> DerefMut for RemoteExternalities<B> {
    /// Mutably borrows the inner externalities.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner_ext
    }
}
/// Where the builder takes its state from.
#[derive(Clone)]
pub enum Mode<H> {
    /// Scrape the state from a remote node.
    Online(OnlineConfig<H>),
    /// Load the state from a snapshot file.
    Offline(OfflineConfig),
    /// Try the snapshot file first; if loading it fails, scrape from the remote node.
    OfflineOrElseOnline(OfflineConfig, OnlineConfig<H>),
}
impl<H> Default for Mode<H> {
    /// Defaults to online mode with the default `OnlineConfig`.
    fn default() -> Self {
        Mode::Online(OnlineConfig::default())
    }
}
/// Configuration for loading state from a snapshot file.
#[derive(Clone)]
pub struct OfflineConfig {
    /// Location of the snapshot file to load.
    pub state_snapshot: SnapshotConfig,
}
/// How the remote node is reached: either a URI that still has to be turned into a
/// client, or an already-built HTTP client.
#[derive(Debug, Clone)]
pub enum Transport {
    /// A URI; replaced by `RemoteClient` once `init` succeeds.
    Uri(String),
    /// A ready-to-use HTTP client.
    RemoteClient(HttpClient),
}
impl Transport {
fn as_client(&self) -> Option<&HttpClient> {
match self {
Self::RemoteClient(client) => Some(client),
_ => None,
}
}
async fn init(&mut self) -> Result<(), &'static str> {
if let Self::Uri(uri) = self {
log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri);
let uri = if uri.starts_with("ws://") {
let uri = uri.replace("ws://", "http://");
log::info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
uri
} else if uri.starts_with("wss://") {
let uri = uri.replace("wss://", "https://");
log::info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri);
uri
} else {
uri.clone()
};
let http_client = HttpClient::builder()
.max_request_size(u32::MAX)
.max_response_size(u32::MAX)
.request_timeout(std::time::Duration::from_secs(60 * 5))
.build(uri)
.map_err(|e| {
log::error!(target: LOG_TARGET, "error: {:?}", e);
"failed to build http client"
})?;
*self = Self::RemoteClient(http_client)
}
Ok(())
}
}
impl From<String> for Transport {
fn from(uri: String) -> Self {
Transport::Uri(uri)
}
}
impl From<HttpClient> for Transport {
fn from(client: HttpClient) -> Self {
Transport::RemoteClient(client)
}
}
/// Configuration for scraping state from a remote node.
#[derive(Clone)]
pub struct OnlineConfig<H> {
    /// Block hash to scrape at; if `None`, it is set to the finalized head during
    /// initialization of the remote client.
    pub at: Option<H>,
    /// If set, the scraped state is also written to a snapshot at this location.
    pub state_snapshot: Option<SnapshotConfig>,
    /// Pallet names; each one is turned into a `twox_128` hashed prefix to scrape.
    pub pallets: Vec<String>,
    /// How to reach the remote node.
    pub transport: Transport,
    /// Whether the default child-storage prefix is scraped as well.
    pub child_trie: bool,
    /// Already-hashed key prefixes to scrape.
    pub hashed_prefixes: Vec<Vec<u8>>,
    /// Already-hashed individual keys to scrape.
    pub hashed_keys: Vec<Vec<u8>>,
}
impl<H: Clone> OnlineConfig<H> {
    /// Returns the HTTP client.
    ///
    /// # Panics
    ///
    /// Panics if the transport has not been turned into a client yet.
    fn rpc_client(&self) -> &HttpClient {
        let maybe_client = self.transport.as_client();
        maybe_client.expect("http client must have been initialized by now; qed.")
    }

    /// Returns a copy of the block hash to scrape at.
    ///
    /// # Panics
    ///
    /// Panics if `at` is still `None`.
    fn at_expected(&self) -> H {
        let maybe_at = self.at.clone();
        maybe_at.expect("block at must be initialized; qed")
    }
}
impl<H> Default for OnlineConfig<H> {
fn default() -> Self {
Self {
transport: Transport::from(DEFAULT_HTTP_ENDPOINT.to_owned()),
child_trie: true,
at: None,
state_snapshot: None,
pallets: Default::default(),
hashed_keys: Default::default(),
hashed_prefixes: Default::default(),
}
}
}
impl<H> From<String> for OnlineConfig<H> {
fn from(t: String) -> Self {
Self { transport: t.into(), ..Default::default() }
}
}
/// Location of a state snapshot file.
#[derive(Clone)]
pub struct SnapshotConfig {
    /// Path of the snapshot file.
    pub path: PathBuf,
}
impl SnapshotConfig {
    /// Creates a configuration pointing at `path`.
    pub fn new<P: Into<PathBuf>>(path: P) -> Self {
        let path: PathBuf = path.into();
        Self { path }
    }
}
impl From<String> for SnapshotConfig {
    /// Interprets the string as the snapshot path.
    fn from(s: String) -> Self {
        Self { path: PathBuf::from(s) }
    }
}
impl Default for SnapshotConfig {
    /// Uses the relative path `SNAPSHOT`.
    fn default() -> Self {
        Self::new(Path::new("SNAPSHOT"))
    }
}
/// Builder for `RemoteExternalities`.
#[derive(Clone)]
pub struct Builder<B: BlockT> {
    /// Key-values inserted on top of the loaded state.
    hashed_key_values: Vec<KeyValue>,
    /// Keys cleared from the externalities after the state has been loaded.
    hashed_blacklist: Vec<Vec<u8>>,
    /// Where the state is loaded from.
    mode: Mode<B::Hash>,
    /// If set, used instead of the state version reported by the node or the snapshot
    /// when constructing the externalities.
    overwrite_state_version: Option<StateVersion>,
}
impl<B: BlockT> Default for Builder<B> {
    /// Default mode, no injected or blacklisted keys, no state-version override.
    fn default() -> Self {
        Self {
            mode: Default::default(),
            hashed_key_values: Default::default(),
            hashed_blacklist: Default::default(),
            overwrite_state_version: None,
        }
    }
}
impl<B: BlockT> Builder<B> {
    /// Returns the online configuration of the current mode.
    ///
    /// # Panics
    ///
    /// Panics if the mode is `Mode::Offline`, which carries no online configuration.
    fn as_online(&self) -> &OnlineConfig<B::Hash> {
        match &self.mode {
            Mode::Online(config) | Mode::OfflineOrElseOnline(_, config) => config,
            // The variant is named explicitly so that adding a new mode forces a decision here.
            Mode::Offline(_) => panic!("Unexpected mode: Offline"),
        }
    }

    /// Returns the online configuration of the current mode, mutably.
    ///
    /// # Panics
    ///
    /// Panics if the mode is `Mode::Offline`, which carries no online configuration.
    fn as_online_mut(&mut self) -> &mut OnlineConfig<B::Hash> {
        match &mut self.mode {
            Mode::Online(config) | Mode::OfflineOrElseOnline(_, config) => config,
            Mode::Offline(_) => panic!("Unexpected mode: Offline"),
        }
    }
}
impl<B: BlockT> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
/// Number of concurrent key-scraping tasks, and number of chunks the value downloads
/// are split into.
const PARALLEL_REQUESTS: usize = 4;
/// Factor applied to the batch size after a request that finished within the target
/// duration.
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
/// Factor applied to the batch size after a slow request or a first failure.
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
/// Request duration above which the batch size is decreased.
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
/// Batch size used for the first batch request.
const INITIAL_BATCH_SIZE: usize = 10;
/// Number of keys requested per page of keys.
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
/// Retry limit used for key pages, header fetches and batch requests.
const MAX_RETRIES: usize = 12;
/// Fixed delay between retries of key-page and header requests.
const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);
/// Fetches the value stored under `key` at block `maybe_at` from the remote node.
///
/// Any RPC error is logged and mapped to a static message.
async fn rpc_get_storage(
    &self,
    key: StorageKey,
    maybe_at: Option<B::Hash>,
) -> Result<Option<StorageData>, &'static str> {
    trace!(target: LOG_TARGET, "rpc: get_storage");
    let client = self.as_online().rpc_client();
    let response = client.storage(key, maybe_at).await;
    response.map_err(|e| {
        error!(target: LOG_TARGET, "Error = {:?}", e);
        "rpc get_storage failed."
    })
}
/// Fetches the hash of the latest finalized block from the remote node.
///
/// Any RPC error is logged and mapped to a static message.
async fn rpc_get_head(&self) -> Result<B::Hash, &'static str> {
    trace!(target: LOG_TARGET, "rpc: finalized_head");
    let client = self.as_online().rpc_client();
    let response = ChainApi::<(), B::Hash, B::Header, ()>::finalized_head(client).await;
    response.map_err(|e| {
        error!(target: LOG_TARGET, "Error = {:?}", e);
        "rpc finalized_head failed."
    })
}
/// Fetches one page (at most `DEFAULT_KEY_DOWNLOAD_PAGE` keys) of storage keys matching
/// `prefix` at block `at`, continuing from `start_key`.
///
/// Any RPC error is logged and mapped to a static message.
async fn get_keys_single_page(
    &self,
    prefix: Option<StorageKey>,
    start_key: Option<StorageKey>,
    at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
    let client = self.as_online().rpc_client();
    let page = client
        .storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at))
        .await;
    page.map_err(|e| {
        error!(target: LOG_TARGET, "Error = {:?}", e);
        "rpc get_keys failed"
    })
}
async fn rpc_get_keys_parallel(
&self,
prefix: &StorageKey,
block: B::Hash,
parallel: usize,
) -> Result<Vec<StorageKey>, &'static str> {
fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
let mut prefix = prefix.as_ref().to_vec();
let scale = 32usize.saturating_sub(prefix.len());
if scale < 9 {
prefix.extend(vec![0; scale]);
return vec![StorageKey(prefix)]
}
let chunks = 16;
let step = 0x10000 / chunks;
let ext = scale - 2;
(0..chunks)
.map(|i| {
let mut key = prefix.clone();
let start = i * step;
key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
key.extend(vec![0; ext]);
StorageKey(key)
})
.collect()
}
let start_keys = gen_start_keys(&prefix);
let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
end_keys.push(None);
let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
let builder = Arc::new(self.clone());
let mut handles = vec![];
for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
let permit = parallel
.clone()
.acquire_owned()
.await
.expect("semaphore is not closed until the end of loop");
let builder = builder.clone();
let prefix = prefix.clone();
let start_key = start_key.cloned();
let end_key = end_key.cloned();
let handle = tokio::spawn(async move {
let res = builder
.rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
.await;
drop(permit);
res
});
handles.push(handle);
}
parallel.close();
let keys = futures::future::join_all(handles)
.await
.into_iter()
.filter_map(|res| match res {
Ok(Ok(keys)) => Some(keys),
_ => None,
})
.flatten()
.collect::<Vec<StorageKey>>();
Ok(keys)
}
/// Scrapes, page by page, the keys under `prefix` at `block`, starting after `start_key`
/// and keeping only keys smaller than `end_key` (if given).
///
/// Each page request is retried up to `MAX_RETRIES` times with a fixed interval.
async fn rpc_get_keys_in_range(
    &self,
    prefix: &StorageKey,
    block: B::Hash,
    start_key: Option<&StorageKey>,
    end_key: Option<&StorageKey>,
) -> Result<Vec<StorageKey>, &'static str> {
    let mut last_key: Option<&StorageKey> = start_key;
    let mut keys: Vec<StorageKey> = vec![];
    loop {
        let retry_strategy =
            FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
        let get_page_closure =
            || self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
        let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;
        // Drop keys at or beyond `end_key`: they belong to the next range.
        if let (Some(last), Some(end)) = (page.last(), end_key) {
            if last >= end {
                page.retain(|key| key < end);
            }
        }
        let page_len = page.len();
        keys.extend(page);
        // The next page continues from the last key collected so far.
        last_key = keys.last();
        // A short page (after trimming) means the range is exhausted or was left.
        if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
            log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
            break
        }
        log::debug!(
            target: LOG_TARGET,
            "new total = {}, full page received: {}",
            keys.len(),
            HexDisplay::from(last_key.expect("full page received, cannot be None"))
        );
    }
    Ok(keys)
}
/// Downloads the values for `payloads` in batch requests whose size adapts to the
/// observed request duration, advancing `bar` as responses arrive.
///
/// The batch size grows after requests that finish within `REQUEST_DURATION_TARGET` and
/// shrinks after slow or failed requests. It never drops below one, so a retry always
/// carries at least one payload.
///
/// # Errors
///
/// Returns an error if a payload cannot be added to a batch, if a batch request has
/// failed `MAX_RETRIES` times in a row and fails again, or if any item of a batch
/// response is an error.
async fn get_storage_data_dynamic_batch_size(
    client: &HttpClient,
    payloads: Vec<(String, ArrayParams)>,
    bar: &ProgressBar,
) -> Result<Vec<Option<StorageData>>, String> {
    let total_payloads = payloads.len();
    let mut all_data: Vec<Option<StorageData>> = Vec::with_capacity(total_payloads);
    let mut start_index = 0;
    let mut retries = 0usize;
    let mut batch_size = Self::INITIAL_BATCH_SIZE;

    while start_index < total_payloads {
        log::debug!(
            target: LOG_TARGET,
            "Remaining payloads: {} Batch request size: {}",
            total_payloads - start_index,
            batch_size,
        );

        let end_index = usize::min(start_index + batch_size, total_payloads);
        let page = &payloads[start_index..end_index];

        let mut batch = BatchRequestBuilder::new();
        for (method, params) in page.iter() {
            batch
                .insert(method, params.clone())
                .map_err(|_| "Invalid batch method and/or params")?;
        }

        let request_started = Instant::now();
        let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
            Ok(batch_response) => {
                retries = 0;
                batch_response
            },
            Err(e) => {
                // Give up once the retry budget is used up.
                if retries >= Self::MAX_RETRIES {
                    return Err(e.to_string())
                }

                retries += 1;
                let failure_log = format!(
                    "Batch request failed ({}/{} retries). Error: {}",
                    retries,
                    Self::MAX_RETRIES,
                    e
                );
                // Two failures in a row: warn and fall back to the smallest batch size.
                if retries >= 2 {
                    log::warn!(target: LOG_TARGET, "{}", failure_log);
                    batch_size = 1;
                } else {
                    log::debug!(target: LOG_TARGET, "{}", failure_log);
                    // Clamp to one: a batch size of zero would send an empty batch.
                    batch_size = max(
                        1,
                        (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize,
                    );
                }
                continue
            },
        };

        let request_duration = request_started.elapsed();
        batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
            // Too slow: shrink, but keep at least one payload per batch.
            max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
        } else {
            // Fast enough: grow by at least one, capped at the payloads counted from
            // the start of this batch.
            min(
                total_payloads - start_index,
                max(
                    batch_size + 1,
                    (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
                ),
            )
        };

        log::debug!(
            target: LOG_TARGET,
            "Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}",
            request_duration,
            Self::REQUEST_DURATION_TARGET,
            end_index - start_index,
            batch_size
        );

        let batch_response_len = batch_response.len();
        for item in batch_response.into_iter() {
            match item {
                Ok(x) => all_data.push(x),
                Err(e) => return Err(e.message().to_string()),
            }
        }
        bar.inc(batch_response_len as u64);

        start_index = end_index;
    }

    Ok(all_data)
}
/// Scrapes all key-value pairs under `prefix` at block `at`, inserts the top-level ones
/// into `pending_ext`, and returns every scraped pair.
///
/// Keys are scraped first, then their values are downloaded in chunks that are requested
/// concurrently. Keys for which the node returns no value get an empty value. Keys that
/// are default child-storage keys are returned but not inserted into `pending_ext`.
///
/// # Panics
///
/// Panics if the number of downloaded values differs from the number of keys.
pub(crate) async fn rpc_get_pairs(
    &self,
    prefix: StorageKey,
    at: B::Hash,
    pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<Vec<KeyValue>, &'static str> {
    let start = Instant::now();
    let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
    let keys = self.rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS).await?;
    sp.stop_with_message(format!(
        "✅ Found {} keys ({:.2}s)",
        keys.len(),
        start.elapsed().as_secs_f32()
    ));
    if keys.is_empty() {
        return Ok(Default::default())
    }

    let client = self.as_online().rpc_client();
    let payloads = keys
        .iter()
        .map(|key| ("state_getStorage".to_string(), rpc_params!(key, at)))
        .collect::<Vec<_>>();

    let bar = ProgressBar::new(payloads.len() as u64);
    bar.enable_steady_tick(Duration::from_secs(1));
    bar.set_message("Downloading key values".to_string());
    bar.set_style(
        ProgressStyle::with_template(
            "[{elapsed_precise}] {msg} {per_sec} [{wide_bar}] {pos}/{len} ({eta})",
        )
        .unwrap()
        .progress_chars("=>-"),
    );

    // Split the payloads into chunks that are downloaded concurrently; `max(1)` keeps
    // the chunk size valid when there are fewer payloads than parallel requests.
    let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
    let requests = payloads_chunked.map(|payload_chunk| {
        Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
    });
    let storage_data_result: Result<Vec<_>, _> =
        futures::future::join_all(requests).await.into_iter().collect();
    let storage_data = match storage_data_result {
        Ok(storage_data) => storage_data.into_iter().flatten().collect::<Vec<_>>(),
        Err(e) => {
            log::error!(target: LOG_TARGET, "Error while getting storage data: {}", e);
            return Err("Error while getting storage data")
        },
    };
    bar.finish_with_message("✅ Downloaded key values");
    println!();

    assert_eq!(keys.len(), storage_data.len());

    // The keys are no longer needed on their own, so they are moved into the pairs.
    let key_values = keys
        .into_iter()
        .zip(storage_data)
        .map(|(key, maybe_value)| match maybe_value {
            Some(data) => (key, data),
            None => {
                log::warn!(target: LOG_TARGET, "key {:?} had none corresponding value.", &key);
                (key, StorageData(vec![]))
            },
        })
        .collect::<Vec<_>>();

    let mut sp = Spinner::with_timer(Spinners::Dots, "Inserting keys into DB...".into());
    let start = Instant::now();
    // Child-storage keys are skipped here; only the remaining pairs are copied into the
    // externalities.
    pending_ext.batch_insert(
        key_values
            .iter()
            .filter(|(k, _)| !is_default_child_storage_key(&k.0))
            .map(|(k, v)| (k.0.clone(), v.0.clone())),
    );
    sp.stop_with_message(format!(
        "✅ Inserted keys into DB ({:.2}s)",
        start.elapsed().as_secs_f32()
    ));
    Ok(key_values)
}
/// Downloads the values of `child_keys` from the child storage identified by
/// `prefixed_top_key` at block `at`.
///
/// Keys for which the node returns no value are paired with an empty value.
///
/// # Panics
///
/// Panics if the number of downloaded values differs from the number of keys.
pub(crate) async fn rpc_child_get_storage_paged(
    client: &HttpClient,
    prefixed_top_key: &StorageKey,
    child_keys: Vec<StorageKey>,
    at: B::Hash,
) -> Result<Vec<KeyValue>, &'static str> {
    let expected_len = child_keys.len();

    let mut payloads = Vec::with_capacity(expected_len);
    for key in child_keys.iter() {
        let params = rpc_params![
            PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
            key,
            at
        ];
        payloads.push(("childstate_getStorage".to_string(), params));
    }

    let bar = ProgressBar::new(payloads.len() as u64);
    let storage_data = Self::get_storage_data_dynamic_batch_size(client, payloads, &bar)
        .await
        .map_err(|e| {
            log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
            "batch processing failed"
        })?;

    assert_eq!(expected_len, storage_data.len());

    let mut pairs = Vec::with_capacity(expected_len);
    for (key, maybe_value) in child_keys.iter().zip(storage_data) {
        let value = if let Some(v) = maybe_value {
            v
        } else {
            log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key);
            StorageData(vec![])
        };
        pairs.push((key.clone(), value));
    }
    Ok(pairs)
}
/// Scrapes, page by page, all keys under `child_prefix` in the child storage identified
/// by `prefixed_top_key` at block `at`.
///
/// Each page request is retried up to `MAX_RETRIES` times with a fixed interval.
pub(crate) async fn rpc_child_get_keys(
    client: &HttpClient,
    prefixed_top_key: &StorageKey,
    child_prefix: StorageKey,
    at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
    let retry_strategy =
        FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
    let page_size = Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize;
    let mut collected = Vec::new();
    let mut start_key = None;

    loop {
        let fetch_page = || {
            let top_key = PrefixedStorageKey::new(prefixed_top_key.0.clone());
            substrate_rpc_client::ChildStateApi::storage_keys_paged(
                client,
                top_key,
                Some(child_prefix.clone()),
                Self::DEFAULT_KEY_DOWNLOAD_PAGE,
                start_key.clone(),
                Some(at),
            )
        };
        let page = Retry::spawn(retry_strategy.clone(), fetch_page).await.map_err(|e| {
            error!(target: LOG_TARGET, "Error = {:?}", e);
            "rpc child_get_keys failed."
        })?;

        let received = page.len();
        // Continue from the last key of this page; an empty page leaves the cursor
        // untouched and ends the loop below.
        if let Some(last) = page.last() {
            start_key = Some(last.clone());
        }
        collected.extend(page);

        // A page that is not full is the final one.
        if received < page_size {
            break;
        }
    }

    debug!(
        target: LOG_TARGET,
        "[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
        std::thread::current().id(),
        collected.len(),
        HexDisplay::from(prefixed_top_key)
    );

    Ok(collected)
}
}
impl<B: BlockT> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
/// Scrapes the child storages referenced by `top_kv` and inserts their contents into
/// `pending_ext`.
///
/// Every top-level key that is a default child-storage key is treated as a child root:
/// its child keys and values are downloaded, inserted into `pending_ext`, and returned
/// grouped by `ChildInfo`.
///
/// # Errors
///
/// Returns an error if any RPC call fails or if a child root key cannot be interpreted
/// as a prefixed child key.
async fn load_child_remote(
    &self,
    top_kv: &[KeyValue],
    pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<ChildKeyValues, &'static str> {
    let child_roots = top_kv
        .iter()
        .filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
        .map(|(k, _)| k.clone())
        .collect::<Vec<_>>();

    if child_roots.is_empty() {
        info!(target: LOG_TARGET, "👩‍👦 no child roots found to scrape",);
        return Ok(Default::default())
    }

    info!(
        target: LOG_TARGET,
        "👩‍👦 scraping child-tree data from {} top keys",
        child_roots.len(),
    );

    let at = self.as_online().at_expected();
    let client = self.as_online().rpc_client();

    let mut child_kv = vec![];
    for prefixed_top_key in child_roots {
        let child_keys =
            Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;

        let child_kv_inner =
            Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
                .await?;

        // The raw key is no longer needed after the downloads, so it is moved.
        let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.0);
        let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
            Some((ChildType::ParentKeyId, storage_key)) => storage_key,
            None => {
                log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
                return Err("Invalid child key")
            },
        };

        let info = ChildInfo::new_default(un_prefixed);
        for (k, v) in child_kv_inner.iter() {
            pending_ext.insert_child(info.clone(), k.0.clone(), v.0.clone());
        }
        child_kv.push((info, child_kv_inner));
    }

    Ok(child_kv)
}
async fn load_top_remote(
&self,
pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<TopKeyValues, &'static str> {
let config = self.as_online();
let at = self
.as_online()
.at
.expect("online config must be initialized by this point; qed.");
log::info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {:?}", at);
let mut keys_and_values = Vec::new();
for prefix in &config.hashed_prefixes {
let now = std::time::Instant::now();
let additional_key_values =
self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
let elapsed = now.elapsed();
log::info!(
target: LOG_TARGET,
"adding data for hashed prefix: {:?}, took {:.2}s",
HexDisplay::from(prefix),
elapsed.as_secs_f32()
);
keys_and_values.extend(additional_key_values);
}
for key in &config.hashed_keys {
let key = StorageKey(key.to_vec());
log::info!(
target: LOG_TARGET,
"adding data for hashed key: {:?}",
HexDisplay::from(&key)
);
match self.rpc_get_storage(key.clone(), Some(at)).await? {
Some(value) => {
pending_ext.insert(key.clone().0, value.clone().0);
keys_and_values.push((key, value));
},
None => {
log::warn!(
target: LOG_TARGET,
"no data found for hashed key: {:?}",
HexDisplay::from(&key)
);
},
}
}
Ok(keys_and_values)
}
/// Prepares the online configuration for scraping.
///
/// Initializes the transport, sets `at` to the finalized head if it is unset, turns the
/// pallet names into hashed prefixes, adds the child-storage prefix when child tries are
/// enabled, and falls back to the empty prefix when no other prefix was requested.
async fn init_remote_client(&mut self) -> Result<(), &'static str> {
    self.as_online_mut().transport.init().await?;

    if self.as_online().at.is_none() {
        let at = self.rpc_get_head().await?;
        log::info!(
            target: LOG_TARGET,
            "since no at is provided, setting it to latest finalized head, {:?}",
            at
        );
        self.as_online_mut().at = Some(at);
    }

    let online_config = self.as_online_mut();

    // Every pallet name becomes its `twox_128` prefix.
    for pallet in online_config.pallets.iter() {
        let hashed = sp_crypto_hashing::twox_128(pallet.as_bytes()).to_vec();
        online_config.hashed_prefixes.push(hashed);
    }

    if online_config.child_trie {
        online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
    }

    // Apart from the child-storage prefix nothing was requested: download everything.
    let has_other_prefix = online_config
        .hashed_prefixes
        .iter()
        .any(|p| p != DEFAULT_CHILD_STORAGE_KEY_PREFIX);
    if !has_other_prefix {
        log::info!(
            target: LOG_TARGET,
            "since no prefix is filtered, the data for all pallets will be downloaded"
        );
        online_config.hashed_prefixes.push(vec![]);
    }

    Ok(())
}
/// Fetches the header of the block being scraped, retrying up to `MAX_RETRIES` times
/// with a fixed interval.
///
/// # Errors
///
/// Returns an error if all attempts fail or if the node returns no header.
async fn load_header(&self) -> Result<B::Header, &'static str> {
    let retry_strategy =
        FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
    let fetch_header = || {
        let online = self.as_online();
        ChainApi::<(), _, B::Header, ()>::header(
            online.rpc_client(),
            Some(online.at_expected()),
        )
    };
    let maybe_header = Retry::spawn(retry_strategy, fetch_header)
        .await
        .map_err(|_| "Failed to fetch header for block from network")?;
    maybe_header.ok_or("Network returned None block header")
}
async fn load_remote_and_maybe_save(
&mut self,
) -> Result<TestExternalities<HashingFor<B>>, &'static str> {
let state_version =
StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc runtime_version failed."
})
.map(|v| v.state_version())?;
let mut pending_ext = TestExternalities::new_with_code_and_state(
Default::default(),
Default::default(),
self.overwrite_state_version.unwrap_or(state_version),
);
let top_kv = self.load_top_remote(&mut pending_ext).await?;
self.load_child_remote(&top_kv, &mut pending_ext).await?;
if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
let (raw_storage, storage_root) = pending_ext.into_raw_snapshot();
let snapshot = Snapshot::<B>::new(
state_version,
raw_storage.clone(),
storage_root,
self.load_header().await?,
);
let encoded = snapshot.encode();
log::info!(
target: LOG_TARGET,
"writing snapshot of {} bytes to {:?}",
encoded.len(),
path
);
std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
return Ok(TestExternalities::from_raw_snapshot(
raw_storage,
storage_root,
self.overwrite_state_version.unwrap_or(state_version),
))
}
Ok(pending_ext)
}
/// Initializes the remote client, scrapes the state and fetches the block header.
async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>, &'static str> {
    self.init_remote_client().await?;
    let inner_ext = self.load_remote_and_maybe_save().await?;
    let header = self.load_header().await?;
    Ok(RemoteExternalities { inner_ext, header })
}
/// Loads the externalities and header from the snapshot file given by `config`.
///
/// The externalities use `overwrite_state_version` when set, otherwise the state version
/// stored in the snapshot.
///
/// # Errors
///
/// Returns an error if the snapshot cannot be read or decoded, or has an unsupported
/// version.
fn do_load_offline(
    &mut self,
    config: OfflineConfig,
) -> Result<RemoteExternalities<B>, &'static str> {
    let mut sp = Spinner::with_timer(Spinners::Dots, "Loading snapshot...".into());
    let start = Instant::now();
    info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path);
    let Snapshot { snapshot_version: _, header, state_version, raw_storage, storage_root } =
        Snapshot::<B>::load(&config.state_snapshot.path)?;

    let inner_ext = TestExternalities::from_raw_snapshot(
        raw_storage,
        storage_root,
        self.overwrite_state_version.unwrap_or(state_version),
    );
    sp.stop_with_message(format!("✅ Loaded snapshot ({:.2}s)", start.elapsed().as_secs_f32()));

    Ok(RemoteExternalities { inner_ext, header })
}
/// Loads the state according to the configured mode, then applies the injected
/// key-values and clears the blacklisted keys.
///
/// In `OfflineOrElseOnline` mode a failure to load the snapshot is logged and the state
/// is scraped from the remote node instead.
pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>, &'static str> {
    let mut ext = match self.mode.clone() {
        Mode::Offline(config) => self.do_load_offline(config)?,
        Mode::Online(_) => self.do_load_remote().await?,
        Mode::OfflineOrElseOnline(offline_config, _) => {
            match self.do_load_offline(offline_config) {
                Ok(x) => x,
                Err(e) => {
                    // Report why the snapshot was not used before falling back.
                    log::info!(
                        target: LOG_TARGET,
                        "loading the snapshot failed ({}), falling back to the remote node",
                        e
                    );
                    self.do_load_remote().await?
                },
            }
        },
    };

    // Injected key-values go on top of whatever was loaded.
    if !self.hashed_key_values.is_empty() {
        log::info!(
            target: LOG_TARGET,
            "extending externalities with {} manually injected key-values",
            self.hashed_key_values.len()
        );
        ext.batch_insert(self.hashed_key_values.into_iter().map(|(k, v)| (k.0, v.0)));
    }

    // Blacklisted keys are removed last.
    if !self.hashed_blacklist.is_empty() {
        log::info!(
            target: LOG_TARGET,
            "excluding externalities from {} keys",
            self.hashed_blacklist.len()
        );
        for k in self.hashed_blacklist {
            ext.execute_with(|| sp_io::storage::clear(&k));
        }
    }

    Ok(ext)
}
}
impl<B: BlockT> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
pub fn new() -> Self {
Default::default()
}
pub fn inject_hashed_key_value(mut self, injections: Vec<KeyValue>) -> Self {
for i in injections {
self.hashed_key_values.push(i.clone());
}
self
}
pub fn blacklist_hashed_key(mut self, hashed: &[u8]) -> Self {
self.hashed_blacklist.push(hashed.to_vec());
self
}
pub fn mode(mut self, mode: Mode<B::Hash>) -> Self {
self.mode = mode;
self
}
pub fn overwrite_state_version(mut self, version: StateVersion) -> Self {
self.overwrite_state_version = Some(version);
self
}
pub async fn build(self) -> Result<RemoteExternalities<B>, &'static str> {
let mut ext = self.pre_build().await?;
ext.commit_all().unwrap();
info!(
target: LOG_TARGET,
"initialized state externalities with storage root {:?} and state_version {:?}",
ext.as_backend().root(),
ext.state_version
);
Ok(ext)
}
}
/// Shared imports, type aliases and helpers for the test modules of this file.
#[cfg(test)]
mod test_prelude {
    pub(crate) use super::*;
    pub(crate) use sp_runtime::testing::{Block as RawBlock, MockCallU64};
    /// Extrinsic type used by the test block.
    pub(crate) type UncheckedXt = sp_runtime::testing::TestXt<MockCallU64, ()>;
    /// Block type the tests instantiate `Builder` with.
    pub(crate) type Block = RawBlock<UncheckedXt>;
    /// Initializes the logger used by the tests.
    pub(crate) fn init_logger() {
        sp_tracing::try_init_simple();
    }
}
/// Offline tests; they read the snapshot file `test_data/test.snap`.
#[cfg(test)]
mod tests {
    use super::test_prelude::*;
    /// The snapshot file can be loaded into externalities.
    #[tokio::test]
    async fn can_load_state_snapshot() {
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig {
                state_snapshot: SnapshotConfig::new("test_data/test.snap"),
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }
    /// A blacklisted key that exists in the snapshot is absent from the built state.
    #[tokio::test]
    async fn can_exclude_from_snapshot() {
        init_logger();
        // Pick some key that is present in the snapshot.
        let some_key = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig {
                state_snapshot: SnapshotConfig::new("test_data/test.snap"),
            }))
            .build()
            .await
            .expect("Can't read state snapshot file")
            .execute_with(|| {
                let key =
                    sp_io::storage::next_key(&[]).expect("some key must exist in the snapshot");
                assert!(sp_io::storage::get(&key).is_some());
                key
            });
        // Build again with that key blacklisted and check that it is gone.
        Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig {
                state_snapshot: SnapshotConfig::new("test_data/test.snap"),
            }))
            .blacklist_hashed_key(&some_key)
            .build()
            .await
            .expect("Can't read state snapshot file")
            .execute_with(|| assert!(sp_io::storage::get(&some_key).is_none()));
    }
}
/// Tests that talk to a remote node; only compiled with the `remote-test` feature.
#[cfg(all(test, feature = "remote-test"))]
mod remote_tests {
    use super::test_prelude::*;
    use std::{env, os::unix::fs::MetadataExt};
    /// Endpoint under test: the `TEST_WS` environment variable, or the default endpoint.
    fn endpoint() -> String {
        env::var("TEST_WS").unwrap_or_else(|_| DEFAULT_HTTP_ENDPOINT.to_string())
    }
    /// The state version survives a snapshot round trip and can be overridden on load.
    #[tokio::test]
    async fn state_version_is_kept_and_can_be_altered() {
        const CACHE: &'static str = "state_version_is_kept_and_can_be_altered";
        init_logger();
        // Scrape online and write a snapshot.
        let ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();
        // Load the snapshot back; the state version must match.
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .build()
            .await
            .unwrap();
        assert_eq!(ext.state_version, cached_ext.state_version);
        // Load again, this time forcing the other state version.
        let other = match ext.state_version {
            StateVersion::V0 => StateVersion::V1,
            StateVersion::V1 => StateVersion::V0,
        };
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .overwrite_state_version(other)
            .build()
            .await
            .unwrap();
        assert_eq!(cached_ext.state_version, other);
    }
    /// The header stored in a snapshot hashes to the same value as the scraped header.
    #[tokio::test]
    async fn snapshot_block_hash_works() {
        const CACHE: &'static str = "snapshot_block_hash_works";
        init_logger();
        let ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();
        let cached_ext = Builder::<Block>::new()
            .mode(Mode::Offline(OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) }))
            .build()
            .await
            .unwrap();
        assert_eq!(ext.header.hash(), cached_ext.header.hash());
    }
    /// Scraping with child tries enabled yields more backend keys than without.
    #[tokio::test]
    async fn child_keys_are_loaded() {
        const CACHE: &'static str = "snapshot_retains_storage";
        init_logger();
        let mut child_ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: true,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();
        let mut ext = Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap();
        assert!(
            child_ext.as_backend().backend_storage().keys().len() >
                ext.as_backend().backend_storage().keys().len()
        );
    }
    /// `OfflineOrElseOnline` scrapes when no snapshot exists and uses the snapshot when
    /// it does.
    #[tokio::test]
    async fn offline_else_online_works() {
        const CACHE: &'static str = "offline_else_online_works_data";
        init_logger();
        // First build with a working online config; it also writes the snapshot.
        Builder::<Block>::new()
            .mode(Mode::OfflineOrElseOnline(
                OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
                OnlineConfig {
                    transport: endpoint().clone().into(),
                    pallets: vec!["Proxy".to_owned()],
                    child_trie: false,
                    state_snapshot: Some(SnapshotConfig::new(CACHE)),
                    ..Default::default()
                },
            ))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
        // Second build has an unreachable endpoint, so it can only succeed offline.
        Builder::<Block>::new()
            .mode(Mode::OfflineOrElseOnline(
                OfflineConfig { state_snapshot: SnapshotConfig::new(CACHE) },
                OnlineConfig {
                    transport: "ws://non-existent:666".to_owned().into(),
                    ..Default::default()
                },
            ))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
        // Clean up the snapshot file.
        let to_delete = std::fs::read_dir(Path::new("."))
            .unwrap()
            .into_iter()
            .map(|d| d.unwrap())
            .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
            .collect::<Vec<_>>();
        assert!(to_delete.len() == 1);
        std::fs::remove_file(to_delete[0].path()).unwrap();
    }
    /// A single pallet can be scraped.
    #[tokio::test]
    async fn can_build_one_small_pallet() {
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }
    /// Two pallets can be scraped together.
    #[tokio::test]
    async fn can_build_few_pallet() {
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }
    /// Scraping with a snapshot path writes exactly one non-trivial snapshot file.
    #[tokio::test(flavor = "multi_thread")]
    async fn can_create_snapshot() {
        const CACHE: &'static str = "can_create_snapshot";
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                pallets: vec!["Proxy".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
        let to_delete = std::fs::read_dir(Path::new("."))
            .unwrap()
            .into_iter()
            .map(|d| d.unwrap())
            .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
            .collect::<Vec<_>>();
        assert!(to_delete.len() == 1);
        let to_delete = to_delete.first().unwrap();
        assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
        std::fs::remove_file(to_delete.path()).unwrap();
    }
    /// Same as `can_create_snapshot`, with child tries enabled.
    #[tokio::test]
    async fn can_create_child_snapshot() {
        const CACHE: &'static str = "can_create_child_snapshot";
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                state_snapshot: Some(SnapshotConfig::new(CACHE)),
                pallets: vec!["Crowdloan".to_owned()],
                child_trie: true,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
        let to_delete = std::fs::read_dir(Path::new("."))
            .unwrap()
            .into_iter()
            .map(|d| d.unwrap())
            .filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
            .collect::<Vec<_>>();
        assert!(to_delete.len() == 1);
        let to_delete = to_delete.first().unwrap();
        assert!(std::fs::metadata(to_delete.path()).unwrap().size() > 1);
        std::fs::remove_file(to_delete.path()).unwrap();
    }
    /// Scrapes a large pallet; skipped unless `TEST_WS` was set at compile time.
    #[tokio::test]
    async fn can_build_big_pallet() {
        // `option_env!` is evaluated at compile time, unlike `endpoint()`.
        if std::option_env!("TEST_WS").is_none() {
            return
        }
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                pallets: vec!["Staking".to_owned()],
                child_trie: false,
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }
    /// Scrapes the whole state; skipped unless `TEST_WS` was set at compile time.
    #[tokio::test]
    async fn can_fetch_all() {
        // `option_env!` is evaluated at compile time, unlike `endpoint()`.
        if std::option_env!("TEST_WS").is_none() {
            return
        }
        init_logger();
        Builder::<Block>::new()
            .mode(Mode::Online(OnlineConfig {
                transport: endpoint().clone().into(),
                ..Default::default()
            }))
            .build()
            .await
            .unwrap()
            .execute_with(|| {});
    }
    /// Parallel key scraping returns the same keys as sequential paging.
    #[tokio::test]
    async fn can_fetch_in_parallel() {
        init_logger();
        let mut builder = Builder::<Block>::new().mode(Mode::Online(OnlineConfig {
            transport: endpoint().clone().into(),
            ..Default::default()
        }));
        builder.init_remote_client().await.unwrap();
        let at = builder.as_online().at.unwrap();
        // A one-byte prefix.
        let prefix = StorageKey(vec![13]);
        let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
        let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
        assert_eq!(paged, para);
        // The empty prefix, i.e. all keys.
        let prefix = StorageKey(vec![]);
        let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
        let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
        assert_eq!(paged, para);
    }
}