referrerpolicy=no-referrer-when-downgrade

substrate_rpc_client/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! # Shared JSON-RPC client related code and abstractions.
19//!
20//! It exposes a `WebSocket JSON-RPC` client that implements the RPC interface in [`sc-rpc-api`]
21//! along with some abstractions.
22//!
23//! ## Usage
24//!
25//! ```no_run
26//! # use substrate_rpc_client::{ws_client, StateApi};
27//! # use sp_core::{H256, storage::StorageKey};
28//!
29//! #[tokio::main]
30//! async fn main() {
31//!
32//!     let client = ws_client("ws://127.0.0.1:9944").await.unwrap();
33//!     client.storage(StorageKey(vec![]), Some(H256::zero())).await.unwrap();
34//!
//!     // If some type parameters cannot be inferred, spell them out explicitly.
36//!     StateApi::<H256>::storage(&client, StorageKey(vec![]), None).await.unwrap();
37//! }
38//! ```
39
40use async_trait::async_trait;
41use serde::de::DeserializeOwned;
42use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
43use std::collections::VecDeque;
44
45pub use jsonrpsee::{
46	core::{
47		client::{ClientT, Error, Subscription, SubscriptionClientT},
48		params::BatchRequestBuilder,
49		RpcResult,
50	},
51	rpc_params,
52	ws_client::{WsClient, WsClientBuilder},
53};
54pub use sc_rpc_api::{
55	author::AuthorApiClient as AuthorApi, chain::ChainApiClient as ChainApi,
56	child_state::ChildStateApiClient as ChildStateApi, dev::DevApiClient as DevApi,
57	offchain::OffchainApiClient as OffchainApi, state::StateApiClient as StateApi,
58	system::SystemApiClient as SystemApi,
59};
60
61/// Create a new `WebSocket` connection with shared settings.
62pub async fn ws_client(uri: impl AsRef<str>) -> Result<WsClient, String> {
63	WsClientBuilder::default()
64		.max_request_size(u32::MAX)
65		.max_response_size(u32::MAX)
66		.request_timeout(std::time::Duration::from_secs(60 * 10))
67		.connection_timeout(std::time::Duration::from_secs(60))
68		.max_buffer_capacity_per_subscription(1024)
69		.build(uri)
70		.await
71		.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
72}
73
74/// Abstraction over RPC calling for headers.
75#[async_trait]
76pub trait HeaderProvider<Block: BlockT>
77where
78	Block::Header: HeaderT,
79{
80	/// Awaits for the header of the block with hash `hash`.
81	async fn get_header(&self, hash: Block::Hash) -> Block::Header;
82}
83
84#[async_trait]
85impl<Block: BlockT> HeaderProvider<Block> for WsClient
86where
87	Block::Header: DeserializeOwned,
88{
89	async fn get_header(&self, hash: Block::Hash) -> Block::Header {
90		ChainApi::<(), Block::Hash, Block::Header, ()>::header(self, Some(hash))
91			.await
92			.unwrap()
93			.unwrap()
94	}
95}
96
97/// Abstraction over RPC subscription for finalized headers.
98#[async_trait]
99pub trait HeaderSubscription<Block: BlockT>
100where
101	Block::Header: HeaderT,
102{
103	/// Await for the next finalized header from the subscription.
104	///
105	/// Returns `None` if either the subscription has been closed or there was an error when reading
106	/// an object from the client.
107	async fn next_header(&mut self) -> Option<Block::Header>;
108}
109
110#[async_trait]
111impl<Block: BlockT> HeaderSubscription<Block> for Subscription<Block::Header>
112where
113	Block::Header: DeserializeOwned,
114{
115	async fn next_header(&mut self) -> Option<Block::Header> {
116		match self.next().await {
117			Some(Ok(header)) => Some(header),
118			None => {
119				log::warn!("subscription closed");
120				None
121			},
122			Some(Err(why)) => {
123				log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why);
124				None
125			},
126		}
127	}
128}
129
130/// Stream of all finalized headers.
131///
132/// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of
133/// them lack justification).
134pub struct FinalizedHeaders<
135	'a,
136	Block: BlockT,
137	HP: HeaderProvider<Block>,
138	HS: HeaderSubscription<Block>,
139> {
140	header_provider: &'a HP,
141	subscription: HS,
142	fetched_headers: VecDeque<Block::Header>,
143	last_returned: Option<<Block::Header as HeaderT>::Hash>,
144}
145
146impl<'a, Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>>
147	FinalizedHeaders<'a, Block, HP, HS>
148where
149	<Block as BlockT>::Header: DeserializeOwned,
150{
151	pub fn new(header_provider: &'a HP, subscription: HS) -> Self {
152		Self {
153			header_provider,
154			subscription,
155			fetched_headers: VecDeque::new(),
156			last_returned: None,
157		}
158	}
159
160	/// Reads next finalized header from the subscription. If some headers (without justification)
161	/// have been skipped, fetches them as well. Returns number of headers that have been fetched.
162	///
163	/// All fetched headers are stored in `self.fetched_headers`.
164	async fn fetch(&mut self) -> usize {
165		let last_finalized = match self.subscription.next_header().await {
166			Some(header) => header,
167			None => return 0,
168		};
169
170		self.fetched_headers.push_front(last_finalized.clone());
171
172		let mut last_finalized_parent = *last_finalized.parent_hash();
173		let last_returned = self.last_returned.unwrap_or(last_finalized_parent);
174
175		while last_finalized_parent != last_returned {
176			let parent_header = self.header_provider.get_header(last_finalized_parent).await;
177			self.fetched_headers.push_front(parent_header.clone());
178			last_finalized_parent = *parent_header.parent_hash();
179		}
180
181		self.fetched_headers.len()
182	}
183
184	/// Get the next finalized header.
185	pub async fn next(&mut self) -> Option<Block::Header> {
186		if self.fetched_headers.is_empty() {
187			self.fetch().await;
188		}
189
190		if let Some(header) = self.fetched_headers.pop_front() {
191			self.last_returned = Some(header.hash());
192			Some(header)
193		} else {
194			None
195		}
196	}
197}
198
#[cfg(test)]
mod tests {
	use super::*;
	use sp_runtime::testing::{Block as TBlock, Header, MockCallU64, TestXt, H256};
	use std::sync::Arc;
	use tokio::sync::Mutex;

	type UncheckedXt = TestXt<MockCallU64, ()>;
	type Block = TBlock<UncheckedXt>;
	type BlockNumber = u64;
	type Hash = H256;

	/// Header provider that ignores the requested hash and answers with the header at the next
	/// queued height instead.
	struct MockHeaderProvider(pub Arc<Mutex<VecDeque<BlockNumber>>>);

	/// Builds a chain of headers numbered 0 through 10 in which every header after the first
	/// references the hash of its predecessor as parent.
	fn headers() -> Vec<Header> {
		let mut chain = vec![Header::new_from_number(0)];
		for number in 1..=10 {
			let parent_hash = chain.last().unwrap().hash();
			chain.push(Header { parent_hash, ..Header::new_from_number(number) })
		}
		chain
	}

	#[async_trait]
	impl HeaderProvider<Block> for MockHeaderProvider {
		async fn get_header(&self, _hash: Hash) -> Header {
			let next_height = self.0.lock().await.pop_front();
			let height = next_height.unwrap();
			headers()[height as usize].clone()
		}
	}

	/// Subscription that yields the headers at the queued heights, then ends.
	struct MockHeaderSubscription(pub VecDeque<BlockNumber>);

	#[async_trait]
	impl HeaderSubscription<Block> for MockHeaderSubscription {
		async fn next_header(&mut self) -> Option<Header> {
			let height = self.0.pop_front()?;
			Some(headers()[height as usize].clone())
		}
	}

	#[tokio::test]
	async fn finalized_headers_works_when_every_block_comes_from_subscription() {
		let subscribed = vec![4, 5, 6, 7];

		// The provider has nothing queued: any call to it would panic on `unwrap`.
		let provider = MockHeaderProvider(Arc::new(Mutex::new(VecDeque::new())));
		let subscription = MockHeaderSubscription(VecDeque::from(subscribed.clone()));
		let mut stream = FinalizedHeaders::new(&provider, subscription);

		for expected in subscribed {
			let header = stream.next().await.unwrap();
			assert_eq!(expected, header.number);
		}
		assert_eq!(None, stream.next().await);
	}

	#[tokio::test]
	async fn finalized_headers_come_from_subscription_and_provider_if_in_need() {
		let from_subscription = vec![3, 4, 6, 10];
		// Gaps are filled walking backwards from the newest header, hence 9, 8, 7.
		let from_provider = vec![5, 9, 8, 7];

		let provider = MockHeaderProvider(Arc::new(Mutex::new(VecDeque::from(from_provider))));
		let subscription = MockHeaderSubscription(VecDeque::from(from_subscription));
		let mut stream = FinalizedHeaders::new(&provider, subscription);

		// Every height from 3 to 10 must come out, in order, regardless of its source.
		for expected in 3..=10 {
			let header = stream.next().await.unwrap();
			assert_eq!(expected, header.number);
		}
		assert_eq!(None, stream.next().await);
	}
}