use std::{marker::PhantomData, sync::Arc};
use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
use sp_runtime::traits::Block as BlockT;
use tokio::sync::mpsc;
use super::events::{StorageQuery, StorageQueryType, StorageResult, StorageResultType};
use crate::hex_string;
pub struct Storage<Client, Block, BE> {
client: Arc<Client>,
_phandom: PhantomData<(BE, Block)>,
}
impl<Client, Block, BE> Clone for Storage<Client, Block, BE> {
fn clone(&self) -> Self {
Self { client: self.client.clone(), _phandom: PhantomData }
}
}
impl<Client, Block, BE> Storage<Client, Block, BE> {
pub fn new(client: Arc<Client>) -> Self {
Self { client, _phandom: PhantomData }
}
}
#[derive(Debug)]
pub struct QueryIter {
pub query_key: StorageKey,
pub pagination_start_key: Option<StorageKey>,
pub ty: IterQueryType,
}
#[derive(Debug)]
pub enum IterQueryType {
Value,
Hash,
}
pub type QueryResult = Result<Option<StorageResult>, String>;
impl<Client, Block, BE> Storage<Client, Block, BE>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
Client: StorageProvider<Block, BE> + 'static,
{
pub fn query_value(
&self,
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> QueryResult {
let result = if let Some(child_key) = child_key {
self.client.child_storage(hash, child_key, key)
} else {
self.client.storage(hash, key)
};
result
.map(|opt| {
QueryResult::Ok(opt.map(|storage_data| StorageResult {
key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
}))
})
.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
}
pub fn query_hash(
&self,
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> QueryResult {
let result = if let Some(child_key) = child_key {
self.client.child_storage_hash(hash, child_key, key)
} else {
self.client.storage_hash(hash, key)
};
result
.map(|opt| {
QueryResult::Ok(opt.map(|storage_data| StorageResult {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
}))
})
.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
}
pub fn query_merkle_value(
&self,
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> QueryResult {
let result = if let Some(ref child_key) = child_key {
self.client.child_closest_merkle_value(hash, child_key, key)
} else {
self.client.closest_merkle_value(hash, key)
};
result
.map(|opt| {
QueryResult::Ok(opt.map(|storage_data| {
let result = match &storage_data {
sc_client_api::MerkleValue::Node(data) => hex_string(&data.as_slice()),
sc_client_api::MerkleValue::Hash(hash) => hex_string(&hash.as_ref()),
};
StorageResult {
key: hex_string(&key.0),
result: StorageResultType::ClosestDescendantMerkleValue(result),
child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
}
}))
})
.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
}
pub fn query_iter_pagination_with_producer(
&self,
query: QueryIter,
hash: Block::Hash,
child_key: Option<&ChildInfo>,
tx: &mpsc::Sender<QueryResult>,
) {
let QueryIter { ty, query_key, pagination_start_key } = query;
let maybe_storage = if let Some(child_key) = child_key {
self.client.child_storage_keys(
hash,
child_key.to_owned(),
Some(&query_key),
pagination_start_key.as_ref(),
)
} else {
self.client.storage_keys(hash, Some(&query_key), pagination_start_key.as_ref())
};
let keys_iter = match maybe_storage {
Ok(keys_iter) => keys_iter,
Err(error) => {
_ = tx.blocking_send(Err(error.to_string()));
return;
},
};
for key in keys_iter {
let result = match ty {
IterQueryType::Value => self.query_value(hash, &key, child_key),
IterQueryType::Hash => self.query_hash(hash, &key, child_key),
};
if tx.blocking_send(result).is_err() {
break;
}
}
}
pub fn raw_keys_iter(
&self,
hash: Block::Hash,
child_key: Option<ChildInfo>,
) -> Result<impl Iterator<Item = StorageKey>, String> {
let keys_iter = if let Some(child_key) = child_key {
self.client.child_storage_keys(hash, child_key, None, None)
} else {
self.client.storage_keys(hash, None, None)
};
keys_iter.map_err(|err| err.to_string())
}
}
pub struct StorageSubscriptionClient<Client, Block, BE> {
client: Storage<Client, Block, BE>,
_phandom: PhantomData<(BE, Block)>,
}
impl<Client, Block, BE> Clone for StorageSubscriptionClient<Client, Block, BE> {
fn clone(&self) -> Self {
Self { client: self.client.clone(), _phandom: PhantomData }
}
}
impl<Client, Block, BE> StorageSubscriptionClient<Client, Block, BE> {
pub fn new(client: Arc<Client>) -> Self {
Self { client: Storage::new(client), _phandom: PhantomData }
}
}
impl<Client, Block, BE> StorageSubscriptionClient<Client, Block, BE>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
Client: StorageProvider<Block, BE> + Send + Sync + 'static,
{
pub async fn generate_events(
&mut self,
hash: Block::Hash,
items: Vec<StorageQuery<StorageKey>>,
child_key: Option<ChildInfo>,
tx: mpsc::Sender<QueryResult>,
) -> Result<(), tokio::task::JoinError> {
let this = self.clone();
tokio::task::spawn_blocking(move || {
for item in items {
match item.query_type {
StorageQueryType::Value => {
let rp = this.client.query_value(hash, &item.key, child_key.as_ref());
if tx.blocking_send(rp).is_err() {
break;
}
},
StorageQueryType::Hash => {
let rp = this.client.query_hash(hash, &item.key, child_key.as_ref());
if tx.blocking_send(rp).is_err() {
break;
}
},
StorageQueryType::ClosestDescendantMerkleValue => {
let rp =
this.client.query_merkle_value(hash, &item.key, child_key.as_ref());
if tx.blocking_send(rp).is_err() {
break;
}
},
StorageQueryType::DescendantsValues => {
let query = QueryIter {
query_key: item.key,
ty: IterQueryType::Value,
pagination_start_key: None,
};
this.client.query_iter_pagination_with_producer(
query,
hash,
child_key.as_ref(),
&tx,
)
},
StorageQueryType::DescendantsHashes => {
let query = QueryIter {
query_key: item.key,
ty: IterQueryType::Hash,
pagination_start_key: None,
};
this.client.query_iter_pagination_with_producer(
query,
hash,
child_key.as_ref(),
&tx,
)
},
}
}
})
.await?;
Ok(())
}
}