// File: substrate/client/rpc-spec-v2/src/archive/archive.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! API implementation for `archive`.
20
21use crate::{
22	archive::{
23		archive_storage::ArchiveStorageDiff,
24		error::{Error as ArchiveError, Infallible},
25		types::MethodResult,
26		ArchiveApiServer,
27	},
28	common::{
29		events::{
30			ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
31		},
32		storage::{QueryResult, StorageSubscriptionClient},
33	},
34	hex_string, SubscriptionTaskExecutor,
35};
36
37use codec::Encode;
38use futures::FutureExt;
39use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
40use sc_client_api::{
41	Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
42	StorageProvider,
43};
44use sc_rpc::utils::Subscription;
45use sp_api::{CallApiAt, CallContext};
46use sp_blockchain::{
47	Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
48};
49use sp_core::{Bytes, U256};
50use sp_runtime::{
51	traits::{Block as BlockT, Header as HeaderT, NumberFor},
52	SaturatedConversion,
53};
54use std::{collections::HashSet, marker::PhantomData, sync::Arc};
55
56use tokio::sync::mpsc;
57
/// Log target shared by all log statements in the `archive` RPC module.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its own buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;
65
/// An API for archive RPC calls.
pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
	/// Substrate client.
	client: Arc<Client>,
	/// Backend of the chain.
	backend: Arc<BE>,
	/// Executor to spawn subscriptions.
	executor: SubscriptionTaskExecutor,
	/// The hexadecimal encoded hash of the genesis block.
	///
	/// Computed once in [`Archive::new`] so `archive_v1_genesis_hash` only clones a `String`.
	genesis_hash: String,
	/// Phantom member to pin the block type.
	_phantom: PhantomData<Block>,
}
79
80impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
81	/// Create a new [`Archive`].
82	pub fn new<GenesisHash: AsRef<[u8]>>(
83		client: Arc<Client>,
84		backend: Arc<BE>,
85		genesis_hash: GenesisHash,
86		executor: SubscriptionTaskExecutor,
87	) -> Self {
88		let genesis_hash = hex_string(&genesis_hash.as_ref());
89		Self { client, backend, executor, genesis_hash, _phantom: PhantomData }
90	}
91}
92
93/// Parse hex-encoded string parameter as raw bytes.
94///
95/// If the parsing fails, returns an error propagated to the RPC method.
96fn parse_hex_param(param: String) -> Result<Vec<u8>, ArchiveError> {
97	// Methods can accept empty parameters.
98	if param.is_empty() {
99		return Ok(Default::default())
100	}
101
102	array_bytes::hex2bytes(&param).map_err(|_| ArchiveError::InvalidParam(param))
103}
104
105#[async_trait]
106impl<BE, Block, Client> ArchiveApiServer<Block::Hash> for Archive<BE, Block, Client>
107where
108	Block: BlockT + 'static,
109	Block::Header: Unpin,
110	BE: Backend<Block> + 'static,
111	Client: BlockBackend<Block>
112		+ ExecutorProvider<Block>
113		+ HeaderBackend<Block>
114		+ HeaderMetadata<Block, Error = BlockChainError>
115		+ BlockchainEvents<Block>
116		+ CallApiAt<Block>
117		+ StorageProvider<Block, BE>
118		+ 'static,
119{
120	fn archive_v1_body(&self, hash: Block::Hash) -> Result<Option<Vec<String>>, Infallible> {
121		let Ok(Some(signed_block)) = self.client.block(hash) else { return Ok(None) };
122
123		let extrinsics = signed_block
124			.block
125			.extrinsics()
126			.iter()
127			.map(|extrinsic| hex_string(&extrinsic.encode()))
128			.collect();
129
130		Ok(Some(extrinsics))
131	}
132
133	fn archive_v1_genesis_hash(&self) -> Result<String, Infallible> {
134		Ok(self.genesis_hash.clone())
135	}
136
137	fn archive_v1_header(&self, hash: Block::Hash) -> Result<Option<String>, Infallible> {
138		let Ok(Some(header)) = self.client.header(hash) else { return Ok(None) };
139
140		Ok(Some(hex_string(&header.encode())))
141	}
142
143	fn archive_v1_finalized_height(&self) -> Result<u64, Infallible> {
144		Ok(self.client.info().finalized_number.saturated_into())
145	}
146
147	fn archive_v1_hash_by_height(&self, height: u64) -> Result<Vec<String>, ArchiveError> {
148		let height: NumberFor<Block> = U256::from(height)
149			.try_into()
150			.map_err(|_| ArchiveError::InvalidParam(format!("Invalid block height: {}", height)))?;
151
152		let finalized_num = self.client.info().finalized_number;
153
154		if finalized_num >= height {
155			let Ok(Some(hash)) = self.client.block_hash(height) else { return Ok(vec![]) };
156			return Ok(vec![hex_string(&hash.as_ref())])
157		}
158
159		let blockchain = self.backend.blockchain();
160		// Fetch all the leaves of the blockchain that are on a higher or equal height.
161		let mut headers: Vec<_> = blockchain
162			.leaves()
163			.map_err(|error| ArchiveError::FetchLeaves(error.to_string()))?
164			.into_iter()
165			.filter_map(|hash| {
166				let Ok(Some(header)) = self.client.header(hash) else { return None };
167
168				if header.number() < &height {
169					return None
170				}
171
172				Some(header)
173			})
174			.collect();
175
176		let mut result = Vec::new();
177		let mut visited = HashSet::new();
178
179		while let Some(header) = headers.pop() {
180			if header.number() == &height {
181				result.push(hex_string(&header.hash().as_ref()));
182				continue
183			}
184
185			let parent_hash = *header.parent_hash();
186
187			// Continue the iteration for unique hashes.
188			// Forks might intersect on a common chain that is not yet finalized.
189			if visited.insert(parent_hash) {
190				let Ok(Some(next_header)) = self.client.header(parent_hash) else { continue };
191				headers.push(next_header);
192			}
193		}
194
195		Ok(result)
196	}
197
198	fn archive_v1_call(
199		&self,
200		hash: Block::Hash,
201		function: String,
202		call_parameters: String,
203	) -> Result<MethodResult, ArchiveError> {
204		let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
205
206		let result =
207			self.client
208				.executor()
209				.call(hash, &function, &call_parameters, CallContext::Offchain);
210
211		Ok(match result {
212			Ok(result) => MethodResult::ok(hex_string(&result)),
213			Err(error) => MethodResult::err(error.to_string()),
214		})
215	}
216
217	fn archive_v1_storage(
218		&self,
219		pending: PendingSubscriptionSink,
220		hash: Block::Hash,
221		items: Vec<StorageQuery<String>>,
222		child_trie: Option<String>,
223	) {
224		let mut storage_client =
225			StorageSubscriptionClient::<Client, Block, BE>::new(self.client.clone());
226
227		let fut = async move {
228			let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };
229
230			let items = match items
231				.into_iter()
232				.map(|query| {
233					let key = StorageKey(parse_hex_param(query.key)?);
234					Ok(StorageQuery { key, query_type: query.query_type })
235				})
236				.collect::<Result<Vec<_>, ArchiveError>>()
237			{
238				Ok(items) => items,
239				Err(error) => {
240					let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
241					return
242				},
243			};
244
245			let child_trie = child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose();
246			let child_trie = match child_trie {
247				Ok(child_trie) => child_trie.map(ChildInfo::new_default_from_vec),
248				Err(error) => {
249					let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
250					return
251				},
252			};
253
254			let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
255			let storage_fut = storage_client.generate_events(hash, items, child_trie, tx);
256
257			// We don't care about the return value of this join:
258			// - process_events might encounter an error (if the client disconnected)
259			// - storage_fut might encounter an error while processing a trie queries and
260			// the error is propagated via the sink.
261			let _ = futures::future::join(storage_fut, process_storage_events(&mut rx, &mut sink))
262				.await;
263		};
264
265		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
266	}
267
268	fn archive_v1_storage_diff(
269		&self,
270		pending: PendingSubscriptionSink,
271		hash: Block::Hash,
272		items: Vec<ArchiveStorageDiffItem<String>>,
273		previous_hash: Option<Block::Hash>,
274	) {
275		let storage_client = ArchiveStorageDiff::new(self.client.clone());
276		let client = self.client.clone();
277
278		log::trace!(target: LOG_TARGET, "Storage diff subscription started");
279
280		let fut = async move {
281			let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };
282
283			let previous_hash = if let Some(previous_hash) = previous_hash {
284				previous_hash
285			} else {
286				let Ok(Some(current_header)) = client.header(hash) else {
287					let message = format!("Block header is not present: {hash}");
288					let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
289					return
290				};
291				*current_header.parent_hash()
292			};
293
294			let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
295			let storage_fut =
296				storage_client.handle_trie_queries(hash, items, previous_hash, tx.clone());
297
298			// We don't care about the return value of this join:
299			// - process_events might encounter an error (if the client disconnected)
300			// - storage_fut might encounter an error while processing a trie queries and
301			// the error is propagated via the sink.
302			let _ =
303				futures::future::join(storage_fut, process_storage_diff_events(&mut rx, &mut sink))
304					.await;
305		};
306
307		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
308	}
309}
310
311/// Sends all the events of the storage_diff method to the sink.
312async fn process_storage_diff_events(
313	rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>,
314	sink: &mut Subscription,
315) {
316	loop {
317		tokio::select! {
318			_ = sink.closed() => {
319				return
320			},
321
322			maybe_event = rx.recv() => {
323				let Some(event) = maybe_event else {
324					break;
325				};
326
327				if event.is_done() {
328					log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
329				} else if event.is_err() {
330					log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
331				}
332
333				if sink.send(&event).await.is_err() {
334					return
335				}
336			}
337		}
338	}
339}
340
341/// Sends all the events of the storage method to the sink.
342async fn process_storage_events(rx: &mut mpsc::Receiver<QueryResult>, sink: &mut Subscription) {
343	loop {
344		tokio::select! {
345			_ = sink.closed() => {
346				break
347			}
348
349			maybe_storage = rx.recv() => {
350				let Some(event) = maybe_storage else {
351					break;
352				};
353
354				match event {
355					Ok(None) => continue,
356
357					Ok(Some(event)) =>
358						if sink.send(&ArchiveStorageEvent::result(event)).await.is_err() {
359							return
360						},
361
362					Err(error) => {
363						let _ = sink.send(&ArchiveStorageEvent::err(error)).await;
364						return
365					}
366				}
367			}
368		}
369	}
370
371	let _ = sink.send(&ArchiveStorageEvent::StorageDone).await;
372}