referrerpolicy=no-referrer-when-downgrade

relay_substrate_client/client/
caching.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Client implementation that is caching (whenever possible) results of its backend
18//! method calls.
19
20use crate::{
21	client::{Client, SubscriptionBroadcaster},
22	error::{Error, Result},
23	AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, ChainWithGrandpa, ChainWithTransactions,
24	HashOf, HeaderIdOf, HeaderOf, NonceOf, SignedBlockOf, SimpleRuntimeVersion, Subscription,
25	TransactionTracker, UnsignedTransaction, ANCIENT_BLOCK_THRESHOLD,
26};
27use std::{cmp::Ordering, future::Future, task::Poll};
28
29use async_std::{
30	sync::{Arc, Mutex, RwLock},
31	task::JoinHandle,
32};
33use async_trait::async_trait;
34use codec::Encode;
35use frame_support::weights::Weight;
36use futures::{FutureExt, StreamExt};
37use quick_cache::unsync::Cache;
38use sp_consensus_grandpa::{AuthorityId, OpaqueKeyOwnershipProof, SetId};
39use sp_core::{
40	storage::{StorageData, StorageKey},
41	Bytes, Pair,
42};
43use sp_runtime::{traits::Header as _, transaction_validity::TransactionValidity};
44use sp_trie::StorageProof;
45use sp_version::RuntimeVersion;
46
47/// `quick_cache::unsync::Cache` wrapped in async-aware synchronization primitives.
48type SyncCache<K, V> = Arc<RwLock<Cache<K, V>>>;
49
50/// Client implementation that is caching (whenever possible) results of its backend
51/// method calls. Apart from caching call results, it also supports some (at the
52/// moment: justifications) subscription sharing, meaning that the single server
53/// subscription may be shared by multiple subscribers at the client side.
54#[derive(Clone)]
55pub struct CachingClient<C: Chain, B: Client<C>> {
56	backend: B,
57	data: Arc<ClientData<C>>,
58}
59
60/// Client data, shared by all `CachingClient` clones.
61struct ClientData<C: Chain> {
62	grandpa_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
63	beefy_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
64	background_task_handle: Arc<Mutex<JoinHandle<Result<()>>>>,
65	best_header: Arc<RwLock<Option<HeaderOf<C>>>>,
66	best_finalized_header: Arc<RwLock<Option<HeaderOf<C>>>>,
67	// `quick_cache::sync::Cache` has the `get_or_insert_async` method, which fits our needs,
68	// but it uses synchronization primitives that are not aware of async execution. They
69	// can block the executor threads and cause deadlocks => let's use primitives from
70	// `async_std` crate around `quick_cache::unsync::Cache`
71	header_hash_by_number_cache: SyncCache<BlockNumberOf<C>, HashOf<C>>,
72	header_by_hash_cache: SyncCache<HashOf<C>, HeaderOf<C>>,
73	block_by_hash_cache: SyncCache<HashOf<C>, SignedBlockOf<C>>,
74	raw_storage_value_cache: SyncCache<(HashOf<C>, StorageKey), Option<StorageData>>,
75	state_call_cache: SyncCache<(HashOf<C>, String, Bytes), Bytes>,
76}
77
78impl<C: Chain, B: Client<C>> CachingClient<C, B> {
79	/// Creates new `CachingClient` on top of given `backend`.
80	pub async fn new(backend: B) -> Self {
81		// most of relayer operations will never touch more than `ANCIENT_BLOCK_THRESHOLD`
82		// headers, so we'll use this as a cache capacity for all chain-related caches
83		let chain_state_capacity = ANCIENT_BLOCK_THRESHOLD as usize;
84		let best_header = Arc::new(RwLock::new(None));
85		let best_finalized_header = Arc::new(RwLock::new(None));
86		let header_by_hash_cache = Arc::new(RwLock::new(Cache::new(chain_state_capacity)));
87		let background_task_handle = Self::start_background_task(
88			backend.clone(),
89			best_header.clone(),
90			best_finalized_header.clone(),
91			header_by_hash_cache.clone(),
92		)
93		.await;
94		CachingClient {
95			backend,
96			data: Arc::new(ClientData {
97				grandpa_justifications: Arc::new(Mutex::new(None)),
98				beefy_justifications: Arc::new(Mutex::new(None)),
99				background_task_handle: Arc::new(Mutex::new(background_task_handle)),
100				best_header,
101				best_finalized_header,
102				header_hash_by_number_cache: Arc::new(RwLock::new(Cache::new(
103					chain_state_capacity,
104				))),
105				header_by_hash_cache,
106				block_by_hash_cache: Arc::new(RwLock::new(Cache::new(chain_state_capacity))),
107				raw_storage_value_cache: Arc::new(RwLock::new(Cache::new(1_024))),
108				state_call_cache: Arc::new(RwLock::new(Cache::new(1_024))),
109			}),
110		}
111	}
112
113	/// Try to get value from the cache, or compute and insert it using given future.
114	async fn get_or_insert_async<K: Clone + std::fmt::Debug + Eq + std::hash::Hash, V: Clone>(
115		&self,
116		cache: &Arc<RwLock<Cache<K, V>>>,
117		key: &K,
118		with: impl std::future::Future<Output = Result<V>>,
119	) -> Result<V> {
120		// try to get cached value first using read lock
121		{
122			let cache = cache.read().await;
123			if let Some(value) = cache.get(key) {
124				return Ok(value.clone())
125			}
126		}
127
128		// let's compute the value without holding any locks - it may cause additional misses and
129		// double insertions, but that's better than holding a lock for a while
130		let value = with.await?;
131
132		// insert/update the value in the cache
133		cache.write().await.insert(key.clone(), value.clone());
134		Ok(value)
135	}
136
137	/// Subscribe to finality justifications, trying to reuse existing subscription.
138	async fn subscribe_finality_justifications<'a>(
139		&'a self,
140		maybe_broadcaster: &Mutex<Option<SubscriptionBroadcaster<Bytes>>>,
141		do_subscribe: impl Future<Output = Result<Subscription<Bytes>>> + 'a,
142	) -> Result<Subscription<Bytes>> {
143		let mut maybe_broadcaster = maybe_broadcaster.lock().await;
144		let broadcaster = match maybe_broadcaster.as_ref() {
145			Some(justifications) => justifications,
146			None => {
147				let broadcaster = match SubscriptionBroadcaster::new(do_subscribe.await?) {
148					Ok(broadcaster) => broadcaster,
149					Err(subscription) => return Ok(subscription),
150				};
151				maybe_broadcaster.get_or_insert(broadcaster)
152			},
153		};
154
155		broadcaster.subscribe().await
156	}
157
158	/// Start background task that reads best (and best finalized) headers from subscriptions.
159	async fn start_background_task(
160		backend: B,
161		best_header: Arc<RwLock<Option<HeaderOf<C>>>>,
162		best_finalized_header: Arc<RwLock<Option<HeaderOf<C>>>>,
163		header_by_hash_cache: SyncCache<HashOf<C>, HeaderOf<C>>,
164	) -> JoinHandle<Result<()>> {
165		async_std::task::spawn(async move {
166			// initialize by reading headers directly from backend to avoid doing that in the
167			// high-level code
168			let mut last_finalized_header =
169				backend.header_by_hash(backend.best_finalized_header_hash().await?).await?;
170			*best_header.write().await = Some(backend.best_header().await?);
171			*best_finalized_header.write().await = Some(last_finalized_header.clone());
172
173			// ...and then continue with subscriptions
174			let mut best_headers = backend.subscribe_best_headers().await?;
175			let mut finalized_headers = backend.subscribe_finalized_headers().await?;
176			loop {
177				futures::select! {
178					new_best_header = best_headers.next().fuse() => {
179						// we assume that the best header is always the actual best header, even if its
180						// number is lower than the number of previous-best-header (chain may use its own
181						// best header selection algorithms)
182						let new_best_header = new_best_header
183							.ok_or_else(|| Error::ChannelError(format!("Mandatory best headers subscription for {} has finished", C::NAME)))?;
184						let new_best_header_hash = new_best_header.hash();
185						header_by_hash_cache.write().await.insert(new_best_header_hash, new_best_header.clone());
186						*best_header.write().await = Some(new_best_header);
187					},
188					new_finalized_header = finalized_headers.next().fuse() => {
189						// in theory we'll always get finalized headers in order, but let's double check
190						let new_finalized_header = new_finalized_header.
191							ok_or_else(|| Error::ChannelError(format!("Finalized headers subscription for {} has finished", C::NAME)))?;
192						let new_finalized_header_number = *new_finalized_header.number();
193						let last_finalized_header_number = *last_finalized_header.number();
194						match new_finalized_header_number.cmp(&last_finalized_header_number) {
195							Ordering::Greater => {
196								let new_finalized_header_hash = new_finalized_header.hash();
197								header_by_hash_cache.write().await.insert(new_finalized_header_hash, new_finalized_header.clone());
198								*best_finalized_header.write().await = Some(new_finalized_header.clone());
199								last_finalized_header = new_finalized_header;
200							},
201							Ordering::Less => {
202								return Err(Error::unordered_finalized_headers::<C>(
203									new_finalized_header_number,
204									last_finalized_header_number,
205								));
206							},
207							_ => (),
208						}
209					},
210				}
211			}
212		})
213	}
214
215	/// Ensure that the background task is active.
216	async fn ensure_background_task_active(&self) -> Result<()> {
217		let mut background_task_handle = self.data.background_task_handle.lock().await;
218		if let Poll::Ready(result) = futures::poll!(&mut *background_task_handle) {
219			return Err(Error::ChannelError(format!(
220				"Background task of {} client has exited with result: {:?}",
221				C::NAME,
222				result
223			)))
224		}
225
226		Ok(())
227	}
228
229	/// Try to get header, read elsewhere by background task through subscription.
230	async fn read_header_from_background<'a>(
231		&'a self,
232		header: &Arc<RwLock<Option<HeaderOf<C>>>>,
233		read_header_from_backend: impl Future<Output = Result<HeaderOf<C>>> + 'a,
234	) -> Result<HeaderOf<C>> {
235		// ensure that the background task is active
236		self.ensure_background_task_active().await?;
237
238		// now we know that the background task is active, so we could trust that the
239		// `header` has the most recent updates from it
240		match header.read().await.clone() {
241			Some(header) => Ok(header),
242			None => {
243				// header has not yet been read from the subscription, which means that
244				// we are just starting - let's read header directly from backend this time
245				read_header_from_backend.await
246			},
247		}
248	}
249}
250
251impl<C: Chain, B: Client<C>> std::fmt::Debug for CachingClient<C, B> {
252	fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
253		fmt.write_fmt(format_args!("CachingClient<{:?}>", self.backend))
254	}
255}
256
257#[async_trait]
258impl<C: Chain, B: Client<C>> Client<C> for CachingClient<C, B> {
259	async fn ensure_synced(&self) -> Result<()> {
260		self.backend.ensure_synced().await
261	}
262
263	async fn reconnect(&self) -> Result<()> {
264		self.backend.reconnect().await?;
265		// since we have new underlying client, we need to restart subscriptions too
266		*self.data.grandpa_justifications.lock().await = None;
267		*self.data.beefy_justifications.lock().await = None;
268		// also restart background task too
269		*self.data.best_header.write().await = None;
270		*self.data.best_finalized_header.write().await = None;
271		*self.data.background_task_handle.lock().await = Self::start_background_task(
272			self.backend.clone(),
273			self.data.best_header.clone(),
274			self.data.best_finalized_header.clone(),
275			self.data.header_by_hash_cache.clone(),
276		)
277		.await;
278		Ok(())
279	}
280
281	fn genesis_hash(&self) -> HashOf<C> {
282		self.backend.genesis_hash()
283	}
284
285	async fn header_hash_by_number(&self, number: BlockNumberOf<C>) -> Result<HashOf<C>> {
286		self.get_or_insert_async(
287			&self.data.header_hash_by_number_cache,
288			&number,
289			self.backend.header_hash_by_number(number),
290		)
291		.await
292	}
293
294	async fn header_by_hash(&self, hash: HashOf<C>) -> Result<HeaderOf<C>> {
295		self.get_or_insert_async(
296			&self.data.header_by_hash_cache,
297			&hash,
298			self.backend.header_by_hash(hash),
299		)
300		.await
301	}
302
303	async fn block_by_hash(&self, hash: HashOf<C>) -> Result<SignedBlockOf<C>> {
304		self.get_or_insert_async(
305			&self.data.block_by_hash_cache,
306			&hash,
307			self.backend.block_by_hash(hash),
308		)
309		.await
310	}
311
312	async fn best_finalized_header_hash(&self) -> Result<HashOf<C>> {
313		self.read_header_from_background(
314			&self.data.best_finalized_header,
315			self.backend.best_finalized_header(),
316		)
317		.await
318		.map(|h| h.hash())
319	}
320
321	async fn best_header(&self) -> Result<HeaderOf<C>> {
322		self.read_header_from_background(&self.data.best_header, self.backend.best_header())
323			.await
324	}
325
326	async fn subscribe_best_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
327		// we may share the sunbscription here, but atm there's no callers of this method
328		self.backend.subscribe_best_headers().await
329	}
330
331	async fn subscribe_finalized_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
332		// we may share the sunbscription here, but atm there's no callers of this method
333		self.backend.subscribe_finalized_headers().await
334	}
335
336	async fn subscribe_grandpa_finality_justifications(&self) -> Result<Subscription<Bytes>>
337	where
338		C: ChainWithGrandpa,
339	{
340		self.subscribe_finality_justifications(
341			&self.data.grandpa_justifications,
342			self.backend.subscribe_grandpa_finality_justifications(),
343		)
344		.await
345	}
346
347	async fn generate_grandpa_key_ownership_proof(
348		&self,
349		at: HashOf<C>,
350		set_id: SetId,
351		authority_id: AuthorityId,
352	) -> Result<Option<OpaqueKeyOwnershipProof>> {
353		self.backend
354			.generate_grandpa_key_ownership_proof(at, set_id, authority_id)
355			.await
356	}
357
358	async fn subscribe_beefy_finality_justifications(&self) -> Result<Subscription<Bytes>> {
359		self.subscribe_finality_justifications(
360			&self.data.beefy_justifications,
361			self.backend.subscribe_beefy_finality_justifications(),
362		)
363		.await
364	}
365
366	async fn token_decimals(&self) -> Result<Option<u64>> {
367		self.backend.token_decimals().await
368	}
369
370	async fn runtime_version(&self) -> Result<RuntimeVersion> {
371		self.backend.runtime_version().await
372	}
373
374	async fn simple_runtime_version(&self) -> Result<SimpleRuntimeVersion> {
375		self.backend.simple_runtime_version().await
376	}
377
378	fn can_start_version_guard(&self) -> bool {
379		self.backend.can_start_version_guard()
380	}
381
382	async fn raw_storage_value(
383		&self,
384		at: HashOf<C>,
385		storage_key: StorageKey,
386	) -> Result<Option<StorageData>> {
387		self.get_or_insert_async(
388			&self.data.raw_storage_value_cache,
389			&(at, storage_key.clone()),
390			self.backend.raw_storage_value(at, storage_key),
391		)
392		.await
393	}
394
395	async fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
396		self.backend.pending_extrinsics().await
397	}
398
399	async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<HashOf<C>> {
400		self.backend.submit_unsigned_extrinsic(transaction).await
401	}
402
403	async fn submit_signed_extrinsic(
404		&self,
405		signer: &AccountKeyPairOf<C>,
406		prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, NonceOf<C>) -> Result<UnsignedTransaction<C>>
407			+ Send
408			+ 'static,
409	) -> Result<HashOf<C>>
410	where
411		C: ChainWithTransactions,
412		AccountIdOf<C>: From<<AccountKeyPairOf<C> as Pair>::Public>,
413	{
414		self.backend.submit_signed_extrinsic(signer, prepare_extrinsic).await
415	}
416
417	async fn submit_and_watch_signed_extrinsic(
418		&self,
419		signer: &AccountKeyPairOf<C>,
420		prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, NonceOf<C>) -> Result<UnsignedTransaction<C>>
421			+ Send
422			+ 'static,
423	) -> Result<TransactionTracker<C, Self>>
424	where
425		C: ChainWithTransactions,
426		AccountIdOf<C>: From<<AccountKeyPairOf<C> as Pair>::Public>,
427	{
428		self.backend
429			.submit_and_watch_signed_extrinsic(signer, prepare_extrinsic)
430			.await
431			.map(|t| t.switch_environment(self.clone()))
432	}
433
434	async fn validate_transaction<SignedTransaction: Encode + Send + 'static>(
435		&self,
436		at: HashOf<C>,
437		transaction: SignedTransaction,
438	) -> Result<TransactionValidity> {
439		self.backend.validate_transaction(at, transaction).await
440	}
441
442	async fn estimate_extrinsic_weight<SignedTransaction: Encode + Send + 'static>(
443		&self,
444		at: HashOf<C>,
445		transaction: SignedTransaction,
446	) -> Result<Weight> {
447		self.backend.estimate_extrinsic_weight(at, transaction).await
448	}
449
450	async fn raw_state_call<Args: Encode + Send>(
451		&self,
452		at: HashOf<C>,
453		method: String,
454		arguments: Args,
455	) -> Result<Bytes> {
456		let encoded_arguments = Bytes(arguments.encode());
457		self.get_or_insert_async(
458			&self.data.state_call_cache,
459			&(at, method.clone(), encoded_arguments),
460			self.backend.raw_state_call(at, method, arguments),
461		)
462		.await
463	}
464
465	async fn prove_storage(
466		&self,
467		at: HashOf<C>,
468		keys: Vec<StorageKey>,
469	) -> Result<(StorageProof, HashOf<C>)> {
470		self.backend.prove_storage(at, keys).await
471	}
472}