1use crate::{
22 archive::{
23 archive_storage::ArchiveStorageDiff,
24 error::{Error as ArchiveError, Infallible},
25 types::MethodResult,
26 ArchiveApiServer,
27 },
28 common::{
29 events::{
30 ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
31 },
32 storage::{QueryResult, StorageSubscriptionClient},
33 },
34 hex_string, SubscriptionTaskExecutor,
35};
36
37use codec::Encode;
38use futures::FutureExt;
39use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
40use sc_client_api::{
41 Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
42 StorageProvider,
43};
44use sc_rpc::utils::Subscription;
45use sp_api::{CallApiAt, CallContext};
46use sp_blockchain::{
47 Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
48};
49use sp_core::{Bytes, U256};
50use sp_runtime::{
51 traits::{Block as BlockT, Header as HeaderT, NumberFor},
52 SaturatedConversion,
53};
54use std::{collections::HashSet, marker::PhantomData, sync::Arc};
55
56use tokio::sync::mpsc;
57
/// Log target used by the archive RPC implementation in this file.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// Capacity of the bounded channel between the storage-query producer task
/// and the subscription sink; bounds memory and applies backpressure when
/// the subscriber is slow.
const STORAGE_QUERY_BUF: usize = 16;
65
/// An API for archive RPC calls (the `archive_v1_*` method family).
pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
    /// Substrate client used for block/header/storage queries and runtime calls.
    client: Arc<Client>,
    /// Backend of the chain; used here to enumerate the blockchain leaves.
    backend: Arc<BE>,
    /// Executor on which subscription tasks are spawned.
    executor: SubscriptionTaskExecutor,
    /// Hex-encoded genesis hash, computed once at construction and returned
    /// by `archive_v1_genesis_hash`.
    genesis_hash: String,
    /// Ties the otherwise-unused `Block` type parameter to the struct.
    _phantom: PhantomData<Block>,
}
79
80impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
81 pub fn new<GenesisHash: AsRef<[u8]>>(
83 client: Arc<Client>,
84 backend: Arc<BE>,
85 genesis_hash: GenesisHash,
86 executor: SubscriptionTaskExecutor,
87 ) -> Self {
88 let genesis_hash = hex_string(&genesis_hash.as_ref());
89 Self { client, backend, executor, genesis_hash, _phantom: PhantomData }
90 }
91}
92
93fn parse_hex_param(param: String) -> Result<Vec<u8>, ArchiveError> {
97 if param.is_empty() {
99 return Ok(Default::default())
100 }
101
102 array_bytes::hex2bytes(¶m).map_err(|_| ArchiveError::InvalidParam(param))
103}
104
#[async_trait]
impl<BE, Block, Client> ArchiveApiServer<Block::Hash> for Archive<BE, Block, Client>
where
    Block: BlockT + 'static,
    Block::Header: Unpin,
    BE: Backend<Block> + 'static,
    Client: BlockBackend<Block>
        + ExecutorProvider<Block>
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = BlockChainError>
        + BlockchainEvents<Block>
        + CallApiAt<Block>
        + StorageProvider<Block, BE>
        + 'static,
{
    /// Fetch the body of a block as the hex-encoded SCALE encoding of each
    /// extrinsic.
    ///
    /// Returns `Ok(None)` when the block or its body is unavailable; backend
    /// errors are deliberately folded into `None` — the method itself is
    /// infallible from the RPC caller's perspective.
    fn archive_v1_body(&self, hash: Block::Hash) -> Result<Option<Vec<String>>, Infallible> {
        let Ok(Some(signed_block)) = self.client.block(hash) else { return Ok(None) };

        let extrinsics = signed_block
            .block
            .extrinsics()
            .iter()
            .map(|extrinsic| hex_string(&extrinsic.encode()))
            .collect();

        Ok(Some(extrinsics))
    }

    /// Return the genesis hash, pre-encoded as hex at construction time.
    fn archive_v1_genesis_hash(&self) -> Result<String, Infallible> {
        Ok(self.genesis_hash.clone())
    }

    /// Return the hex-encoded SCALE-encoded header of the given block, or
    /// `None` when the header is not available (errors folded into `None`,
    /// as in `archive_v1_body`).
    fn archive_v1_header(&self, hash: Block::Hash) -> Result<Option<String>, Infallible> {
        let Ok(Some(header)) = self.client.header(hash) else { return Ok(None) };

        Ok(Some(hex_string(&header.encode())))
    }

    /// Height of the most recently finalized block.
    fn archive_v1_finalized_height(&self) -> Result<u64, Infallible> {
        Ok(self.client.info().finalized_number.saturated_into())
    }

    /// All known block hashes at the given height.
    ///
    /// At or below the finalized height there is at most one canonical hash.
    /// Above it the chain may have forks, so every leaf at or above the
    /// target height is walked back towards its ancestors and each distinct
    /// block found exactly at that height is reported.
    fn archive_v1_hash_by_height(&self, height: u64) -> Result<Vec<String>, ArchiveError> {
        // Convert via `U256` so a `u64` that does not fit the chain's
        // block-number type is rejected instead of silently truncated.
        let height: NumberFor<Block> = U256::from(height)
            .try_into()
            .map_err(|_| ArchiveError::InvalidParam(format!("Invalid block height: {}", height)))?;

        let finalized_num = self.client.info().finalized_number;

        if finalized_num >= height {
            // Finalized range: a single canonical hash (or none at all).
            let Ok(Some(hash)) = self.client.block_hash(height) else { return Ok(vec![]) };
            return Ok(vec![hex_string(&hash.as_ref())])
        }

        let blockchain = self.backend.blockchain();
        // Seed the backwards walk with every leaf at or above the target
        // height; leaves below it cannot contain a block at that height.
        let mut headers: Vec<_> = blockchain
            .leaves()
            .map_err(|error| ArchiveError::FetchLeaves(error.to_string()))?
            .into_iter()
            .filter_map(|hash| {
                let Ok(Some(header)) = self.client.header(hash) else { return None };

                if header.number() < &height {
                    return None
                }

                Some(header)
            })
            .collect();

        let mut result = Vec::new();
        let mut visited = HashSet::new();

        // Walk each branch towards its parents; `visited` makes sure a
        // shared ancestor of several forks is traversed only once.
        while let Some(header) = headers.pop() {
            if header.number() == &height {
                result.push(hex_string(&header.hash().as_ref()));
                continue
            }

            let parent_hash = *header.parent_hash();

            if visited.insert(parent_hash) {
                // A missing parent header ends that branch silently rather
                // than failing the whole query.
                let Ok(Some(next_header)) = self.client.header(parent_hash) else { continue };
                headers.push(next_header);
            }
        }

        Ok(result)
    }

    /// Execute the runtime `function` with the hex-encoded
    /// `call_parameters` at the given block.
    ///
    /// Runtime execution failures are reported inside `MethodResult` rather
    /// than as an RPC error; only malformed hex parameters fail the method
    /// itself.
    fn archive_v1_call(
        &self,
        hash: Block::Hash,
        function: String,
        call_parameters: String,
    ) -> Result<MethodResult, ArchiveError> {
        let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);

        let result =
            self.client
                .executor()
                .call(hash, &function, &call_parameters, CallContext::Offchain);

        Ok(match result {
            Ok(result) => MethodResult::ok(hex_string(&result)),
            Err(error) => MethodResult::err(error.to_string()),
        })
    }

    /// Subscription that streams storage values for the given queries at
    /// the given block, optionally restricted to a child trie.
    ///
    /// All queries are validated first (hex decoding; `paginationStartKey`
    /// is only meaningful for descendants query types) and the first
    /// failure ends the subscription with an error event. Results are
    /// produced by the storage client on a bounded channel and forwarded
    /// to the sink by `process_storage_events`.
    fn archive_v1_storage(
        &self,
        pending: PendingSubscriptionSink,
        hash: Block::Hash,
        items: Vec<StorageQuery<String>>,
        child_trie: Option<String>,
    ) {
        let mut storage_client =
            StorageSubscriptionClient::<Client, Block, BE>::new(self.client.clone());

        let fut = async move {
            // If the pending subscription cannot be accepted, there is no
            // subscriber left to answer.
            let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

            // Decode and validate every query up front; short-circuits on
            // the first invalid item.
            let items = match items
                .into_iter()
                .map(|query| {
                    let key = StorageKey(parse_hex_param(query.key)?);

                    if query.pagination_start_key.is_some() &&
                        !query.query_type.is_descendant_query()
                    {
                        return Err(ArchiveError::InvalidParam(
                            "paginationStartKey is only valid for descendantsValues and descendantsHashes query types"
                                .to_string(),
                        ));
                    }

                    let pagination_start_key = query
                        .pagination_start_key
                        .map(|key| parse_hex_param(key).map(StorageKey))
                        .transpose()?;

                    Ok(StorageQuery { key, query_type: query.query_type, pagination_start_key })
                })
                .collect::<Result<Vec<_>, ArchiveError>>()
            {
                Ok(items) => items,
                Err(error) => {
                    let _ = sink.send(&ArchiveStorageEvent::err(error.to_string())).await;
                    return
                },
            };

            // The child trie key, when present, is hex-decoded the same way.
            let child_trie = child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose();
            let child_trie = match child_trie {
                Ok(child_trie) => child_trie.map(ChildInfo::new_default_from_vec),
                Err(error) => {
                    let _ = sink.send(&ArchiveStorageEvent::err(error.to_string())).await;
                    return
                },
            };

            // Bounded channel applies backpressure between the storage query
            // producer and a possibly slow subscriber.
            let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
            let storage_fut = storage_client.generate_events(hash, items, child_trie, tx);

            // Drive producer and forwarder concurrently; dropping `tx` when
            // the producer finishes lets the forwarder terminate.
            let _ = futures::future::join(storage_fut, process_storage_events(&mut rx, &mut sink))
                .await;
        };

        self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
    }

    /// Subscription that streams the storage differences of the requested
    /// `items` between `previous_hash` and `hash`.
    ///
    /// When `previous_hash` is `None` the parent of `hash` is used as the
    /// baseline; a missing header for `hash` is reported on the sink.
    fn archive_v1_storage_diff(
        &self,
        pending: PendingSubscriptionSink,
        hash: Block::Hash,
        items: Vec<ArchiveStorageDiffItem<String>>,
        previous_hash: Option<Block::Hash>,
    ) {
        let storage_client = ArchiveStorageDiff::new(self.client.clone());
        let client = self.client.clone();

        log::trace!(target: LOG_TARGET, "Storage diff subscription started");

        let fut = async move {
            let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

            // Default the diff baseline to the parent block.
            let previous_hash = if let Some(previous_hash) = previous_hash {
                previous_hash
            } else {
                let Ok(Some(current_header)) = client.header(hash) else {
                    let message = format!("Block header is not present: {hash}");
                    let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
                    return
                };
                *current_header.parent_hash()
            };

            let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
            let storage_fut =
                storage_client.handle_trie_queries(hash, items, previous_hash, tx.clone());

            // Run producer and forwarder together, mirroring
            // `archive_v1_storage`.
            let _ =
                futures::future::join(storage_fut, process_storage_diff_events(&mut rx, &mut sink))
                    .await;
        };

        self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
    }
}
326
327async fn process_storage_diff_events(
329 rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>,
330 sink: &mut Subscription,
331) {
332 loop {
333 tokio::select! {
334 _ = sink.closed() => {
335 return
336 },
337
338 maybe_event = rx.recv() => {
339 let Some(event) = maybe_event else {
340 break;
341 };
342
343 if event.is_done() {
344 log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
345 } else if event.is_err() {
346 log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
347 }
348
349 if sink.send(&event).await.is_err() {
350 return
351 }
352 }
353 }
354 }
355}
356
357async fn process_storage_events(rx: &mut mpsc::Receiver<QueryResult>, sink: &mut Subscription) {
359 loop {
360 tokio::select! {
361 _ = sink.closed() => {
362 break
363 }
364
365 maybe_storage = rx.recv() => {
366 let Some(event) = maybe_storage else {
367 break;
368 };
369
370 match event {
371 Ok(None) => continue,
372
373 Ok(Some(event)) =>
374 if sink.send(&ArchiveStorageEvent::result(event)).await.is_err() {
375 return
376 },
377
378 Err(error) => {
379 let _ = sink.send(&ArchiveStorageEvent::err(error)).await;
380 return
381 }
382 }
383 }
384 }
385 }
386
387 let _ = sink.send(&ArchiveStorageEvent::StorageDone).await;
388}