1use std::{marker::PhantomData, sync::Arc};
22
23use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
24use sp_runtime::traits::Block as BlockT;
25use tokio::sync::mpsc;
26
27use super::events::{StorageQuery, StorageQueryType, StorageResult, StorageResultType};
28use crate::hex_string;
29
30pub struct Storage<Client, Block, BE> {
32 client: Arc<Client>,
34 _phandom: PhantomData<(BE, Block)>,
35}
36
37impl<Client, Block, BE> Clone for Storage<Client, Block, BE> {
38 fn clone(&self) -> Self {
39 Self { client: self.client.clone(), _phandom: PhantomData }
40 }
41}
42
43impl<Client, Block, BE> Storage<Client, Block, BE> {
44 pub fn new(client: Arc<Client>) -> Self {
46 Self { client, _phandom: PhantomData }
47 }
48}
49
50#[derive(Debug)]
52pub struct QueryIter {
53 pub query_key: StorageKey,
55 pub pagination_start_key: Option<StorageKey>,
57 pub ty: IterQueryType,
59}
60
61#[derive(Debug)]
63pub enum IterQueryType {
64 Value,
66 Hash,
68}
69
70pub type QueryResult = Result<Option<StorageResult>, String>;
72
73impl<Client, Block, BE> Storage<Client, Block, BE>
74where
75 Block: BlockT + 'static,
76 BE: Backend<Block> + 'static,
77 Client: StorageProvider<Block, BE> + 'static,
78{
79 pub fn query_value(
81 &self,
82 hash: Block::Hash,
83 key: &StorageKey,
84 child_key: Option<&ChildInfo>,
85 ) -> QueryResult {
86 let result = if let Some(child_key) = child_key {
87 self.client.child_storage(hash, child_key, key)
88 } else {
89 self.client.storage(hash, key)
90 };
91
92 result
93 .map(|opt| {
94 QueryResult::Ok(opt.map(|storage_data| StorageResult {
95 key: hex_string(&key.0),
96 result: StorageResultType::Value(hex_string(&storage_data.0)),
97 child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
98 }))
99 })
100 .unwrap_or_else(|error| QueryResult::Err(error.to_string()))
101 }
102
103 pub fn query_hash(
105 &self,
106 hash: Block::Hash,
107 key: &StorageKey,
108 child_key: Option<&ChildInfo>,
109 ) -> QueryResult {
110 let result = if let Some(child_key) = child_key {
111 self.client.child_storage_hash(hash, child_key, key)
112 } else {
113 self.client.storage_hash(hash, key)
114 };
115
116 result
117 .map(|opt| {
118 QueryResult::Ok(opt.map(|storage_data| StorageResult {
119 key: hex_string(&key.0),
120 result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
121 child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
122 }))
123 })
124 .unwrap_or_else(|error| QueryResult::Err(error.to_string()))
125 }
126
127 pub fn query_merkle_value(
129 &self,
130 hash: Block::Hash,
131 key: &StorageKey,
132 child_key: Option<&ChildInfo>,
133 ) -> QueryResult {
134 let result = if let Some(ref child_key) = child_key {
135 self.client.child_closest_merkle_value(hash, child_key, key)
136 } else {
137 self.client.closest_merkle_value(hash, key)
138 };
139
140 result
141 .map(|opt| {
142 QueryResult::Ok(opt.map(|storage_data| {
143 let result = match &storage_data {
144 sc_client_api::MerkleValue::Node(data) => hex_string(&data.as_slice()),
145 sc_client_api::MerkleValue::Hash(hash) => hex_string(&hash.as_ref()),
146 };
147
148 StorageResult {
149 key: hex_string(&key.0),
150 result: StorageResultType::ClosestDescendantMerkleValue(result),
151 child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
152 }
153 }))
154 })
155 .unwrap_or_else(|error| QueryResult::Err(error.to_string()))
156 }
157
158 pub fn query_iter_pagination_with_producer(
163 &self,
164 query: QueryIter,
165 hash: Block::Hash,
166 child_key: Option<&ChildInfo>,
167 tx: &mpsc::Sender<QueryResult>,
168 ) {
169 let QueryIter { ty, query_key, pagination_start_key } = query;
170
171 let maybe_storage = if let Some(child_key) = child_key {
172 self.client.child_storage_keys(
173 hash,
174 child_key.to_owned(),
175 Some(&query_key),
176 pagination_start_key.as_ref(),
177 )
178 } else {
179 self.client.storage_keys(hash, Some(&query_key), pagination_start_key.as_ref())
180 };
181
182 let keys_iter = match maybe_storage {
183 Ok(keys_iter) => keys_iter,
184 Err(error) => {
185 _ = tx.blocking_send(Err(error.to_string()));
186 return;
187 },
188 };
189
190 for key in keys_iter {
191 let result = match ty {
192 IterQueryType::Value => self.query_value(hash, &key, child_key),
193 IterQueryType::Hash => self.query_hash(hash, &key, child_key),
194 };
195
196 if tx.blocking_send(result).is_err() {
197 break;
198 }
199 }
200 }
201
202 pub fn raw_keys_iter(
204 &self,
205 hash: Block::Hash,
206 child_key: Option<ChildInfo>,
207 ) -> Result<impl Iterator<Item = StorageKey>, String> {
208 let keys_iter = if let Some(child_key) = child_key {
209 self.client.child_storage_keys(hash, child_key, None, None)
210 } else {
211 self.client.storage_keys(hash, None, None)
212 };
213
214 keys_iter.map_err(|err| err.to_string())
215 }
216}
217
218pub struct StorageSubscriptionClient<Client, Block, BE> {
220 client: Storage<Client, Block, BE>,
222 _phandom: PhantomData<(BE, Block)>,
223}
224
225impl<Client, Block, BE> Clone for StorageSubscriptionClient<Client, Block, BE> {
226 fn clone(&self) -> Self {
227 Self { client: self.client.clone(), _phandom: PhantomData }
228 }
229}
230
231impl<Client, Block, BE> StorageSubscriptionClient<Client, Block, BE> {
232 pub fn new(client: Arc<Client>) -> Self {
234 Self { client: Storage::new(client), _phandom: PhantomData }
235 }
236}
237
238impl<Client, Block, BE> StorageSubscriptionClient<Client, Block, BE>
239where
240 Block: BlockT + 'static,
241 BE: Backend<Block> + 'static,
242 Client: StorageProvider<Block, BE> + Send + Sync + 'static,
243{
244 pub async fn generate_events(
246 &mut self,
247 hash: Block::Hash,
248 items: Vec<StorageQuery<StorageKey>>,
249 child_key: Option<ChildInfo>,
250 tx: mpsc::Sender<QueryResult>,
251 ) -> Result<(), tokio::task::JoinError> {
252 let this = self.clone();
253
254 tokio::task::spawn_blocking(move || {
255 for item in items {
256 match item.query_type {
257 StorageQueryType::Value => {
258 let rp = this.client.query_value(hash, &item.key, child_key.as_ref());
259 if tx.blocking_send(rp).is_err() {
260 break;
261 }
262 },
263 StorageQueryType::Hash => {
264 let rp = this.client.query_hash(hash, &item.key, child_key.as_ref());
265 if tx.blocking_send(rp).is_err() {
266 break;
267 }
268 },
269 StorageQueryType::ClosestDescendantMerkleValue => {
270 let rp =
271 this.client.query_merkle_value(hash, &item.key, child_key.as_ref());
272 if tx.blocking_send(rp).is_err() {
273 break;
274 }
275 },
276 StorageQueryType::DescendantsValues => {
277 let query = QueryIter {
278 query_key: item.key,
279 ty: IterQueryType::Value,
280 pagination_start_key: None,
281 };
282 this.client.query_iter_pagination_with_producer(
283 query,
284 hash,
285 child_key.as_ref(),
286 &tx,
287 )
288 },
289 StorageQueryType::DescendantsHashes => {
290 let query = QueryIter {
291 query_key: item.key,
292 ty: IterQueryType::Hash,
293 pagination_start_key: None,
294 };
295 this.client.query_iter_pagination_with_producer(
296 query,
297 hash,
298 child_key.as_ref(),
299 &tx,
300 )
301 },
302 }
303 }
304 })
305 .await?;
306
307 Ok(())
308 }
309}