referrerpolicy=no-referrer-when-downgrade

relay_substrate_client/client/
rpc.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Client implementation that connects to the Substrate node over `ws`/`wss` connection
18//! and is using RPC methods to get required data and submit transactions.
19
20use crate::{
21	client::{
22		rpc_api::{
23			SubstrateAuthorClient, SubstrateBeefyClient, SubstrateChainClient,
24			SubstrateFrameSystemClient, SubstrateGrandpaClient, SubstrateStateClient,
25			SubstrateSystemClient,
26		},
27		subscription::{StreamDescription, Subscription},
28		Client,
29	},
30	error::{Error, Result},
31	guard::Environment,
32	transaction_stall_timeout, AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain,
33	ChainRuntimeVersion, ChainWithGrandpa, ChainWithTransactions, ConnectionParams, HashOf,
34	HeaderIdOf, HeaderOf, NonceOf, SignParam, SignedBlockOf, SimpleRuntimeVersion,
35	TransactionTracker, UnsignedTransaction,
36};
37
38use async_std::sync::{Arc, Mutex, RwLock};
39use async_trait::async_trait;
40use bp_runtime::HeaderIdProvider;
41use codec::Encode;
42use frame_support::weights::Weight;
43use futures::TryFutureExt;
44use jsonrpsee::{
45	core::{client::Subscription as RpcSubscription, ClientError},
46	ws_client::{WsClient, WsClientBuilder},
47};
48use num_traits::Zero;
49use pallet_transaction_payment::RuntimeDispatchInfo;
50use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT};
51use sp_core::{
52	storage::{StorageData, StorageKey},
53	Bytes, Hasher, Pair,
54};
55use sp_runtime::{
56	traits::Header,
57	transaction_validity::{TransactionSource, TransactionValidity},
58};
59use sp_trie::StorageProof;
60use sp_version::RuntimeVersion;
61use std::{cmp::Ordering, future::Future, marker::PhantomData};
62
63const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
64
65const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_validate_transaction";
66const SUB_API_TX_PAYMENT_QUERY_INFO: &str = "TransactionPaymentApi_query_info";
67const SUB_API_GRANDPA_GENERATE_KEY_OWNERSHIP_PROOF: &str =
68	"GrandpaApi_generate_key_ownership_proof";
69
70/// Client implementation that connects to the Substrate node over `ws`/`wss` connection
71/// and is using RPC methods to get required data and submit transactions.
72pub struct RpcClient<C: Chain> {
73	// Lock order: `submit_signed_extrinsic_lock`, `data`
74	/// Client connection params.
75	params: Arc<ConnectionParams>,
76	/// If several tasks are submitting their transactions simultaneously using
77	/// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of
78	/// transactions will be rejected from the pool. This lock is here to prevent situations like
79	/// that.
80	submit_signed_extrinsic_lock: Arc<Mutex<()>>,
81	/// Genesis block hash.
82	genesis_hash: HashOf<C>,
83	/// Shared dynamic data.
84	data: Arc<RwLock<ClientData>>,
85	/// Generic arguments dump.
86	_phantom: PhantomData<C>,
87}
88
89/// Client data, shared by all `RpcClient` clones.
90struct ClientData {
91	/// Tokio runtime handle.
92	tokio: Arc<tokio::runtime::Runtime>,
93	/// Substrate RPC client.
94	client: Arc<WsClient>,
95}
96
97/// Already encoded value.
98struct PreEncoded(Vec<u8>);
99
100impl Encode for PreEncoded {
101	fn encode(&self) -> Vec<u8> {
102		self.0.clone()
103	}
104}
105
106impl<C: Chain> std::fmt::Debug for RpcClient<C> {
107	fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
108		fmt.write_fmt(format_args!("RpcClient<{}>", C::NAME))
109	}
110}
111
112impl<C: Chain> RpcClient<C> {
113	/// Returns client that is able to call RPCs on Substrate node over websocket connection.
114	///
115	/// This function will keep connecting to given Substrate node until connection is established
116	/// and is functional. If attempt fail, it will wait for `RECONNECT_DELAY` and retry again.
117	pub async fn new(params: ConnectionParams) -> Self {
118		let params = Arc::new(params);
119		loop {
120			match Self::try_connect(params.clone()).await {
121				Ok(client) => return client,
122				Err(error) => log::error!(
123					target: "bridge",
124					"Failed to connect to {} node: {:?}. Going to retry in {}s",
125					C::NAME,
126					error,
127					RECONNECT_DELAY.as_secs(),
128				),
129			}
130
131			async_std::task::sleep(RECONNECT_DELAY).await;
132		}
133	}
134
135	/// Try to connect to Substrate node over websocket. Returns Substrate RPC client if connection
136	/// has been established or error otherwise.
137	async fn try_connect(params: Arc<ConnectionParams>) -> Result<Self> {
138		let (tokio, client) = Self::build_client(&params).await?;
139
140		let genesis_hash_client = client.clone();
141		let genesis_hash = tokio
142			.spawn(async move {
143				SubstrateChainClient::<C>::block_hash(&*genesis_hash_client, Some(Zero::zero()))
144					.await
145			})
146			.await??;
147
148		let chain_runtime_version = params.chain_runtime_version;
149		let mut client = Self {
150			params,
151			submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
152			genesis_hash,
153			data: Arc::new(RwLock::new(ClientData { tokio, client })),
154			_phantom: PhantomData,
155		};
156		Self::ensure_correct_runtime_version(&mut client, chain_runtime_version).await?;
157		Ok(client)
158	}
159
160	// Check runtime version to understand if we need are connected to expected version, or we
161	// need to wait for upgrade, we need to abort immediately.
162	async fn ensure_correct_runtime_version<E: Environment<C, Error = Error>>(
163		env: &mut E,
164		expected: ChainRuntimeVersion,
165	) -> Result<()> {
166		// we are only interested if version mode is bundled or passed using CLI
167		let expected = match expected {
168			ChainRuntimeVersion::Auto => return Ok(()),
169			ChainRuntimeVersion::Custom(expected) => expected,
170		};
171
172		// we need to wait if actual version is < than expected, we are OK of versions are the
173		// same and we need to abort if actual version is > than expected
174		let actual = SimpleRuntimeVersion::from_runtime_version(&env.runtime_version().await?);
175		match actual.spec_version.cmp(&expected.spec_version) {
176			Ordering::Less =>
177				Err(Error::WaitingForRuntimeUpgrade { chain: C::NAME.into(), expected, actual }),
178			Ordering::Equal => Ok(()),
179			Ordering::Greater => {
180				log::error!(
181					target: "bridge",
182					"The {} client is configured to use runtime version {expected:?} and actual \
183					version is {actual:?}. Aborting",
184					C::NAME,
185				);
186				env.abort().await;
187				Err(Error::Custom("Aborted".into()))
188			},
189		}
190	}
191
192	/// Build client to use in connection.
193	async fn build_client(
194		params: &ConnectionParams,
195	) -> Result<(Arc<tokio::runtime::Runtime>, Arc<WsClient>)> {
196		let tokio = tokio::runtime::Runtime::new()?;
197		let uri = params.uri.clone();
198		log::info!(target: "bridge", "Connecting to {} node at {}", C::NAME, uri);
199
200		let client = tokio
201			.spawn(async move {
202				WsClientBuilder::default()
203					.max_buffer_capacity_per_subscription(MAX_SUBSCRIPTION_CAPACITY)
204					.build(&uri)
205					.await
206			})
207			.await??;
208
209		Ok((Arc::new(tokio), Arc::new(client)))
210	}
211
212	/// Execute jsonrpsee future in tokio context.
213	async fn jsonrpsee_execute<MF, F, T>(&self, make_jsonrpsee_future: MF) -> Result<T>
214	where
215		MF: FnOnce(Arc<WsClient>) -> F + Send + 'static,
216		F: Future<Output = Result<T>> + Send + 'static,
217		T: Send + 'static,
218	{
219		let data = self.data.read().await;
220		let client = data.client.clone();
221		data.tokio.spawn(make_jsonrpsee_future(client)).await?
222	}
223
224	/// Prepare parameters used to sign chain transactions.
225	async fn build_sign_params(&self, signer: AccountKeyPairOf<C>) -> Result<SignParam<C>>
226	where
227		C: ChainWithTransactions,
228	{
229		let runtime_version = self.simple_runtime_version().await?;
230		Ok(SignParam::<C> {
231			spec_version: runtime_version.spec_version,
232			transaction_version: runtime_version.transaction_version,
233			genesis_hash: self.genesis_hash,
234			signer,
235		})
236	}
237
238	/// Get the nonce of the given Substrate account.
239	pub async fn next_account_index(&self, account: AccountIdOf<C>) -> Result<NonceOf<C>> {
240		self.jsonrpsee_execute(move |client| async move {
241			Ok(SubstrateFrameSystemClient::<C>::account_next_index(&*client, account).await?)
242		})
243		.await
244	}
245
246	/// Subscribe to finality justifications.
247	async fn subscribe_finality_justifications<Fut>(
248		&self,
249		gadget_name: &str,
250		do_subscribe: impl FnOnce(Arc<WsClient>) -> Fut + Send + 'static,
251	) -> Result<Subscription<Bytes>>
252	where
253		Fut: Future<Output = std::result::Result<RpcSubscription<Bytes>, ClientError>> + Send,
254	{
255		let subscription = self
256			.jsonrpsee_execute(move |client| async move { Ok(do_subscribe(client).await?) })
257			.map_err(|e| Error::failed_to_subscribe_justification::<C>(e))
258			.await?;
259
260		Ok(Subscription::new_forwarded(
261			StreamDescription::new(format!("{} justifications", gadget_name), C::NAME.into()),
262			subscription,
263		))
264	}
265
266	/// Subscribe to headers stream.
267	async fn subscribe_headers<Fut>(
268		&self,
269		stream_name: &str,
270		do_subscribe: impl FnOnce(Arc<WsClient>) -> Fut + Send + 'static,
271		map_err: impl FnOnce(Error) -> Error,
272	) -> Result<Subscription<HeaderOf<C>>>
273	where
274		Fut: Future<Output = std::result::Result<RpcSubscription<HeaderOf<C>>, ClientError>> + Send,
275	{
276		let subscription = self
277			.jsonrpsee_execute(move |client| async move { Ok(do_subscribe(client).await?) })
278			.map_err(map_err)
279			.await?;
280
281		Ok(Subscription::new_forwarded(
282			StreamDescription::new(format!("{} headers", stream_name), C::NAME.into()),
283			subscription,
284		))
285	}
286}
287
288impl<C: Chain> Clone for RpcClient<C> {
289	fn clone(&self) -> Self {
290		RpcClient {
291			params: self.params.clone(),
292			submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
293			genesis_hash: self.genesis_hash,
294			data: self.data.clone(),
295			_phantom: PhantomData,
296		}
297	}
298}
299
300#[async_trait]
301impl<C: Chain> Client<C> for RpcClient<C> {
302	async fn ensure_synced(&self) -> Result<()> {
303		let health = self
304			.jsonrpsee_execute(|client| async move {
305				Ok(SubstrateSystemClient::<C>::health(&*client).await?)
306			})
307			.await
308			.map_err(|e| Error::failed_to_get_system_health::<C>(e))?;
309
310		let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
311		if is_synced {
312			Ok(())
313		} else {
314			Err(Error::ClientNotSynced(health))
315		}
316	}
317
318	async fn reconnect(&self) -> Result<()> {
319		let mut data = self.data.write().await;
320		let (tokio, client) = Self::build_client(&self.params).await?;
321		data.tokio = tokio;
322		data.client = client;
323		Ok(())
324	}
325
326	fn genesis_hash(&self) -> HashOf<C> {
327		self.genesis_hash
328	}
329
330	async fn header_hash_by_number(&self, number: BlockNumberOf<C>) -> Result<HashOf<C>> {
331		self.jsonrpsee_execute(move |client| async move {
332			Ok(SubstrateChainClient::<C>::block_hash(&*client, Some(number)).await?)
333		})
334		.await
335		.map_err(|e| Error::failed_to_read_header_hash_by_number::<C>(number, e))
336	}
337
338	async fn header_by_hash(&self, hash: HashOf<C>) -> Result<HeaderOf<C>> {
339		self.jsonrpsee_execute(move |client| async move {
340			Ok(SubstrateChainClient::<C>::header(&*client, Some(hash)).await?)
341		})
342		.await
343		.map_err(|e| Error::failed_to_read_header_by_hash::<C>(hash, e))
344	}
345
346	async fn block_by_hash(&self, hash: HashOf<C>) -> Result<SignedBlockOf<C>> {
347		self.jsonrpsee_execute(move |client| async move {
348			Ok(SubstrateChainClient::<C>::block(&*client, Some(hash)).await?)
349		})
350		.await
351		.map_err(|e| Error::failed_to_read_block_by_hash::<C>(hash, e))
352	}
353
354	async fn best_finalized_header_hash(&self) -> Result<HashOf<C>> {
355		self.jsonrpsee_execute(|client| async move {
356			Ok(SubstrateChainClient::<C>::finalized_head(&*client).await?)
357		})
358		.await
359		.map_err(|e| Error::failed_to_read_best_finalized_header_hash::<C>(e))
360	}
361
362	async fn best_header(&self) -> Result<HeaderOf<C>> {
363		self.jsonrpsee_execute(|client| async move {
364			Ok(SubstrateChainClient::<C>::header(&*client, None).await?)
365		})
366		.await
367		.map_err(|e| Error::failed_to_read_best_header::<C>(e))
368	}
369
370	async fn subscribe_best_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
371		self.subscribe_headers(
372			"best headers",
373			move |client| async move { SubstrateChainClient::<C>::subscribe_new_heads(&*client).await },
374			|e| Error::failed_to_subscribe_best_headers::<C>(e),
375		)
376		.await
377	}
378
379	async fn subscribe_finalized_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
380		self.subscribe_headers(
381			"best finalized headers",
382			move |client| async move {
383				SubstrateChainClient::<C>::subscribe_finalized_heads(&*client).await
384			},
385			|e| Error::failed_to_subscribe_finalized_headers::<C>(e),
386		)
387		.await
388	}
389
390	async fn subscribe_grandpa_finality_justifications(&self) -> Result<Subscription<Bytes>>
391	where
392		C: ChainWithGrandpa,
393	{
394		self.subscribe_finality_justifications("GRANDPA", move |client| async move {
395			SubstrateGrandpaClient::<C>::subscribe_justifications(&*client).await
396		})
397		.await
398	}
399
400	async fn generate_grandpa_key_ownership_proof(
401		&self,
402		at: HashOf<C>,
403		set_id: sp_consensus_grandpa::SetId,
404		authority_id: sp_consensus_grandpa::AuthorityId,
405	) -> Result<Option<sp_consensus_grandpa::OpaqueKeyOwnershipProof>> {
406		self.state_call(
407			at,
408			SUB_API_GRANDPA_GENERATE_KEY_OWNERSHIP_PROOF.into(),
409			(set_id, authority_id),
410		)
411		.await
412	}
413
414	async fn subscribe_beefy_finality_justifications(&self) -> Result<Subscription<Bytes>> {
415		self.subscribe_finality_justifications("BEEFY", move |client| async move {
416			SubstrateBeefyClient::<C>::subscribe_justifications(&*client).await
417		})
418		.await
419	}
420
421	async fn token_decimals(&self) -> Result<Option<u64>> {
422		self.jsonrpsee_execute(move |client| async move {
423			let system_properties = SubstrateSystemClient::<C>::properties(&*client).await?;
424			Ok(system_properties.get("tokenDecimals").and_then(|v| v.as_u64()))
425		})
426		.await
427	}
428
429	async fn runtime_version(&self) -> Result<RuntimeVersion> {
430		self.jsonrpsee_execute(move |client| async move {
431			Ok(SubstrateStateClient::<C>::runtime_version(&*client).await?)
432		})
433		.await
434		.map_err(|e| Error::failed_to_read_runtime_version::<C>(e))
435	}
436
437	async fn simple_runtime_version(&self) -> Result<SimpleRuntimeVersion> {
438		Ok(match self.params.chain_runtime_version {
439			ChainRuntimeVersion::Auto => {
440				let runtime_version = self.runtime_version().await?;
441				SimpleRuntimeVersion::from_runtime_version(&runtime_version)
442			},
443			ChainRuntimeVersion::Custom(ref version) => *version,
444		})
445	}
446
447	fn can_start_version_guard(&self) -> bool {
448		!matches!(self.params.chain_runtime_version, ChainRuntimeVersion::Auto)
449	}
450
451	async fn raw_storage_value(
452		&self,
453		at: HashOf<C>,
454		storage_key: StorageKey,
455	) -> Result<Option<StorageData>> {
456		let cloned_storage_key = storage_key.clone();
457		self.jsonrpsee_execute(move |client| async move {
458			Ok(SubstrateStateClient::<C>::storage(&*client, cloned_storage_key, Some(at)).await?)
459		})
460		.await
461		.map_err(|e| Error::failed_to_read_storage_value::<C>(at, storage_key, e))
462	}
463
464	async fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
465		self.jsonrpsee_execute(move |client| async move {
466			Ok(SubstrateAuthorClient::<C>::pending_extrinsics(&*client).await?)
467		})
468		.await
469		.map_err(|e| Error::failed_to_get_pending_extrinsics::<C>(e))
470	}
471
472	async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<HashOf<C>> {
473		// one last check that the transaction is valid. Most of checks happen in the relay loop and
474		// it is the "final" check before submission.
475		let best_header_hash = self.best_header_hash().await?;
476		self.validate_transaction(best_header_hash, PreEncoded(transaction.0.clone()))
477			.await
478			.map_err(|e| Error::failed_to_submit_transaction::<C>(e))?
479			.map_err(|e| Error::failed_to_submit_transaction::<C>(Error::TransactionInvalid(e)))?;
480
481		self.jsonrpsee_execute(move |client| async move {
482			let tx_hash = SubstrateAuthorClient::<C>::submit_extrinsic(&*client, transaction)
483				.await
484				.map_err(|e| {
485					log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
486					e
487				})?;
488			log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
489			Ok(tx_hash)
490		})
491		.await
492		.map_err(|e| Error::failed_to_submit_transaction::<C>(e))
493	}
494
495	async fn submit_signed_extrinsic(
496		&self,
497		signer: &AccountKeyPairOf<C>,
498		prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, NonceOf<C>) -> Result<UnsignedTransaction<C>>
499			+ Send
500			+ 'static,
501	) -> Result<HashOf<C>>
502	where
503		C: ChainWithTransactions,
504		AccountIdOf<C>: From<<AccountKeyPairOf<C> as Pair>::Public>,
505	{
506		let _guard = self.submit_signed_extrinsic_lock.lock().await;
507		let transaction_nonce = self.next_account_index(signer.public().into()).await?;
508		let best_header = self.best_header().await?;
509		let signing_data = self.build_sign_params(signer.clone()).await?;
510
511		// By using parent of best block here, we are protecting again best-block reorganizations.
512		// E.g. transaction may have been submitted when the best block was `A[num=100]`. Then it
513		// has been changed to `B[num=100]`. Hash of `A` has been included into transaction
514		// signature payload. So when signature will be checked, the check will fail and transaction
515		// will be dropped from the pool.
516		let best_header_id = best_header.parent_id().unwrap_or_else(|| best_header.id());
517
518		let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?;
519		let signed_extrinsic = C::sign_transaction(signing_data, extrinsic)?.encode();
520		self.submit_unsigned_extrinsic(Bytes(signed_extrinsic)).await
521	}
522
523	async fn submit_and_watch_signed_extrinsic(
524		&self,
525		signer: &AccountKeyPairOf<C>,
526		prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, NonceOf<C>) -> Result<UnsignedTransaction<C>>
527			+ Send
528			+ 'static,
529	) -> Result<TransactionTracker<C, Self>>
530	where
531		C: ChainWithTransactions,
532		AccountIdOf<C>: From<<AccountKeyPairOf<C> as Pair>::Public>,
533	{
534		let self_clone = self.clone();
535		let signing_data = self.build_sign_params(signer.clone()).await?;
536		let _guard = self.submit_signed_extrinsic_lock.lock().await;
537		let transaction_nonce = self.next_account_index(signer.public().into()).await?;
538		let best_header = self.best_header().await?;
539		let best_header_id = best_header.id();
540
541		let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?;
542		let stall_timeout = transaction_stall_timeout(
543			extrinsic.era.mortality_period(),
544			C::AVERAGE_BLOCK_INTERVAL,
545			STALL_TIMEOUT,
546		);
547		let signed_extrinsic = C::sign_transaction(signing_data, extrinsic)?.encode();
548
549		// one last check that the transaction is valid. Most of checks happen in the relay loop and
550		// it is the "final" check before submission.
551		self.validate_transaction(best_header_id.hash(), PreEncoded(signed_extrinsic.clone()))
552			.await
553			.map_err(|e| Error::failed_to_submit_transaction::<C>(e))?
554			.map_err(|e| Error::failed_to_submit_transaction::<C>(Error::TransactionInvalid(e)))?;
555
556		self.jsonrpsee_execute(move |client| async move {
557			let tx_hash = C::Hasher::hash(&signed_extrinsic);
558			let subscription: jsonrpsee::core::client::Subscription<_> =
559				SubstrateAuthorClient::<C>::submit_and_watch_extrinsic(
560					&*client,
561					Bytes(signed_extrinsic),
562				)
563				.await
564				.map_err(|e| {
565					log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
566					e
567				})?;
568			log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
569			Ok(TransactionTracker::new(
570				self_clone,
571				stall_timeout,
572				tx_hash,
573				Subscription::new_forwarded(
574					StreamDescription::new("transaction events".into(), C::NAME.into()),
575					subscription,
576				),
577			))
578		})
579		.await
580		.map_err(|e| Error::failed_to_submit_transaction::<C>(e))
581	}
582
583	async fn validate_transaction<SignedTransaction: Encode + Send + 'static>(
584		&self,
585		at: HashOf<C>,
586		transaction: SignedTransaction,
587	) -> Result<TransactionValidity> {
588		self.state_call(
589			at,
590			SUB_API_TXPOOL_VALIDATE_TRANSACTION.into(),
591			(TransactionSource::External, transaction, at),
592		)
593		.await
594	}
595
596	async fn estimate_extrinsic_weight<SignedTransaction: Encode + Send + 'static>(
597		&self,
598		at: HashOf<C>,
599		transaction: SignedTransaction,
600	) -> Result<Weight> {
601		let transaction_len = transaction.encoded_size() as u32;
602		let dispatch_info: RuntimeDispatchInfo<BalanceOf<C>> = self
603			.state_call(at, SUB_API_TX_PAYMENT_QUERY_INFO.into(), (transaction, transaction_len))
604			.await?;
605
606		Ok(dispatch_info.weight)
607	}
608
609	async fn raw_state_call<Args: Encode + Send>(
610		&self,
611		at: HashOf<C>,
612		method: String,
613		arguments: Args,
614	) -> Result<Bytes> {
615		let arguments = Bytes(arguments.encode());
616		let arguments_clone = arguments.clone();
617		let method_clone = method.clone();
618		self.jsonrpsee_execute(move |client| async move {
619			SubstrateStateClient::<C>::call(&*client, method, arguments, Some(at))
620				.await
621				.map_err(Into::into)
622		})
623		.await
624		.map_err(|e| Error::failed_state_call::<C>(at, method_clone, arguments_clone, e))
625	}
626
627	async fn prove_storage(
628		&self,
629		at: HashOf<C>,
630		keys: Vec<StorageKey>,
631	) -> Result<(StorageProof, HashOf<C>)> {
632		let state_root = *self.header_by_hash(at).await?.state_root();
633
634		let keys_clone = keys.clone();
635		let read_proof = self
636			.jsonrpsee_execute(move |client| async move {
637				SubstrateStateClient::<C>::prove_storage(&*client, keys_clone, Some(at))
638					.await
639					.map(|proof| StorageProof::new(proof.proof.into_iter().map(|b| b.0)))
640					.map_err(Into::into)
641			})
642			.await
643			.map_err(|e| Error::failed_to_prove_storage::<C>(at, keys.clone(), e))?;
644
645		Ok((read_proof, state_root))
646	}
647}
648
649#[cfg(test)]
650mod tests {
651	use super::*;
652	use crate::{guard::tests::TestEnvironment, test_chain::TestChain};
653	use futures::{channel::mpsc::unbounded, FutureExt, SinkExt, StreamExt};
654
655	async fn run_ensure_correct_runtime_version(
656		expected: ChainRuntimeVersion,
657		actual: RuntimeVersion,
658	) -> Result<()> {
659		let (
660			(mut runtime_version_tx, runtime_version_rx),
661			(slept_tx, _slept_rx),
662			(aborted_tx, mut aborted_rx),
663		) = (unbounded(), unbounded(), unbounded());
664		runtime_version_tx.send(actual).await.unwrap();
665		let mut env = TestEnvironment { runtime_version_rx, slept_tx, aborted_tx };
666
667		let ensure_correct_runtime_version =
668			RpcClient::<TestChain>::ensure_correct_runtime_version(&mut env, expected).boxed();
669		let aborted = aborted_rx.next().map(|_| Err(Error::Custom("".into()))).boxed();
670		futures::pin_mut!(ensure_correct_runtime_version, aborted);
671		futures::future::select(ensure_correct_runtime_version, aborted)
672			.await
673			.into_inner()
674			.0
675	}
676
677	#[async_std::test]
678	async fn ensure_correct_runtime_version_works() {
679		// when we are configured to use auto version
680		assert!(matches!(
681			run_ensure_correct_runtime_version(
682				ChainRuntimeVersion::Auto,
683				RuntimeVersion {
684					spec_version: 100,
685					transaction_version: 100,
686					..Default::default()
687				},
688			)
689			.await,
690			Ok(()),
691		));
692		// when actual == expected
693		assert!(matches!(
694			run_ensure_correct_runtime_version(
695				ChainRuntimeVersion::Custom(SimpleRuntimeVersion {
696					spec_version: 100,
697					transaction_version: 100
698				}),
699				RuntimeVersion {
700					spec_version: 100,
701					transaction_version: 100,
702					..Default::default()
703				},
704			)
705			.await,
706			Ok(()),
707		));
708		// when actual spec version < expected spec version
709		assert!(matches!(
710			run_ensure_correct_runtime_version(
711				ChainRuntimeVersion::Custom(SimpleRuntimeVersion {
712					spec_version: 100,
713					transaction_version: 100
714				}),
715				RuntimeVersion { spec_version: 99, transaction_version: 100, ..Default::default() },
716			)
717			.await,
718			Err(Error::WaitingForRuntimeUpgrade {
719				expected: SimpleRuntimeVersion { spec_version: 100, transaction_version: 100 },
720				actual: SimpleRuntimeVersion { spec_version: 99, transaction_version: 100 },
721				..
722			}),
723		));
724		// when actual spec version > expected spec version
725		assert!(matches!(
726			run_ensure_correct_runtime_version(
727				ChainRuntimeVersion::Custom(SimpleRuntimeVersion {
728					spec_version: 100,
729					transaction_version: 100
730				}),
731				RuntimeVersion {
732					spec_version: 101,
733					transaction_version: 100,
734					..Default::default()
735				},
736			)
737			.await,
738			Err(Error::Custom(_)),
739		));
740	}
741}