sc_rpc_spec_v2/chain_head/
chain_head_storage.rs1use std::{marker::PhantomData, sync::Arc};
22
23use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
24use sp_runtime::traits::Block as BlockT;
25use tokio::sync::mpsc;
26
27use crate::common::{
28 events::{StorageQuery, StorageQueryType},
29 storage::{IterQueryType, QueryIter, QueryResult, Storage},
30};
31
32pub struct ChainHeadStorage<Client, Block, BE> {
34 client: Storage<Client, Block, BE>,
36 _phandom: PhantomData<(BE, Block)>,
37}
38
39impl<Client, Block, BE> Clone for ChainHeadStorage<Client, Block, BE> {
40 fn clone(&self) -> Self {
41 Self { client: self.client.clone(), _phandom: PhantomData }
42 }
43}
44
45impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
46 pub fn new(client: Arc<Client>) -> Self {
48 Self { client: Storage::new(client), _phandom: PhantomData }
49 }
50}
51
52impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
53where
54 Block: BlockT + 'static,
55 BE: Backend<Block> + 'static,
56 Client: StorageProvider<Block, BE> + Send + Sync + 'static,
57{
58 pub async fn generate_events(
60 &mut self,
61 hash: Block::Hash,
62 items: Vec<StorageQuery<StorageKey>>,
63 child_key: Option<ChildInfo>,
64 tx: mpsc::Sender<QueryResult>,
65 ) -> Result<(), tokio::task::JoinError> {
66 let this = self.clone();
67
68 tokio::task::spawn_blocking(move || {
69 for item in items {
70 match item.query_type {
71 StorageQueryType::Value => {
72 let rp = this.client.query_value(hash, &item.key, child_key.as_ref());
73 if tx.blocking_send(rp).is_err() {
74 break;
75 }
76 },
77 StorageQueryType::Hash => {
78 let rp = this.client.query_hash(hash, &item.key, child_key.as_ref());
79 if tx.blocking_send(rp).is_err() {
80 break;
81 }
82 },
83 StorageQueryType::ClosestDescendantMerkleValue => {
84 let rp =
85 this.client.query_merkle_value(hash, &item.key, child_key.as_ref());
86 if tx.blocking_send(rp).is_err() {
87 break;
88 }
89 },
90 StorageQueryType::DescendantsValues => {
91 let query = QueryIter {
92 query_key: item.key,
93 ty: IterQueryType::Value,
94 pagination_start_key: None,
95 };
96 this.client.query_iter_pagination_with_producer(
97 query,
98 hash,
99 child_key.as_ref(),
100 &tx,
101 )
102 },
103 StorageQueryType::DescendantsHashes => {
104 let query = QueryIter {
105 query_key: item.key,
106 ty: IterQueryType::Hash,
107 pagination_start_key: None,
108 };
109 this.client.query_iter_pagination_with_producer(
110 query,
111 hash,
112 child_key.as_ref(),
113 &tx,
114 )
115 },
116 }
117 }
118 })
119 .await?;
120
121 Ok(())
122 }
123}