referrerpolicy=no-referrer-when-downgrade

sc_rpc/state/
state_full.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! State API backend for full nodes.
20
21use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration};
22
23use super::{
24	client_err,
25	error::{Error, Result},
26	ChildStateBackend, StateBackend,
27};
28use crate::{
29	utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
30	DenyUnsafe, SubscriptionTaskExecutor,
31};
32
33use futures::{future, stream, StreamExt};
34use jsonrpsee::{core::async_trait, types::ErrorObject, PendingSubscriptionSink};
35use sc_client_api::{
36	Backend, BlockBackend, BlockchainEvents, CallExecutor, ExecutorProvider, ProofProvider,
37	StorageProvider,
38};
39use sc_rpc_api::state::ReadProof;
40use sp_api::{CallApiAt, Metadata, ProvideRuntimeApi};
41use sp_blockchain::{
42	CachedHeaderMetadata, Error as ClientError, HeaderBackend, HeaderMetadata,
43	Result as ClientResult,
44};
45use sp_core::{
46	storage::{
47		ChildInfo, ChildType, PrefixedStorageKey, StorageChangeSet, StorageData, StorageKey,
48	},
49	traits::CallContext,
50	Bytes,
51};
52use sp_runtime::traits::Block as BlockT;
53use sp_version::RuntimeVersion;
54
55/// The maximum time allowed for an RPC call when running without unsafe RPC enabled.
56const MAXIMUM_SAFE_RPC_CALL_TIMEOUT: Duration = Duration::from_secs(30);
57
58/// Ranges to query in state_queryStorage.
59struct QueryStorageRange<Block: BlockT> {
60	/// Hashes of all the blocks in the range.
61	pub hashes: Vec<Block::Hash>,
62}
63
64/// State API backend for full nodes.
65pub struct FullState<BE, Block: BlockT, Client> {
66	client: Arc<Client>,
67	executor: SubscriptionTaskExecutor,
68	_phantom: PhantomData<(BE, Block)>,
69}
70
71impl<BE, Block: BlockT, Client> FullState<BE, Block, Client>
72where
73	BE: Backend<Block>,
74	Client: StorageProvider<Block, BE>
75		+ HeaderBackend<Block>
76		+ BlockBackend<Block>
77		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
78	Block: BlockT + 'static,
79{
80	/// Create new state API backend for full nodes.
81	pub fn new(client: Arc<Client>, executor: SubscriptionTaskExecutor) -> Self {
82		Self { client, executor, _phantom: PhantomData }
83	}
84
85	/// Returns given block hash or best block hash if None is passed.
86	fn block_or_best(&self, hash: Option<Block::Hash>) -> ClientResult<Block::Hash> {
87		Ok(hash.unwrap_or_else(|| self.client.info().best_hash))
88	}
89
90	/// Validates block range.
91	fn query_storage_range(
92		&self,
93		from: Block::Hash,
94		to: Option<Block::Hash>,
95	) -> Result<QueryStorageRange<Block>> {
96		let to = self
97			.block_or_best(to)
98			.map_err(|e| invalid_block::<Block>(from, to, e.to_string()))?;
99
100		let invalid_block_err =
101			|e: ClientError| invalid_block::<Block>(from, Some(to), e.to_string());
102		let from_meta = self.client.header_metadata(from).map_err(invalid_block_err)?;
103		let to_meta = self.client.header_metadata(to).map_err(invalid_block_err)?;
104
105		if from_meta.number > to_meta.number {
106			return Err(invalid_block_range(
107				&from_meta,
108				&to_meta,
109				"from number > to number".to_owned(),
110			))
111		}
112
113		// check if we can get from `to` to `from` by going through parent_hashes.
114		let from_number = from_meta.number;
115		let hashes = {
116			let mut hashes = vec![to_meta.hash];
117			let mut last = to_meta.clone();
118			while last.number > from_number {
119				let header_metadata = self
120					.client
121					.header_metadata(last.parent)
122					.map_err(|e| invalid_block_range::<Block>(&last, &to_meta, e.to_string()))?;
123				hashes.push(header_metadata.hash);
124				last = header_metadata;
125			}
126			if last.hash != from_meta.hash {
127				return Err(invalid_block_range(
128					&from_meta,
129					&to_meta,
130					"from and to are on different forks".to_owned(),
131				))
132			}
133			hashes.reverse();
134			hashes
135		};
136
137		Ok(QueryStorageRange { hashes })
138	}
139
140	/// Iterates through range.unfiltered_range and check each block for changes of keys' values.
141	fn query_storage_unfiltered(
142		&self,
143		range: &QueryStorageRange<Block>,
144		keys: &[StorageKey],
145		last_values: &mut HashMap<StorageKey, Option<StorageData>>,
146		changes: &mut Vec<StorageChangeSet<Block::Hash>>,
147	) -> Result<()> {
148		for block_hash in &range.hashes {
149			let mut block_changes = StorageChangeSet { block: *block_hash, changes: Vec::new() };
150			for key in keys {
151				let (has_changed, data) = {
152					let curr_data = self.client.storage(*block_hash, key).map_err(client_err)?;
153					match last_values.get(key) {
154						Some(prev_data) => (curr_data != *prev_data, curr_data),
155						None => (true, curr_data),
156					}
157				};
158				if has_changed {
159					block_changes.changes.push((key.clone(), data.clone()));
160				}
161				last_values.insert(key.clone(), data);
162			}
163			if !block_changes.changes.is_empty() {
164				changes.push(block_changes);
165			}
166		}
167		Ok(())
168	}
169}
170
171#[async_trait]
172impl<BE, Block, Client> StateBackend<Block, Client> for FullState<BE, Block, Client>
173where
174	Block: BlockT + 'static,
175	Block::Hash: Unpin,
176	BE: Backend<Block> + 'static,
177	Client: ExecutorProvider<Block>
178		+ StorageProvider<Block, BE>
179		+ ProofProvider<Block>
180		+ HeaderBackend<Block>
181		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
182		+ BlockchainEvents<Block>
183		+ CallApiAt<Block>
184		+ ProvideRuntimeApi<Block>
185		+ BlockBackend<Block>
186		+ Send
187		+ Sync
188		+ 'static,
189	Client::Api: Metadata<Block>,
190{
191	fn call(
192		&self,
193		block: Option<Block::Hash>,
194		method: String,
195		call_data: Bytes,
196	) -> std::result::Result<Bytes, Error> {
197		self.block_or_best(block)
198			.and_then(|block| {
199				self.client
200					.executor()
201					.call(block, &method, &call_data, CallContext::Offchain)
202					.map(Into::into)
203			})
204			.map_err(client_err)
205	}
206
207	// TODO: This is horribly broken; either remove it, or make it streaming.
208	fn storage_keys(
209		&self,
210		block: Option<Block::Hash>,
211		prefix: StorageKey,
212	) -> std::result::Result<Vec<StorageKey>, Error> {
213		// TODO: Remove the `.collect`.
214		self.block_or_best(block)
215			.and_then(|block| self.client.storage_keys(block, Some(&prefix), None))
216			.map(|iter| iter.collect())
217			.map_err(client_err)
218	}
219
220	// TODO: This is horribly broken; either remove it, or make it streaming.
221	fn storage_pairs(
222		&self,
223		block: Option<Block::Hash>,
224		prefix: StorageKey,
225	) -> std::result::Result<Vec<(StorageKey, StorageData)>, Error> {
226		// TODO: Remove the `.collect`.
227		self.block_or_best(block)
228			.and_then(|block| self.client.storage_pairs(block, Some(&prefix), None))
229			.map(|iter| iter.collect())
230			.map_err(client_err)
231	}
232
233	fn storage_keys_paged(
234		&self,
235		block: Option<Block::Hash>,
236		prefix: Option<StorageKey>,
237		count: u32,
238		start_key: Option<StorageKey>,
239	) -> std::result::Result<Vec<StorageKey>, Error> {
240		self.block_or_best(block)
241			.and_then(|block| self.client.storage_keys(block, prefix.as_ref(), start_key.as_ref()))
242			.map(|iter| iter.take(count as usize).collect())
243			.map_err(client_err)
244	}
245
246	fn storage(
247		&self,
248		block: Option<Block::Hash>,
249		key: StorageKey,
250	) -> std::result::Result<Option<StorageData>, Error> {
251		self.block_or_best(block)
252			.and_then(|block| self.client.storage(block, &key))
253			.map_err(client_err)
254	}
255
256	async fn storage_size(
257		&self,
258		block: Option<Block::Hash>,
259		key: StorageKey,
260		deny_unsafe: DenyUnsafe,
261	) -> std::result::Result<Option<u64>, Error> {
262		let block = match self.block_or_best(block) {
263			Ok(b) => b,
264			Err(e) => return Err(client_err(e)),
265		};
266
267		let client = self.client.clone();
268		let timeout = match deny_unsafe {
269			DenyUnsafe::Yes => Some(MAXIMUM_SAFE_RPC_CALL_TIMEOUT),
270			DenyUnsafe::No => None,
271		};
272
273		super::utils::spawn_blocking_with_timeout(timeout, move |is_timed_out| {
274			// Does the key point to a concrete entry in the database?
275			match client.storage(block, &key) {
276				Ok(Some(d)) => return Ok(Ok(Some(d.0.len() as u64))),
277				Err(e) => return Ok(Err(client_err(e))),
278				Ok(None) => {},
279			}
280
281			// The key doesn't point to anything, so it's probably a prefix.
282			let iter = match client.storage_keys(block, Some(&key), None).map_err(client_err) {
283				Ok(iter) => iter,
284				Err(e) => return Ok(Err(e)),
285			};
286
287			let mut sum = 0;
288			for storage_key in iter {
289				let value = client.storage(block, &storage_key).ok().flatten().unwrap_or_default();
290				sum += value.0.len() as u64;
291
292				is_timed_out.check_if_timed_out()?;
293			}
294
295			if sum > 0 {
296				Ok(Ok(Some(sum)))
297			} else {
298				Ok(Ok(None))
299			}
300		})
301		.await
302		.map_err(|error| Error::Client(Box::new(error)))?
303	}
304
305	fn storage_hash(
306		&self,
307		block: Option<Block::Hash>,
308		key: StorageKey,
309	) -> std::result::Result<Option<Block::Hash>, Error> {
310		self.block_or_best(block)
311			.and_then(|block| self.client.storage_hash(block, &key))
312			.map_err(client_err)
313	}
314
315	fn metadata(&self, block: Option<Block::Hash>) -> std::result::Result<Bytes, Error> {
316		self.block_or_best(block).map_err(client_err).and_then(|block| {
317			self.client
318				.runtime_api()
319				.metadata(block)
320				.map(Into::into)
321				.map_err(|e| Error::Client(Box::new(e)))
322		})
323	}
324
325	fn runtime_version(
326		&self,
327		block: Option<Block::Hash>,
328	) -> std::result::Result<RuntimeVersion, Error> {
329		self.block_or_best(block).map_err(client_err).and_then(|block| {
330			self.client.runtime_version_at(block).map_err(|e| Error::Client(Box::new(e)))
331		})
332	}
333
334	fn query_storage(
335		&self,
336		from: Block::Hash,
337		to: Option<Block::Hash>,
338		keys: Vec<StorageKey>,
339	) -> std::result::Result<Vec<StorageChangeSet<Block::Hash>>, Error> {
340		let call_fn = move || {
341			let range = self.query_storage_range(from, to)?;
342			let mut changes = Vec::new();
343			let mut last_values = HashMap::new();
344			self.query_storage_unfiltered(&range, &keys, &mut last_values, &mut changes)?;
345			Ok(changes)
346		};
347		call_fn()
348	}
349
350	fn query_storage_at(
351		&self,
352		keys: Vec<StorageKey>,
353		at: Option<Block::Hash>,
354	) -> std::result::Result<Vec<StorageChangeSet<Block::Hash>>, Error> {
355		let at = at.unwrap_or_else(|| self.client.info().best_hash);
356		self.query_storage(at, Some(at), keys)
357	}
358
359	fn read_proof(
360		&self,
361		block: Option<Block::Hash>,
362		keys: Vec<StorageKey>,
363	) -> std::result::Result<ReadProof<Block::Hash>, Error> {
364		self.block_or_best(block)
365			.and_then(|block| {
366				self.client
367					.read_proof(block, &mut keys.iter().map(|key| key.0.as_ref()))
368					.map(|proof| proof.into_iter_nodes().map(|node| node.into()).collect())
369					.map(|proof| ReadProof { at: block, proof })
370			})
371			.map_err(client_err)
372	}
373
374	fn subscribe_runtime_version(&self, pending: PendingSubscriptionSink) {
375		let initial = match self
376			.block_or_best(None)
377			.and_then(|block| self.client.runtime_version_at(block).map_err(Into::into))
378			.map_err(|e| Error::Client(Box::new(e)))
379		{
380			Ok(initial) => initial,
381			Err(e) => {
382				spawn_subscription_task(&self.executor, pending.reject(e));
383				return
384			},
385		};
386
387		let mut previous_version = initial.clone();
388		let client = self.client.clone();
389
390		// A stream of new versions
391		let version_stream = client
392			.import_notification_stream()
393			.filter(|n| future::ready(n.is_new_best))
394			.filter_map(move |n| {
395				let version =
396					client.runtime_version_at(n.hash).map_err(|e| Error::Client(Box::new(e)));
397
398				match version {
399					Ok(version) if version != previous_version => {
400						previous_version = version.clone();
401						future::ready(Some(version))
402					},
403					_ => future::ready(None),
404				}
405			});
406
407		let stream = futures::stream::once(future::ready(initial)).chain(version_stream);
408		spawn_subscription_task(
409			&self.executor,
410			PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
411		);
412	}
413
414	fn subscribe_storage(
415		&self,
416		pending: PendingSubscriptionSink,
417		keys: Option<Vec<StorageKey>>,
418		deny_unsafe: DenyUnsafe,
419	) {
420		if keys.is_none() {
421			if let Err(err) = deny_unsafe.check_if_safe() {
422				spawn_subscription_task(&self.executor, pending.reject(ErrorObject::from(err)));
423				return
424			}
425		}
426
427		let stream = match self.client.storage_changes_notification_stream(keys.as_deref(), None) {
428			Ok(stream) => stream,
429			Err(blockchain_err) => {
430				spawn_subscription_task(
431					&self.executor,
432					pending.reject(Error::Client(Box::new(blockchain_err))),
433				);
434				return
435			},
436		};
437
438		let initial = stream::iter(keys.map(|keys| {
439			let block = self.client.info().best_hash;
440			let changes = keys
441				.into_iter()
442				.map(|key| {
443					let v = self.client.storage(block, &key).ok().flatten();
444					(key, v)
445				})
446				.collect();
447			StorageChangeSet { block, changes }
448		}));
449
450		let storage_stream = stream.map(|storage_notif| StorageChangeSet {
451			block: storage_notif.block,
452			changes: storage_notif
453				.changes
454				.iter()
455				.filter_map(|(o_sk, k, v)| o_sk.is_none().then(|| (k.clone(), v.cloned())))
456				.collect(),
457		});
458
459		let stream = initial
460			.chain(storage_stream)
461			.filter(|storage| future::ready(!storage.changes.is_empty()));
462
463		spawn_subscription_task(
464			&self.executor,
465			PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
466		);
467	}
468
469	fn trace_block(
470		&self,
471		block: Block::Hash,
472		targets: Option<String>,
473		storage_keys: Option<String>,
474		methods: Option<String>,
475	) -> std::result::Result<sp_rpc::tracing::TraceBlockResponse, Error> {
476		sc_tracing::block::BlockExecutor::new(
477			self.client.clone(),
478			block,
479			targets,
480			storage_keys,
481			methods,
482		)
483		.trace_block()
484		.map_err(|e| invalid_block::<Block>(block, None, e.to_string()))
485	}
486}
487
488impl<BE, Block, Client> ChildStateBackend<Block, Client> for FullState<BE, Block, Client>
489where
490	Block: BlockT + 'static,
491	BE: Backend<Block> + 'static,
492	Client: ExecutorProvider<Block>
493		+ StorageProvider<Block, BE>
494		+ ProofProvider<Block>
495		+ HeaderBackend<Block>
496		+ BlockBackend<Block>
497		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
498		+ BlockchainEvents<Block>
499		+ CallApiAt<Block>
500		+ ProvideRuntimeApi<Block>
501		+ Send
502		+ Sync
503		+ 'static,
504	Client::Api: Metadata<Block>,
505{
506	fn read_child_proof(
507		&self,
508		block: Option<Block::Hash>,
509		storage_key: PrefixedStorageKey,
510		keys: Vec<StorageKey>,
511	) -> std::result::Result<ReadProof<Block::Hash>, Error> {
512		self.block_or_best(block)
513			.and_then(|block| {
514				let child_info = match ChildType::from_prefixed_key(&storage_key) {
515					Some((ChildType::ParentKeyId, storage_key)) =>
516						ChildInfo::new_default(storage_key),
517					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
518				};
519				self.client
520					.read_child_proof(
521						block,
522						&child_info,
523						&mut keys.iter().map(|key| key.0.as_ref()),
524					)
525					.map(|proof| proof.into_iter_nodes().map(|node| node.into()).collect())
526					.map(|proof| ReadProof { at: block, proof })
527			})
528			.map_err(client_err)
529	}
530
531	fn storage_keys(
532		&self,
533		block: Option<Block::Hash>,
534		storage_key: PrefixedStorageKey,
535		prefix: StorageKey,
536	) -> std::result::Result<Vec<StorageKey>, Error> {
537		// TODO: Remove the `.collect`.
538		self.block_or_best(block)
539			.and_then(|block| {
540				let child_info = match ChildType::from_prefixed_key(&storage_key) {
541					Some((ChildType::ParentKeyId, storage_key)) =>
542						ChildInfo::new_default(storage_key),
543					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
544				};
545				self.client.child_storage_keys(block, child_info, Some(&prefix), None)
546			})
547			.map(|iter| iter.collect())
548			.map_err(client_err)
549	}
550
551	fn storage_keys_paged(
552		&self,
553		block: Option<Block::Hash>,
554		storage_key: PrefixedStorageKey,
555		prefix: Option<StorageKey>,
556		count: u32,
557		start_key: Option<StorageKey>,
558	) -> std::result::Result<Vec<StorageKey>, Error> {
559		self.block_or_best(block)
560			.and_then(|block| {
561				let child_info = match ChildType::from_prefixed_key(&storage_key) {
562					Some((ChildType::ParentKeyId, storage_key)) =>
563						ChildInfo::new_default(storage_key),
564					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
565				};
566				self.client.child_storage_keys(
567					block,
568					child_info,
569					prefix.as_ref(),
570					start_key.as_ref(),
571				)
572			})
573			.map(|iter| iter.take(count as usize).collect())
574			.map_err(client_err)
575	}
576
577	fn storage(
578		&self,
579		block: Option<Block::Hash>,
580		storage_key: PrefixedStorageKey,
581		key: StorageKey,
582	) -> std::result::Result<Option<StorageData>, Error> {
583		self.block_or_best(block)
584			.and_then(|block| {
585				let child_info = match ChildType::from_prefixed_key(&storage_key) {
586					Some((ChildType::ParentKeyId, storage_key)) =>
587						ChildInfo::new_default(storage_key),
588					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
589				};
590				self.client.child_storage(block, &child_info, &key)
591			})
592			.map_err(client_err)
593	}
594
595	fn storage_entries(
596		&self,
597		block: Option<Block::Hash>,
598		storage_key: PrefixedStorageKey,
599		keys: Vec<StorageKey>,
600	) -> std::result::Result<Vec<Option<StorageData>>, Error> {
601		let child_info = if let Some((ChildType::ParentKeyId, storage_key)) =
602			ChildType::from_prefixed_key(&storage_key)
603		{
604			Arc::new(ChildInfo::new_default(storage_key))
605		} else {
606			return Err(client_err(sp_blockchain::Error::InvalidChildStorageKey))
607		};
608		let block = self.block_or_best(block).map_err(client_err)?;
609		let client = self.client.clone();
610
611		keys.into_iter()
612			.map(move |key| {
613				client.clone().child_storage(block, &child_info, &key).map_err(client_err)
614			})
615			.collect()
616	}
617
618	fn storage_hash(
619		&self,
620		block: Option<Block::Hash>,
621		storage_key: PrefixedStorageKey,
622		key: StorageKey,
623	) -> std::result::Result<Option<Block::Hash>, Error> {
624		self.block_or_best(block)
625			.and_then(|block| {
626				let child_info = match ChildType::from_prefixed_key(&storage_key) {
627					Some((ChildType::ParentKeyId, storage_key)) =>
628						ChildInfo::new_default(storage_key),
629					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
630				};
631				self.client.child_storage_hash(block, &child_info, &key)
632			})
633			.map_err(client_err)
634	}
635}
636
637fn invalid_block_range<B: BlockT>(
638	from: &CachedHeaderMetadata<B>,
639	to: &CachedHeaderMetadata<B>,
640	details: String,
641) -> Error {
642	let to_string = |h: &CachedHeaderMetadata<B>| format!("{} ({:?})", h.number, h.hash);
643
644	Error::InvalidBlockRange { from: to_string(from), to: to_string(to), details }
645}
646
647fn invalid_block<B: BlockT>(from: B::Hash, to: Option<B::Hash>, details: String) -> Error {
648	Error::InvalidBlockRange { from: format!("{:?}", from), to: format!("{:?}", to), details }
649}