referrerpolicy=no-referrer-when-downgrade

sc_rpc_spec_v2/common/
storage.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Storage queries for the RPC-V2 spec.
20
21use std::{marker::PhantomData, sync::Arc};
22
23use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
24use sp_runtime::traits::Block as BlockT;
25use tokio::sync::mpsc;
26
27use super::events::{StorageQuery, StorageQueryType, StorageResult, StorageResultType};
28use crate::hex_string;
29
30/// Call into the storage of blocks.
31pub struct Storage<Client, Block, BE> {
32	/// Substrate client.
33	client: Arc<Client>,
34	_phandom: PhantomData<(BE, Block)>,
35}
36
37impl<Client, Block, BE> Clone for Storage<Client, Block, BE> {
38	fn clone(&self) -> Self {
39		Self { client: self.client.clone(), _phandom: PhantomData }
40	}
41}
42
43impl<Client, Block, BE> Storage<Client, Block, BE> {
44	/// Constructs a new [`Storage`].
45	pub fn new(client: Arc<Client>) -> Self {
46		Self { client, _phandom: PhantomData }
47	}
48}
49
50/// Query to iterate over storage.
51#[derive(Debug)]
52pub struct QueryIter {
53	/// The key from which the iteration was started.
54	pub query_key: StorageKey,
55	/// The key after which pagination should resume.
56	pub pagination_start_key: Option<StorageKey>,
57	/// The type of the query (either value or hash).
58	pub ty: IterQueryType,
59}
60
61/// The query type of an iteration.
62#[derive(Debug)]
63pub enum IterQueryType {
64	/// Iterating over (key, value) pairs.
65	Value,
66	/// Iterating over (key, hash) pairs.
67	Hash,
68}
69
70/// The result of making a query call.
71pub type QueryResult = Result<Option<StorageResult>, String>;
72
73impl<Client, Block, BE> Storage<Client, Block, BE>
74where
75	Block: BlockT + 'static,
76	BE: Backend<Block> + 'static,
77	Client: StorageProvider<Block, BE> + 'static,
78{
79	/// Fetch the value from storage.
80	pub fn query_value(
81		&self,
82		hash: Block::Hash,
83		key: &StorageKey,
84		child_key: Option<&ChildInfo>,
85	) -> QueryResult {
86		let result = if let Some(child_key) = child_key {
87			self.client.child_storage(hash, child_key, key)
88		} else {
89			self.client.storage(hash, key)
90		};
91
92		result
93			.map(|opt| {
94				QueryResult::Ok(opt.map(|storage_data| StorageResult {
95					key: hex_string(&key.0),
96					result: StorageResultType::Value(hex_string(&storage_data.0)),
97					child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
98				}))
99			})
100			.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
101	}
102
103	/// Fetch the hash of a value from storage.
104	pub fn query_hash(
105		&self,
106		hash: Block::Hash,
107		key: &StorageKey,
108		child_key: Option<&ChildInfo>,
109	) -> QueryResult {
110		let result = if let Some(child_key) = child_key {
111			self.client.child_storage_hash(hash, child_key, key)
112		} else {
113			self.client.storage_hash(hash, key)
114		};
115
116		result
117			.map(|opt| {
118				QueryResult::Ok(opt.map(|storage_data| StorageResult {
119					key: hex_string(&key.0),
120					result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
121					child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
122				}))
123			})
124			.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
125	}
126
127	/// Fetch the closest merkle value.
128	pub fn query_merkle_value(
129		&self,
130		hash: Block::Hash,
131		key: &StorageKey,
132		child_key: Option<&ChildInfo>,
133	) -> QueryResult {
134		let result = if let Some(ref child_key) = child_key {
135			self.client.child_closest_merkle_value(hash, child_key, key)
136		} else {
137			self.client.closest_merkle_value(hash, key)
138		};
139
140		result
141			.map(|opt| {
142				QueryResult::Ok(opt.map(|storage_data| {
143					let result = match &storage_data {
144						sc_client_api::MerkleValue::Node(data) => hex_string(&data.as_slice()),
145						sc_client_api::MerkleValue::Hash(hash) => hex_string(&hash.as_ref()),
146					};
147
148					StorageResult {
149						key: hex_string(&key.0),
150						result: StorageResultType::ClosestDescendantMerkleValue(result),
151						child_trie_key: child_key.map(|c| hex_string(&c.storage_key())),
152					}
153				}))
154			})
155			.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
156	}
157
158	/// Iterate over the storage keys and send the results to the provided sender.
159	///
160	/// Because this relies on a bounded channel, it will pause the storage iteration
161	// if the channel is becomes full which in turn provides backpressure.
162	pub fn query_iter_pagination_with_producer(
163		&self,
164		query: QueryIter,
165		hash: Block::Hash,
166		child_key: Option<&ChildInfo>,
167		tx: &mpsc::Sender<QueryResult>,
168	) {
169		let QueryIter { ty, query_key, pagination_start_key } = query;
170
171		let maybe_storage = if let Some(child_key) = child_key {
172			self.client.child_storage_keys(
173				hash,
174				child_key.to_owned(),
175				Some(&query_key),
176				pagination_start_key.as_ref(),
177			)
178		} else {
179			self.client.storage_keys(hash, Some(&query_key), pagination_start_key.as_ref())
180		};
181
182		let keys_iter = match maybe_storage {
183			Ok(keys_iter) => keys_iter,
184			Err(error) => {
185				_ = tx.blocking_send(Err(error.to_string()));
186				return;
187			},
188		};
189
190		for key in keys_iter {
191			let result = match ty {
192				IterQueryType::Value => self.query_value(hash, &key, child_key),
193				IterQueryType::Hash => self.query_hash(hash, &key, child_key),
194			};
195
196			if tx.blocking_send(result).is_err() {
197				break;
198			}
199		}
200	}
201
202	/// Raw iterator over the keys.
203	pub fn raw_keys_iter(
204		&self,
205		hash: Block::Hash,
206		child_key: Option<ChildInfo>,
207	) -> Result<impl Iterator<Item = StorageKey>, String> {
208		let keys_iter = if let Some(child_key) = child_key {
209			self.client.child_storage_keys(hash, child_key, None, None)
210		} else {
211			self.client.storage_keys(hash, None, None)
212		};
213
214		keys_iter.map_err(|err| err.to_string())
215	}
216}
217
218/// Generates storage events for `chainHead_storage` and `archive_storage` subscriptions.
219pub struct StorageSubscriptionClient<Client, Block, BE> {
220	/// Storage client.
221	client: Storage<Client, Block, BE>,
222	_phandom: PhantomData<(BE, Block)>,
223}
224
225impl<Client, Block, BE> Clone for StorageSubscriptionClient<Client, Block, BE> {
226	fn clone(&self) -> Self {
227		Self { client: self.client.clone(), _phandom: PhantomData }
228	}
229}
230
231impl<Client, Block, BE> StorageSubscriptionClient<Client, Block, BE> {
232	/// Constructs a new [`StorageSubscriptionClient`].
233	pub fn new(client: Arc<Client>) -> Self {
234		Self { client: Storage::new(client), _phandom: PhantomData }
235	}
236}
237
238impl<Client, Block, BE> StorageSubscriptionClient<Client, Block, BE>
239where
240	Block: BlockT + 'static,
241	BE: Backend<Block> + 'static,
242	Client: StorageProvider<Block, BE> + Send + Sync + 'static,
243{
244	/// Generate storage events to the provided sender.
245	pub async fn generate_events(
246		&mut self,
247		hash: Block::Hash,
248		items: Vec<StorageQuery<StorageKey>>,
249		child_key: Option<ChildInfo>,
250		tx: mpsc::Sender<QueryResult>,
251	) -> Result<(), tokio::task::JoinError> {
252		let this = self.clone();
253
254		tokio::task::spawn_blocking(move || {
255			for item in items {
256				match item.query_type {
257					StorageQueryType::Value => {
258						let rp = this.client.query_value(hash, &item.key, child_key.as_ref());
259						if tx.blocking_send(rp).is_err() {
260							break;
261						}
262					},
263					StorageQueryType::Hash => {
264						let rp = this.client.query_hash(hash, &item.key, child_key.as_ref());
265						if tx.blocking_send(rp).is_err() {
266							break;
267						}
268					},
269					StorageQueryType::ClosestDescendantMerkleValue => {
270						let rp =
271							this.client.query_merkle_value(hash, &item.key, child_key.as_ref());
272						if tx.blocking_send(rp).is_err() {
273							break;
274						}
275					},
276					StorageQueryType::DescendantsValues => {
277						let query = QueryIter {
278							query_key: item.key,
279							ty: IterQueryType::Value,
280							pagination_start_key: None,
281						};
282						this.client.query_iter_pagination_with_producer(
283							query,
284							hash,
285							child_key.as_ref(),
286							&tx,
287						)
288					},
289					StorageQueryType::DescendantsHashes => {
290						let query = QueryIter {
291							query_key: item.key,
292							ty: IterQueryType::Hash,
293							pagination_start_key: None,
294						};
295						this.client.query_iter_pagination_with_producer(
296							query,
297							hash,
298							child_key.as_ref(),
299							&tx,
300						)
301					},
302				}
303			}
304		})
305		.await?;
306
307		Ok(())
308	}
309}