// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Implementation of the `chainHead_storage` method.

use std::{marker::PhantomData, sync::Arc};

use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
use sp_runtime::traits::Block as BlockT;
use tokio::sync::mpsc;

use crate::common::{
	events::{StorageQuery, StorageQueryType},
	storage::{IterQueryType, QueryIter, QueryResult, Storage},
};

/// Generates the events of the `chainHead_storage` method.
///
/// Wraps a [`Storage`] client, which performs the individual storage queries.
pub struct ChainHeadStorage<Client, Block, BE> {
	/// Storage client used to execute the queries.
	client: Storage<Client, Block, BE>,
	/// Zero-sized marker for the `BE` and `Block` type parameters.
	// NOTE(review): the field name looks like a typo of `_phantom`; renaming it
	// also requires touching every constructor of this struct.
	_phandom: PhantomData<(BE, Block)>,
}

// Implemented by hand rather than derived, so no `Clone` bounds are placed on
// the `Client`, `Block` and `BE` type parameters.
impl<Client, Block, BE> Clone for ChainHeadStorage<Client, Block, BE> {
	/// Returns a new handle wrapping a clone of the inner storage client.
	fn clone(&self) -> Self {
		let client = self.client.clone();
		Self { client, _phandom: PhantomData }
	}
}

impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
	/// Constructs a new [`ChainHeadStorage`].
	///
	/// The provided `client` is handed to [`Storage::new`] to build the inner
	/// storage client.
	pub fn new(client: Arc<Client>) -> Self {
		let client = Storage::new(client);
		Self { client, _phandom: PhantomData }
	}
}

impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
where
	Block: BlockT + 'static,
	BE: Backend<Block> + 'static,
	Client: StorageProvider<Block, BE> + Send + Sync + 'static,
{
	/// Generate the block events for the `chainHead_storage` method.
	///
	/// Every query in `items` is executed against the block `hash`, in order, on a
	/// blocking task. `child_key` is forwarded unchanged to each query. Results are
	/// delivered through `tx`:
	///
	/// - `Value`, `Hash` and `ClosestDescendantMerkleValue` queries send exactly one
	///   [`QueryResult`];
	/// - `DescendantsValues` and `DescendantsHashes` queries hand `tx` to the storage
	///   client, which produces the results itself.
	///
	/// Processing stops as soon as the channel is found to be closed, so no further
	/// storage queries are started for a receiver that has gone away.
	///
	/// # Errors
	///
	/// Returns the [`tokio::task::JoinError`] reported when awaiting the blocking
	/// task fails.
	pub async fn generate_events(
		&mut self,
		hash: Block::Hash,
		items: Vec<StorageQuery<StorageKey>>,
		child_key: Option<ChildInfo>,
		tx: mpsc::Sender<QueryResult>,
	) -> Result<(), tokio::task::JoinError> {
		// The blocking closure must be `'static`, so it gets its own handle.
		let this = self.clone();

		tokio::task::spawn_blocking(move || {
			let child = child_key.as_ref();

			for item in items {
				// A descendants query may have returned because the receiver went
				// away; do not start another storage query in that case.
				if tx.is_closed() {
					break;
				}

				// Single-result queries evaluate to the response to send; descendants
				// queries push their results through `tx` themselves and skip the
				// send below.
				let response = match item.query_type {
					StorageQueryType::Value => this.client.query_value(hash, &item.key, child),
					StorageQueryType::Hash => this.client.query_hash(hash, &item.key, child),
					StorageQueryType::ClosestDescendantMerkleValue =>
						this.client.query_merkle_value(hash, &item.key, child),
					StorageQueryType::DescendantsValues => {
						let query = QueryIter {
							query_key: item.key,
							ty: IterQueryType::Value,
							pagination_start_key: None,
						};
						this.client.query_iter_pagination_with_producer(query, hash, child, &tx);
						continue;
					},
					StorageQueryType::DescendantsHashes => {
						let query = QueryIter {
							query_key: item.key,
							ty: IterQueryType::Hash,
							pagination_start_key: None,
						};
						this.client.query_iter_pagination_with_producer(query, hash, child, &tx);
						continue;
					},
				};

				// A failed send means the receiver is gone; nothing left to do.
				if tx.blocking_send(response).is_err() {
					break;
				}
			}
		})
		.await?;

		Ok(())
	}
}