1use crate::{
22 archive::{
23 archive_storage::ArchiveStorageDiff,
24 error::{Error as ArchiveError, Infallible},
25 types::MethodResult,
26 ArchiveApiServer,
27 },
28 common::{
29 events::{
30 ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
31 },
32 storage::{QueryResult, StorageSubscriptionClient},
33 },
34 hex_string, SubscriptionTaskExecutor,
35};
36
37use codec::Encode;
38use futures::FutureExt;
39use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
40use sc_client_api::{
41 Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
42 StorageProvider,
43};
44use sc_rpc::utils::Subscription;
45use sp_api::{CallApiAt, CallContext};
46use sp_blockchain::{
47 Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
48};
49use sp_core::{Bytes, U256};
50use sp_runtime::{
51 traits::{Block as BlockT, Header as HeaderT, NumberFor},
52 SaturatedConversion,
53};
54use std::{collections::HashSet, marker::PhantomData, sync::Arc};
55
56use tokio::sync::mpsc;
57
58pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";
59
60const STORAGE_QUERY_BUF: usize = 16;
65
66pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
68 client: Arc<Client>,
70 backend: Arc<BE>,
72 executor: SubscriptionTaskExecutor,
74 genesis_hash: String,
76 _phantom: PhantomData<Block>,
78}
79
80impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
81 pub fn new<GenesisHash: AsRef<[u8]>>(
83 client: Arc<Client>,
84 backend: Arc<BE>,
85 genesis_hash: GenesisHash,
86 executor: SubscriptionTaskExecutor,
87 ) -> Self {
88 let genesis_hash = hex_string(&genesis_hash.as_ref());
89 Self { client, backend, executor, genesis_hash, _phantom: PhantomData }
90 }
91}
92
93fn parse_hex_param(param: String) -> Result<Vec<u8>, ArchiveError> {
97 if param.is_empty() {
99 return Ok(Default::default())
100 }
101
102 array_bytes::hex2bytes(¶m).map_err(|_| ArchiveError::InvalidParam(param))
103}
104
105#[async_trait]
106impl<BE, Block, Client> ArchiveApiServer<Block::Hash> for Archive<BE, Block, Client>
107where
108 Block: BlockT + 'static,
109 Block::Header: Unpin,
110 BE: Backend<Block> + 'static,
111 Client: BlockBackend<Block>
112 + ExecutorProvider<Block>
113 + HeaderBackend<Block>
114 + HeaderMetadata<Block, Error = BlockChainError>
115 + BlockchainEvents<Block>
116 + CallApiAt<Block>
117 + StorageProvider<Block, BE>
118 + 'static,
119{
120 fn archive_v1_body(&self, hash: Block::Hash) -> Result<Option<Vec<String>>, Infallible> {
121 let Ok(Some(signed_block)) = self.client.block(hash) else { return Ok(None) };
122
123 let extrinsics = signed_block
124 .block
125 .extrinsics()
126 .iter()
127 .map(|extrinsic| hex_string(&extrinsic.encode()))
128 .collect();
129
130 Ok(Some(extrinsics))
131 }
132
133 fn archive_v1_genesis_hash(&self) -> Result<String, Infallible> {
134 Ok(self.genesis_hash.clone())
135 }
136
137 fn archive_v1_header(&self, hash: Block::Hash) -> Result<Option<String>, Infallible> {
138 let Ok(Some(header)) = self.client.header(hash) else { return Ok(None) };
139
140 Ok(Some(hex_string(&header.encode())))
141 }
142
143 fn archive_v1_finalized_height(&self) -> Result<u64, Infallible> {
144 Ok(self.client.info().finalized_number.saturated_into())
145 }
146
147 fn archive_v1_hash_by_height(&self, height: u64) -> Result<Vec<String>, ArchiveError> {
148 let height: NumberFor<Block> = U256::from(height)
149 .try_into()
150 .map_err(|_| ArchiveError::InvalidParam(format!("Invalid block height: {}", height)))?;
151
152 let finalized_num = self.client.info().finalized_number;
153
154 if finalized_num >= height {
155 let Ok(Some(hash)) = self.client.block_hash(height) else { return Ok(vec![]) };
156 return Ok(vec![hex_string(&hash.as_ref())])
157 }
158
159 let blockchain = self.backend.blockchain();
160 let mut headers: Vec<_> = blockchain
162 .leaves()
163 .map_err(|error| ArchiveError::FetchLeaves(error.to_string()))?
164 .into_iter()
165 .filter_map(|hash| {
166 let Ok(Some(header)) = self.client.header(hash) else { return None };
167
168 if header.number() < &height {
169 return None
170 }
171
172 Some(header)
173 })
174 .collect();
175
176 let mut result = Vec::new();
177 let mut visited = HashSet::new();
178
179 while let Some(header) = headers.pop() {
180 if header.number() == &height {
181 result.push(hex_string(&header.hash().as_ref()));
182 continue
183 }
184
185 let parent_hash = *header.parent_hash();
186
187 if visited.insert(parent_hash) {
190 let Ok(Some(next_header)) = self.client.header(parent_hash) else { continue };
191 headers.push(next_header);
192 }
193 }
194
195 Ok(result)
196 }
197
198 fn archive_v1_call(
199 &self,
200 hash: Block::Hash,
201 function: String,
202 call_parameters: String,
203 ) -> Result<MethodResult, ArchiveError> {
204 let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
205
206 let result =
207 self.client
208 .executor()
209 .call(hash, &function, &call_parameters, CallContext::Offchain);
210
211 Ok(match result {
212 Ok(result) => MethodResult::ok(hex_string(&result)),
213 Err(error) => MethodResult::err(error.to_string()),
214 })
215 }
216
217 fn archive_v1_storage(
218 &self,
219 pending: PendingSubscriptionSink,
220 hash: Block::Hash,
221 items: Vec<StorageQuery<String>>,
222 child_trie: Option<String>,
223 ) {
224 let mut storage_client =
225 StorageSubscriptionClient::<Client, Block, BE>::new(self.client.clone());
226
227 let fut = async move {
228 let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };
229
230 let items = match items
231 .into_iter()
232 .map(|query| {
233 let key = StorageKey(parse_hex_param(query.key)?);
234 Ok(StorageQuery { key, query_type: query.query_type })
235 })
236 .collect::<Result<Vec<_>, ArchiveError>>()
237 {
238 Ok(items) => items,
239 Err(error) => {
240 let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
241 return
242 },
243 };
244
245 let child_trie = child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose();
246 let child_trie = match child_trie {
247 Ok(child_trie) => child_trie.map(ChildInfo::new_default_from_vec),
248 Err(error) => {
249 let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
250 return
251 },
252 };
253
254 let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
255 let storage_fut = storage_client.generate_events(hash, items, child_trie, tx);
256
257 let _ = futures::future::join(storage_fut, process_storage_events(&mut rx, &mut sink))
262 .await;
263 };
264
265 self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
266 }
267
268 fn archive_v1_storage_diff(
269 &self,
270 pending: PendingSubscriptionSink,
271 hash: Block::Hash,
272 items: Vec<ArchiveStorageDiffItem<String>>,
273 previous_hash: Option<Block::Hash>,
274 ) {
275 let storage_client = ArchiveStorageDiff::new(self.client.clone());
276 let client = self.client.clone();
277
278 log::trace!(target: LOG_TARGET, "Storage diff subscription started");
279
280 let fut = async move {
281 let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };
282
283 let previous_hash = if let Some(previous_hash) = previous_hash {
284 previous_hash
285 } else {
286 let Ok(Some(current_header)) = client.header(hash) else {
287 let message = format!("Block header is not present: {hash}");
288 let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
289 return
290 };
291 *current_header.parent_hash()
292 };
293
294 let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
295 let storage_fut =
296 storage_client.handle_trie_queries(hash, items, previous_hash, tx.clone());
297
298 let _ =
303 futures::future::join(storage_fut, process_storage_diff_events(&mut rx, &mut sink))
304 .await;
305 };
306
307 self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
308 }
309}
310
311async fn process_storage_diff_events(
313 rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>,
314 sink: &mut Subscription,
315) {
316 loop {
317 tokio::select! {
318 _ = sink.closed() => {
319 return
320 },
321
322 maybe_event = rx.recv() => {
323 let Some(event) = maybe_event else {
324 break;
325 };
326
327 if event.is_done() {
328 log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
329 } else if event.is_err() {
330 log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
331 }
332
333 if sink.send(&event).await.is_err() {
334 return
335 }
336 }
337 }
338 }
339}
340
341async fn process_storage_events(rx: &mut mpsc::Receiver<QueryResult>, sink: &mut Subscription) {
343 loop {
344 tokio::select! {
345 _ = sink.closed() => {
346 break
347 }
348
349 maybe_storage = rx.recv() => {
350 let Some(event) = maybe_storage else {
351 break;
352 };
353
354 match event {
355 Ok(None) => continue,
356
357 Ok(Some(event)) =>
358 if sink.send(&ArchiveStorageEvent::result(event)).await.is_err() {
359 return
360 },
361
362 Err(error) => {
363 let _ = sink.send(&ArchiveStorageEvent::err(error)).await;
364 return
365 }
366 }
367 }
368 }
369 }
370
371 let _ = sink.send(&ArchiveStorageEvent::StorageDone).await;
372}