1use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration};
22
23use super::{
24 client_err,
25 error::{Error, Result},
26 ChildStateBackend, StateBackend,
27};
28use crate::{
29 utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
30 DenyUnsafe, SubscriptionTaskExecutor,
31};
32
33use futures::{future, stream, StreamExt};
34use jsonrpsee::{core::async_trait, types::ErrorObject, PendingSubscriptionSink};
35use sc_client_api::{
36 Backend, BlockBackend, BlockchainEvents, CallExecutor, ExecutorProvider, ProofProvider,
37 StorageProvider,
38};
39use sc_rpc_api::state::ReadProof;
40use sp_api::{CallApiAt, Metadata, ProvideRuntimeApi};
41use sp_blockchain::{
42 CachedHeaderMetadata, Error as ClientError, HeaderBackend, HeaderMetadata,
43 Result as ClientResult,
44};
45use sp_core::{
46 storage::{
47 ChildInfo, ChildType, PrefixedStorageKey, StorageChangeSet, StorageData, StorageKey,
48 },
49 traits::CallContext,
50 Bytes,
51};
52use sp_runtime::traits::Block as BlockT;
53use sp_version::RuntimeVersion;
54
/// Time budget handed to `spawn_blocking_with_timeout` by `storage_size` when the caller is
/// subject to `DenyUnsafe::Yes`.
const MAXIMUM_SAFE_RPC_CALL_TIMEOUT: Duration = Duration::from_secs(30);
57
58struct QueryStorageRange<Block: BlockT> {
60 pub hashes: Vec<Block::Hash>,
62}
63
64pub struct FullState<BE, Block: BlockT, Client> {
66 client: Arc<Client>,
67 executor: SubscriptionTaskExecutor,
68 _phantom: PhantomData<(BE, Block)>,
69}
70
71impl<BE, Block: BlockT, Client> FullState<BE, Block, Client>
72where
73 BE: Backend<Block>,
74 Client: StorageProvider<Block, BE>
75 + HeaderBackend<Block>
76 + BlockBackend<Block>
77 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
78 Block: BlockT + 'static,
79{
80 pub fn new(client: Arc<Client>, executor: SubscriptionTaskExecutor) -> Self {
82 Self { client, executor, _phantom: PhantomData }
83 }
84
85 fn block_or_best(&self, hash: Option<Block::Hash>) -> ClientResult<Block::Hash> {
87 Ok(hash.unwrap_or_else(|| self.client.info().best_hash))
88 }
89
90 fn query_storage_range(
92 &self,
93 from: Block::Hash,
94 to: Option<Block::Hash>,
95 ) -> Result<QueryStorageRange<Block>> {
96 let to = self
97 .block_or_best(to)
98 .map_err(|e| invalid_block::<Block>(from, to, e.to_string()))?;
99
100 let invalid_block_err =
101 |e: ClientError| invalid_block::<Block>(from, Some(to), e.to_string());
102 let from_meta = self.client.header_metadata(from).map_err(invalid_block_err)?;
103 let to_meta = self.client.header_metadata(to).map_err(invalid_block_err)?;
104
105 if from_meta.number > to_meta.number {
106 return Err(invalid_block_range(
107 &from_meta,
108 &to_meta,
109 "from number > to number".to_owned(),
110 ))
111 }
112
113 let from_number = from_meta.number;
115 let hashes = {
116 let mut hashes = vec![to_meta.hash];
117 let mut last = to_meta.clone();
118 while last.number > from_number {
119 let header_metadata = self
120 .client
121 .header_metadata(last.parent)
122 .map_err(|e| invalid_block_range::<Block>(&last, &to_meta, e.to_string()))?;
123 hashes.push(header_metadata.hash);
124 last = header_metadata;
125 }
126 if last.hash != from_meta.hash {
127 return Err(invalid_block_range(
128 &from_meta,
129 &to_meta,
130 "from and to are on different forks".to_owned(),
131 ))
132 }
133 hashes.reverse();
134 hashes
135 };
136
137 Ok(QueryStorageRange { hashes })
138 }
139
140 fn query_storage_unfiltered(
142 &self,
143 range: &QueryStorageRange<Block>,
144 keys: &[StorageKey],
145 last_values: &mut HashMap<StorageKey, Option<StorageData>>,
146 changes: &mut Vec<StorageChangeSet<Block::Hash>>,
147 ) -> Result<()> {
148 for block_hash in &range.hashes {
149 let mut block_changes = StorageChangeSet { block: *block_hash, changes: Vec::new() };
150 for key in keys {
151 let (has_changed, data) = {
152 let curr_data = self.client.storage(*block_hash, key).map_err(client_err)?;
153 match last_values.get(key) {
154 Some(prev_data) => (curr_data != *prev_data, curr_data),
155 None => (true, curr_data),
156 }
157 };
158 if has_changed {
159 block_changes.changes.push((key.clone(), data.clone()));
160 }
161 last_values.insert(key.clone(), data);
162 }
163 if !block_changes.changes.is_empty() {
164 changes.push(block_changes);
165 }
166 }
167 Ok(())
168 }
169}
170
171#[async_trait]
172impl<BE, Block, Client> StateBackend<Block, Client> for FullState<BE, Block, Client>
173where
174 Block: BlockT + 'static,
175 Block::Hash: Unpin,
176 BE: Backend<Block> + 'static,
177 Client: ExecutorProvider<Block>
178 + StorageProvider<Block, BE>
179 + ProofProvider<Block>
180 + HeaderBackend<Block>
181 + HeaderMetadata<Block, Error = sp_blockchain::Error>
182 + BlockchainEvents<Block>
183 + CallApiAt<Block>
184 + ProvideRuntimeApi<Block>
185 + BlockBackend<Block>
186 + Send
187 + Sync
188 + 'static,
189 Client::Api: Metadata<Block>,
190{
191 fn call(
192 &self,
193 block: Option<Block::Hash>,
194 method: String,
195 call_data: Bytes,
196 ) -> std::result::Result<Bytes, Error> {
197 self.block_or_best(block)
198 .and_then(|block| {
199 self.client
200 .executor()
201 .call(block, &method, &call_data, CallContext::Offchain)
202 .map(Into::into)
203 })
204 .map_err(client_err)
205 }
206
207 fn storage_keys(
209 &self,
210 block: Option<Block::Hash>,
211 prefix: StorageKey,
212 ) -> std::result::Result<Vec<StorageKey>, Error> {
213 self.block_or_best(block)
215 .and_then(|block| self.client.storage_keys(block, Some(&prefix), None))
216 .map(|iter| iter.collect())
217 .map_err(client_err)
218 }
219
220 fn storage_pairs(
222 &self,
223 block: Option<Block::Hash>,
224 prefix: StorageKey,
225 ) -> std::result::Result<Vec<(StorageKey, StorageData)>, Error> {
226 self.block_or_best(block)
228 .and_then(|block| self.client.storage_pairs(block, Some(&prefix), None))
229 .map(|iter| iter.collect())
230 .map_err(client_err)
231 }
232
233 fn storage_keys_paged(
234 &self,
235 block: Option<Block::Hash>,
236 prefix: Option<StorageKey>,
237 count: u32,
238 start_key: Option<StorageKey>,
239 ) -> std::result::Result<Vec<StorageKey>, Error> {
240 self.block_or_best(block)
241 .and_then(|block| self.client.storage_keys(block, prefix.as_ref(), start_key.as_ref()))
242 .map(|iter| iter.take(count as usize).collect())
243 .map_err(client_err)
244 }
245
246 fn storage(
247 &self,
248 block: Option<Block::Hash>,
249 key: StorageKey,
250 ) -> std::result::Result<Option<StorageData>, Error> {
251 self.block_or_best(block)
252 .and_then(|block| self.client.storage(block, &key))
253 .map_err(client_err)
254 }
255
256 async fn storage_size(
257 &self,
258 block: Option<Block::Hash>,
259 key: StorageKey,
260 deny_unsafe: DenyUnsafe,
261 ) -> std::result::Result<Option<u64>, Error> {
262 let block = match self.block_or_best(block) {
263 Ok(b) => b,
264 Err(e) => return Err(client_err(e)),
265 };
266
267 let client = self.client.clone();
268 let timeout = match deny_unsafe {
269 DenyUnsafe::Yes => Some(MAXIMUM_SAFE_RPC_CALL_TIMEOUT),
270 DenyUnsafe::No => None,
271 };
272
273 super::utils::spawn_blocking_with_timeout(timeout, move |is_timed_out| {
274 match client.storage(block, &key) {
276 Ok(Some(d)) => return Ok(Ok(Some(d.0.len() as u64))),
277 Err(e) => return Ok(Err(client_err(e))),
278 Ok(None) => {},
279 }
280
281 let iter = match client.storage_keys(block, Some(&key), None).map_err(client_err) {
283 Ok(iter) => iter,
284 Err(e) => return Ok(Err(e)),
285 };
286
287 let mut sum = 0;
288 for storage_key in iter {
289 let value = client.storage(block, &storage_key).ok().flatten().unwrap_or_default();
290 sum += value.0.len() as u64;
291
292 is_timed_out.check_if_timed_out()?;
293 }
294
295 if sum > 0 {
296 Ok(Ok(Some(sum)))
297 } else {
298 Ok(Ok(None))
299 }
300 })
301 .await
302 .map_err(|error| Error::Client(Box::new(error)))?
303 }
304
305 fn storage_hash(
306 &self,
307 block: Option<Block::Hash>,
308 key: StorageKey,
309 ) -> std::result::Result<Option<Block::Hash>, Error> {
310 self.block_or_best(block)
311 .and_then(|block| self.client.storage_hash(block, &key))
312 .map_err(client_err)
313 }
314
315 fn metadata(&self, block: Option<Block::Hash>) -> std::result::Result<Bytes, Error> {
316 self.block_or_best(block).map_err(client_err).and_then(|block| {
317 self.client
318 .runtime_api()
319 .metadata(block)
320 .map(Into::into)
321 .map_err(|e| Error::Client(Box::new(e)))
322 })
323 }
324
325 fn runtime_version(
326 &self,
327 block: Option<Block::Hash>,
328 ) -> std::result::Result<RuntimeVersion, Error> {
329 self.block_or_best(block).map_err(client_err).and_then(|block| {
330 self.client.runtime_version_at(block).map_err(|e| Error::Client(Box::new(e)))
331 })
332 }
333
334 fn query_storage(
335 &self,
336 from: Block::Hash,
337 to: Option<Block::Hash>,
338 keys: Vec<StorageKey>,
339 ) -> std::result::Result<Vec<StorageChangeSet<Block::Hash>>, Error> {
340 let call_fn = move || {
341 let range = self.query_storage_range(from, to)?;
342 let mut changes = Vec::new();
343 let mut last_values = HashMap::new();
344 self.query_storage_unfiltered(&range, &keys, &mut last_values, &mut changes)?;
345 Ok(changes)
346 };
347 call_fn()
348 }
349
350 fn query_storage_at(
351 &self,
352 keys: Vec<StorageKey>,
353 at: Option<Block::Hash>,
354 ) -> std::result::Result<Vec<StorageChangeSet<Block::Hash>>, Error> {
355 let at = at.unwrap_or_else(|| self.client.info().best_hash);
356 self.query_storage(at, Some(at), keys)
357 }
358
359 fn read_proof(
360 &self,
361 block: Option<Block::Hash>,
362 keys: Vec<StorageKey>,
363 ) -> std::result::Result<ReadProof<Block::Hash>, Error> {
364 self.block_or_best(block)
365 .and_then(|block| {
366 self.client
367 .read_proof(block, &mut keys.iter().map(|key| key.0.as_ref()))
368 .map(|proof| proof.into_iter_nodes().map(|node| node.into()).collect())
369 .map(|proof| ReadProof { at: block, proof })
370 })
371 .map_err(client_err)
372 }
373
374 fn subscribe_runtime_version(&self, pending: PendingSubscriptionSink) {
375 let initial = match self
376 .block_or_best(None)
377 .and_then(|block| self.client.runtime_version_at(block).map_err(Into::into))
378 .map_err(|e| Error::Client(Box::new(e)))
379 {
380 Ok(initial) => initial,
381 Err(e) => {
382 spawn_subscription_task(&self.executor, pending.reject(e));
383 return
384 },
385 };
386
387 let mut previous_version = initial.clone();
388 let client = self.client.clone();
389
390 let version_stream = client
392 .import_notification_stream()
393 .filter(|n| future::ready(n.is_new_best))
394 .filter_map(move |n| {
395 let version =
396 client.runtime_version_at(n.hash).map_err(|e| Error::Client(Box::new(e)));
397
398 match version {
399 Ok(version) if version != previous_version => {
400 previous_version = version.clone();
401 future::ready(Some(version))
402 },
403 _ => future::ready(None),
404 }
405 });
406
407 let stream = futures::stream::once(future::ready(initial)).chain(version_stream);
408 spawn_subscription_task(
409 &self.executor,
410 PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
411 );
412 }
413
414 fn subscribe_storage(
415 &self,
416 pending: PendingSubscriptionSink,
417 keys: Option<Vec<StorageKey>>,
418 deny_unsafe: DenyUnsafe,
419 ) {
420 if keys.is_none() {
421 if let Err(err) = deny_unsafe.check_if_safe() {
422 spawn_subscription_task(&self.executor, pending.reject(ErrorObject::from(err)));
423 return
424 }
425 }
426
427 let stream = match self.client.storage_changes_notification_stream(keys.as_deref(), None) {
428 Ok(stream) => stream,
429 Err(blockchain_err) => {
430 spawn_subscription_task(
431 &self.executor,
432 pending.reject(Error::Client(Box::new(blockchain_err))),
433 );
434 return
435 },
436 };
437
438 let initial = stream::iter(keys.map(|keys| {
439 let block = self.client.info().best_hash;
440 let changes = keys
441 .into_iter()
442 .map(|key| {
443 let v = self.client.storage(block, &key).ok().flatten();
444 (key, v)
445 })
446 .collect();
447 StorageChangeSet { block, changes }
448 }));
449
450 let storage_stream = stream.map(|storage_notif| StorageChangeSet {
451 block: storage_notif.block,
452 changes: storage_notif
453 .changes
454 .iter()
455 .filter_map(|(o_sk, k, v)| o_sk.is_none().then(|| (k.clone(), v.cloned())))
456 .collect(),
457 });
458
459 let stream = initial
460 .chain(storage_stream)
461 .filter(|storage| future::ready(!storage.changes.is_empty()));
462
463 spawn_subscription_task(
464 &self.executor,
465 PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
466 );
467 }
468
469 fn trace_block(
470 &self,
471 block: Block::Hash,
472 targets: Option<String>,
473 storage_keys: Option<String>,
474 methods: Option<String>,
475 ) -> std::result::Result<sp_rpc::tracing::TraceBlockResponse, Error> {
476 sc_tracing::block::BlockExecutor::new(
477 self.client.clone(),
478 block,
479 targets,
480 storage_keys,
481 methods,
482 )
483 .trace_block()
484 .map_err(|e| invalid_block::<Block>(block, None, e.to_string()))
485 }
486}
487
488impl<BE, Block, Client> ChildStateBackend<Block, Client> for FullState<BE, Block, Client>
489where
490 Block: BlockT + 'static,
491 BE: Backend<Block> + 'static,
492 Client: ExecutorProvider<Block>
493 + StorageProvider<Block, BE>
494 + ProofProvider<Block>
495 + HeaderBackend<Block>
496 + BlockBackend<Block>
497 + HeaderMetadata<Block, Error = sp_blockchain::Error>
498 + BlockchainEvents<Block>
499 + CallApiAt<Block>
500 + ProvideRuntimeApi<Block>
501 + Send
502 + Sync
503 + 'static,
504 Client::Api: Metadata<Block>,
505{
506 fn read_child_proof(
507 &self,
508 block: Option<Block::Hash>,
509 storage_key: PrefixedStorageKey,
510 keys: Vec<StorageKey>,
511 ) -> std::result::Result<ReadProof<Block::Hash>, Error> {
512 self.block_or_best(block)
513 .and_then(|block| {
514 let child_info = match ChildType::from_prefixed_key(&storage_key) {
515 Some((ChildType::ParentKeyId, storage_key)) =>
516 ChildInfo::new_default(storage_key),
517 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
518 };
519 self.client
520 .read_child_proof(
521 block,
522 &child_info,
523 &mut keys.iter().map(|key| key.0.as_ref()),
524 )
525 .map(|proof| proof.into_iter_nodes().map(|node| node.into()).collect())
526 .map(|proof| ReadProof { at: block, proof })
527 })
528 .map_err(client_err)
529 }
530
531 fn storage_keys(
532 &self,
533 block: Option<Block::Hash>,
534 storage_key: PrefixedStorageKey,
535 prefix: StorageKey,
536 ) -> std::result::Result<Vec<StorageKey>, Error> {
537 self.block_or_best(block)
539 .and_then(|block| {
540 let child_info = match ChildType::from_prefixed_key(&storage_key) {
541 Some((ChildType::ParentKeyId, storage_key)) =>
542 ChildInfo::new_default(storage_key),
543 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
544 };
545 self.client.child_storage_keys(block, child_info, Some(&prefix), None)
546 })
547 .map(|iter| iter.collect())
548 .map_err(client_err)
549 }
550
551 fn storage_keys_paged(
552 &self,
553 block: Option<Block::Hash>,
554 storage_key: PrefixedStorageKey,
555 prefix: Option<StorageKey>,
556 count: u32,
557 start_key: Option<StorageKey>,
558 ) -> std::result::Result<Vec<StorageKey>, Error> {
559 self.block_or_best(block)
560 .and_then(|block| {
561 let child_info = match ChildType::from_prefixed_key(&storage_key) {
562 Some((ChildType::ParentKeyId, storage_key)) =>
563 ChildInfo::new_default(storage_key),
564 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
565 };
566 self.client.child_storage_keys(
567 block,
568 child_info,
569 prefix.as_ref(),
570 start_key.as_ref(),
571 )
572 })
573 .map(|iter| iter.take(count as usize).collect())
574 .map_err(client_err)
575 }
576
577 fn storage(
578 &self,
579 block: Option<Block::Hash>,
580 storage_key: PrefixedStorageKey,
581 key: StorageKey,
582 ) -> std::result::Result<Option<StorageData>, Error> {
583 self.block_or_best(block)
584 .and_then(|block| {
585 let child_info = match ChildType::from_prefixed_key(&storage_key) {
586 Some((ChildType::ParentKeyId, storage_key)) =>
587 ChildInfo::new_default(storage_key),
588 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
589 };
590 self.client.child_storage(block, &child_info, &key)
591 })
592 .map_err(client_err)
593 }
594
595 fn storage_entries(
596 &self,
597 block: Option<Block::Hash>,
598 storage_key: PrefixedStorageKey,
599 keys: Vec<StorageKey>,
600 ) -> std::result::Result<Vec<Option<StorageData>>, Error> {
601 let child_info = if let Some((ChildType::ParentKeyId, storage_key)) =
602 ChildType::from_prefixed_key(&storage_key)
603 {
604 Arc::new(ChildInfo::new_default(storage_key))
605 } else {
606 return Err(client_err(sp_blockchain::Error::InvalidChildStorageKey))
607 };
608 let block = self.block_or_best(block).map_err(client_err)?;
609 let client = self.client.clone();
610
611 keys.into_iter()
612 .map(move |key| {
613 client.clone().child_storage(block, &child_info, &key).map_err(client_err)
614 })
615 .collect()
616 }
617
618 fn storage_hash(
619 &self,
620 block: Option<Block::Hash>,
621 storage_key: PrefixedStorageKey,
622 key: StorageKey,
623 ) -> std::result::Result<Option<Block::Hash>, Error> {
624 self.block_or_best(block)
625 .and_then(|block| {
626 let child_info = match ChildType::from_prefixed_key(&storage_key) {
627 Some((ChildType::ParentKeyId, storage_key)) =>
628 ChildInfo::new_default(storage_key),
629 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
630 };
631 self.client.child_storage_hash(block, &child_info, &key)
632 })
633 .map_err(client_err)
634 }
635}
636
637fn invalid_block_range<B: BlockT>(
638 from: &CachedHeaderMetadata<B>,
639 to: &CachedHeaderMetadata<B>,
640 details: String,
641) -> Error {
642 let to_string = |h: &CachedHeaderMetadata<B>| format!("{} ({:?})", h.number, h.hash);
643
644 Error::InvalidBlockRange { from: to_string(from), to: to_string(to), details }
645}
646
647fn invalid_block<B: BlockT>(from: B::Hash, to: Option<B::Hash>, details: String) -> Error {
648 Error::InvalidBlockRange { from: format!("{:?}", from), to: format!("{:?}", to), details }
649}