use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;
use crate::{
chain_head::{
event::{OperationError, OperationId, OperationStorageItems},
subscription::BlockGuard,
FollowEvent,
},
common::{
events::{StorageQuery, StorageQueryType},
storage::{IterQueryType, QueryIter, QueryIterResult, Storage},
},
};
pub struct ChainHeadStorage<Client, Block, BE> {
client: Storage<Client, Block, BE>,
iter_operations: VecDeque<QueryIter>,
operation_max_storage_items: usize,
_phandom: PhantomData<(BE, Block)>,
}
impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
pub fn new(client: Arc<Client>, operation_max_storage_items: usize) -> Self {
Self {
client: Storage::new(client),
iter_operations: VecDeque::new(),
operation_max_storage_items,
_phandom: PhantomData,
}
}
}
impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
Client: StorageProvider<Block, BE> + 'static,
{
async fn generate_storage_iter_events(
&mut self,
mut block_guard: BlockGuard<Block, BE>,
hash: Block::Hash,
child_key: Option<ChildInfo>,
) {
let sender = block_guard.response_sender();
let operation = block_guard.operation();
while let Some(query) = self.iter_operations.pop_front() {
if operation.was_stopped() {
return
}
let result = self.client.query_iter_pagination(
query,
hash,
child_key.as_ref(),
self.operation_max_storage_items,
);
let (events, maybe_next_query) = match result {
QueryIterResult::Ok(result) => result,
QueryIterResult::Err(error) => {
send_error::<Block>(&sender, operation.operation_id(), error.to_string());
return
},
};
if !events.is_empty() {
let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
OperationStorageItems { operation_id: operation.operation_id(), items: events },
));
}
if let Some(next_query) = maybe_next_query {
let _ =
sender.unbounded_send(FollowEvent::<Block::Hash>::OperationWaitingForContinue(
OperationId { operation_id: operation.operation_id() },
));
operation.wait_for_continue().await;
self.iter_operations.push_back(next_query);
}
}
if operation.was_stopped() {
return
}
let _ =
sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(OperationId {
operation_id: operation.operation_id(),
}));
}
pub async fn generate_events(
&mut self,
mut block_guard: BlockGuard<Block, BE>,
hash: Block::Hash,
items: Vec<StorageQuery<StorageKey>>,
child_key: Option<ChildInfo>,
) {
let sender = block_guard.response_sender();
let operation = block_guard.operation();
let mut storage_results = Vec::with_capacity(items.len());
for item in items {
match item.query_type {
StorageQueryType::Value => {
match self.client.query_value(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(error) => {
send_error::<Block>(&sender, operation.operation_id(), error);
return
},
}
},
StorageQueryType::Hash =>
match self.client.query_hash(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(error) => {
send_error::<Block>(&sender, operation.operation_id(), error);
return
},
},
StorageQueryType::ClosestDescendantMerkleValue =>
match self.client.query_merkle_value(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(error) => {
send_error::<Block>(&sender, operation.operation_id(), error);
return
},
},
StorageQueryType::DescendantsValues => self.iter_operations.push_back(QueryIter {
query_key: item.key,
ty: IterQueryType::Value,
pagination_start_key: None,
}),
StorageQueryType::DescendantsHashes => self.iter_operations.push_back(QueryIter {
query_key: item.key,
ty: IterQueryType::Hash,
pagination_start_key: None,
}),
};
}
if !storage_results.is_empty() {
let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
OperationStorageItems {
operation_id: operation.operation_id(),
items: storage_results,
},
));
}
self.generate_storage_iter_events(block_guard, hash, child_key).await
}
}
fn send_error<Block: BlockT>(
sender: &TracingUnboundedSender<FollowEvent<Block::Hash>>,
operation_id: String,
error: String,
) {
let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id,
error,
}));
}