sc_rpc_spec_v2/chain_head/
chain_head_storage.rs1use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
22
23use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
24use sc_utils::mpsc::TracingUnboundedSender;
25use sp_runtime::traits::Block as BlockT;
26
27use crate::{
28 chain_head::{
29 event::{OperationError, OperationId, OperationStorageItems},
30 subscription::BlockGuard,
31 FollowEvent,
32 },
33 common::{
34 events::{StorageQuery, StorageQueryType},
35 storage::{IterQueryType, QueryIter, QueryIterResult, Storage},
36 },
37};
38
39pub struct ChainHeadStorage<Client, Block, BE> {
41 client: Storage<Client, Block, BE>,
43 iter_operations: VecDeque<QueryIter>,
45 operation_max_storage_items: usize,
48 _phandom: PhantomData<(BE, Block)>,
49}
50
51impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
52 pub fn new(client: Arc<Client>, operation_max_storage_items: usize) -> Self {
54 Self {
55 client: Storage::new(client),
56 iter_operations: VecDeque::new(),
57 operation_max_storage_items,
58 _phandom: PhantomData,
59 }
60 }
61}
62
63impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
64where
65 Block: BlockT + 'static,
66 BE: Backend<Block> + 'static,
67 Client: StorageProvider<Block, BE> + 'static,
68{
69 async fn generate_storage_iter_events(
72 &mut self,
73 mut block_guard: BlockGuard<Block, BE>,
74 hash: Block::Hash,
75 child_key: Option<ChildInfo>,
76 ) {
77 let sender = block_guard.response_sender();
78 let operation = block_guard.operation();
79
80 while let Some(query) = self.iter_operations.pop_front() {
81 if operation.was_stopped() {
82 return
83 }
84
85 let result = self.client.query_iter_pagination(
86 query,
87 hash,
88 child_key.as_ref(),
89 self.operation_max_storage_items,
90 );
91 let (events, maybe_next_query) = match result {
92 QueryIterResult::Ok(result) => result,
93 QueryIterResult::Err(error) => {
94 send_error::<Block>(&sender, operation.operation_id(), error.to_string());
95 return
96 },
97 };
98
99 if !events.is_empty() {
100 let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
102 OperationStorageItems { operation_id: operation.operation_id(), items: events },
103 ));
104 }
105
106 if let Some(next_query) = maybe_next_query {
107 let _ =
108 sender.unbounded_send(FollowEvent::<Block::Hash>::OperationWaitingForContinue(
109 OperationId { operation_id: operation.operation_id() },
110 ));
111
112 operation.wait_for_continue().await;
115
116 self.iter_operations.push_back(next_query);
118 }
119 }
120
121 if operation.was_stopped() {
122 return
123 }
124
125 let _ =
126 sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(OperationId {
127 operation_id: operation.operation_id(),
128 }));
129 }
130
131 pub async fn generate_events(
133 &mut self,
134 mut block_guard: BlockGuard<Block, BE>,
135 hash: Block::Hash,
136 items: Vec<StorageQuery<StorageKey>>,
137 child_key: Option<ChildInfo>,
138 ) {
139 let sender = block_guard.response_sender();
140 let operation = block_guard.operation();
141
142 let mut storage_results = Vec::with_capacity(items.len());
143 for item in items {
144 match item.query_type {
145 StorageQueryType::Value => {
146 match self.client.query_value(hash, &item.key, child_key.as_ref()) {
147 Ok(Some(value)) => storage_results.push(value),
148 Ok(None) => continue,
149 Err(error) => {
150 send_error::<Block>(&sender, operation.operation_id(), error);
151 return
152 },
153 }
154 },
155 StorageQueryType::Hash =>
156 match self.client.query_hash(hash, &item.key, child_key.as_ref()) {
157 Ok(Some(value)) => storage_results.push(value),
158 Ok(None) => continue,
159 Err(error) => {
160 send_error::<Block>(&sender, operation.operation_id(), error);
161 return
162 },
163 },
164 StorageQueryType::ClosestDescendantMerkleValue =>
165 match self.client.query_merkle_value(hash, &item.key, child_key.as_ref()) {
166 Ok(Some(value)) => storage_results.push(value),
167 Ok(None) => continue,
168 Err(error) => {
169 send_error::<Block>(&sender, operation.operation_id(), error);
170 return
171 },
172 },
173 StorageQueryType::DescendantsValues => self.iter_operations.push_back(QueryIter {
174 query_key: item.key,
175 ty: IterQueryType::Value,
176 pagination_start_key: None,
177 }),
178 StorageQueryType::DescendantsHashes => self.iter_operations.push_back(QueryIter {
179 query_key: item.key,
180 ty: IterQueryType::Hash,
181 pagination_start_key: None,
182 }),
183 };
184 }
185
186 if !storage_results.is_empty() {
187 let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
188 OperationStorageItems {
189 operation_id: operation.operation_id(),
190 items: storage_results,
191 },
192 ));
193 }
194
195 self.generate_storage_iter_events(block_guard, hash, child_key).await
196 }
197}
198
199fn send_error<Block: BlockT>(
201 sender: &TracingUnboundedSender<FollowEvent<Block::Hash>>,
202 operation_id: String,
203 error: String,
204) {
205 let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationError(OperationError {
206 operation_id,
207 error,
208 }));
209}