sc_rpc_spec_v2/chain_head/
chain_head_storage.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Implementation of the `chainHead_storage` method.
20
21use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
22
23use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
24use sc_utils::mpsc::TracingUnboundedSender;
25use sp_runtime::traits::Block as BlockT;
26
27use crate::{
28	chain_head::{
29		event::{OperationError, OperationId, OperationStorageItems},
30		subscription::BlockGuard,
31		FollowEvent,
32	},
33	common::{
34		events::{StorageQuery, StorageQueryType},
35		storage::{IterQueryType, QueryIter, QueryIterResult, Storage},
36	},
37};
38
39/// Generates the events of the `chainHead_storage` method.
40pub struct ChainHeadStorage<Client, Block, BE> {
41	/// Storage client.
42	client: Storage<Client, Block, BE>,
43	/// Queue of operations that may require pagination.
44	iter_operations: VecDeque<QueryIter>,
45	/// The maximum number of items reported by the `chainHead_storage` before
46	/// pagination is required.
47	operation_max_storage_items: usize,
48	_phandom: PhantomData<(BE, Block)>,
49}
50
51impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
52	/// Constructs a new [`ChainHeadStorage`].
53	pub fn new(client: Arc<Client>, operation_max_storage_items: usize) -> Self {
54		Self {
55			client: Storage::new(client),
56			iter_operations: VecDeque::new(),
57			operation_max_storage_items,
58			_phandom: PhantomData,
59		}
60	}
61}
62
63impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
64where
65	Block: BlockT + 'static,
66	BE: Backend<Block> + 'static,
67	Client: StorageProvider<Block, BE> + 'static,
68{
69	/// Iterate over (key, hash) and (key, value) generating the `WaitingForContinue` event if
70	/// necessary.
71	async fn generate_storage_iter_events(
72		&mut self,
73		mut block_guard: BlockGuard<Block, BE>,
74		hash: Block::Hash,
75		child_key: Option<ChildInfo>,
76	) {
77		let sender = block_guard.response_sender();
78		let operation = block_guard.operation();
79
80		while let Some(query) = self.iter_operations.pop_front() {
81			if operation.was_stopped() {
82				return
83			}
84
85			let result = self.client.query_iter_pagination(
86				query,
87				hash,
88				child_key.as_ref(),
89				self.operation_max_storage_items,
90			);
91			let (events, maybe_next_query) = match result {
92				QueryIterResult::Ok(result) => result,
93				QueryIterResult::Err(error) => {
94					send_error::<Block>(&sender, operation.operation_id(), error.to_string());
95					return
96				},
97			};
98
99			if !events.is_empty() {
100				// Send back the results of the iteration produced so far.
101				let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
102					OperationStorageItems { operation_id: operation.operation_id(), items: events },
103				));
104			}
105
106			if let Some(next_query) = maybe_next_query {
107				let _ =
108					sender.unbounded_send(FollowEvent::<Block::Hash>::OperationWaitingForContinue(
109						OperationId { operation_id: operation.operation_id() },
110					));
111
112				// The operation might be continued or cancelled only after the
113				// `OperationWaitingForContinue` is generated above.
114				operation.wait_for_continue().await;
115
116				// Give a chance for the other items to advance next time.
117				self.iter_operations.push_back(next_query);
118			}
119		}
120
121		if operation.was_stopped() {
122			return
123		}
124
125		let _ =
126			sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(OperationId {
127				operation_id: operation.operation_id(),
128			}));
129	}
130
131	/// Generate the block events for the `chainHead_storage` method.
132	pub async fn generate_events(
133		&mut self,
134		mut block_guard: BlockGuard<Block, BE>,
135		hash: Block::Hash,
136		items: Vec<StorageQuery<StorageKey>>,
137		child_key: Option<ChildInfo>,
138	) {
139		let sender = block_guard.response_sender();
140		let operation = block_guard.operation();
141
142		let mut storage_results = Vec::with_capacity(items.len());
143		for item in items {
144			match item.query_type {
145				StorageQueryType::Value => {
146					match self.client.query_value(hash, &item.key, child_key.as_ref()) {
147						Ok(Some(value)) => storage_results.push(value),
148						Ok(None) => continue,
149						Err(error) => {
150							send_error::<Block>(&sender, operation.operation_id(), error);
151							return
152						},
153					}
154				},
155				StorageQueryType::Hash =>
156					match self.client.query_hash(hash, &item.key, child_key.as_ref()) {
157						Ok(Some(value)) => storage_results.push(value),
158						Ok(None) => continue,
159						Err(error) => {
160							send_error::<Block>(&sender, operation.operation_id(), error);
161							return
162						},
163					},
164				StorageQueryType::ClosestDescendantMerkleValue =>
165					match self.client.query_merkle_value(hash, &item.key, child_key.as_ref()) {
166						Ok(Some(value)) => storage_results.push(value),
167						Ok(None) => continue,
168						Err(error) => {
169							send_error::<Block>(&sender, operation.operation_id(), error);
170							return
171						},
172					},
173				StorageQueryType::DescendantsValues => self.iter_operations.push_back(QueryIter {
174					query_key: item.key,
175					ty: IterQueryType::Value,
176					pagination_start_key: None,
177				}),
178				StorageQueryType::DescendantsHashes => self.iter_operations.push_back(QueryIter {
179					query_key: item.key,
180					ty: IterQueryType::Hash,
181					pagination_start_key: None,
182				}),
183			};
184		}
185
186		if !storage_results.is_empty() {
187			let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
188				OperationStorageItems {
189					operation_id: operation.operation_id(),
190					items: storage_results,
191				},
192			));
193		}
194
195		self.generate_storage_iter_events(block_guard, hash, child_key).await
196	}
197}
198
199/// Build and send the opaque error back to the `chainHead_follow` method.
200fn send_error<Block: BlockT>(
201	sender: &TracingUnboundedSender<FollowEvent<Block::Hash>>,
202	operation_id: String,
203	error: String,
204) {
205	let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationError(OperationError {
206		operation_id,
207		error,
208	}));
209}