1use crate::{
21 client::{Client, SubscriptionBroadcaster},
22 error::{Error, Result},
23 AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, ChainWithGrandpa, ChainWithTransactions,
24 HashOf, HeaderIdOf, HeaderOf, NonceOf, SignedBlockOf, SimpleRuntimeVersion, Subscription,
25 TransactionTracker, UnsignedTransaction, ANCIENT_BLOCK_THRESHOLD,
26};
27use std::{cmp::Ordering, future::Future, task::Poll};
28
29use async_std::{
30 sync::{Arc, Mutex, RwLock},
31 task::JoinHandle,
32};
33use async_trait::async_trait;
34use codec::Encode;
35use frame_support::weights::Weight;
36use futures::{FutureExt, StreamExt};
37use quick_cache::unsync::Cache;
38use sp_consensus_grandpa::{AuthorityId, OpaqueKeyOwnershipProof, SetId};
39use sp_core::{
40 storage::{StorageData, StorageKey},
41 Bytes, Pair,
42};
43use sp_runtime::{traits::Header as _, transaction_validity::TransactionValidity};
44use sp_trie::StorageProof;
45use sp_version::RuntimeVersion;
46
/// Shorthand for a `quick_cache::unsync::Cache` that is shared through an `Arc` and guarded by
/// an async `RwLock`.
type SyncCache<K, V> = Arc<RwLock<Cache<K, V>>>;
49
/// Client wrapper that pairs a backend client `B` with shared, cached data.
///
/// The shared state lives behind an `Arc`, so every clone of this client refers to the same
/// `ClientData` (caches, justification broadcasters and background task handle).
#[derive(Clone)]
pub struct CachingClient<C: Chain, B: Client<C>> {
    /// Underlying client that serves every request which is not answered from the shared data.
    backend: B,
    /// State shared by all clones of this client.
    data: Arc<ClientData<C>>,
}
59
/// State shared by all clones of a `CachingClient`.
struct ClientData<C: Chain> {
    /// Broadcaster for GRANDPA justifications; `None` until the first subscription is made
    /// and again after `reconnect`.
    grandpa_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
    /// Broadcaster for BEEFY justifications; `None` until the first subscription is made
    /// and again after `reconnect`.
    beefy_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
    /// Handle of the task spawned by `start_background_task`; replaced on `reconnect`.
    background_task_handle: Arc<Mutex<JoinHandle<Result<()>>>>,
    /// Most recent best header written by the background task; `None` if nothing is stored.
    best_header: Arc<RwLock<Option<HeaderOf<C>>>>,
    /// Most recent best finalized header written by the background task; `None` if nothing
    /// is stored.
    best_finalized_header: Arc<RwLock<Option<HeaderOf<C>>>>,
    /// Block number -> header hash, filled by `header_hash_by_number`.
    header_hash_by_number_cache: SyncCache<BlockNumberOf<C>, HashOf<C>>,
    /// Header hash -> header, filled by `header_by_hash` and by the background task.
    header_by_hash_cache: SyncCache<HashOf<C>, HeaderOf<C>>,
    /// Block hash -> signed block, filled by `block_by_hash`.
    block_by_hash_cache: SyncCache<HashOf<C>, SignedBlockOf<C>>,
    /// (block hash, storage key) -> raw storage value, filled by `raw_storage_value`.
    raw_storage_value_cache: SyncCache<(HashOf<C>, StorageKey), Option<StorageData>>,
    /// (block hash, method name, encoded arguments) -> call result, filled by
    /// `raw_state_call`.
    state_call_cache: SyncCache<(HashOf<C>, String, Bytes), Bytes>,
}
77
78impl<C: Chain, B: Client<C>> CachingClient<C, B> {
79 pub async fn new(backend: B) -> Self {
81 let chain_state_capacity = ANCIENT_BLOCK_THRESHOLD as usize;
84 let best_header = Arc::new(RwLock::new(None));
85 let best_finalized_header = Arc::new(RwLock::new(None));
86 let header_by_hash_cache = Arc::new(RwLock::new(Cache::new(chain_state_capacity)));
87 let background_task_handle = Self::start_background_task(
88 backend.clone(),
89 best_header.clone(),
90 best_finalized_header.clone(),
91 header_by_hash_cache.clone(),
92 )
93 .await;
94 CachingClient {
95 backend,
96 data: Arc::new(ClientData {
97 grandpa_justifications: Arc::new(Mutex::new(None)),
98 beefy_justifications: Arc::new(Mutex::new(None)),
99 background_task_handle: Arc::new(Mutex::new(background_task_handle)),
100 best_header,
101 best_finalized_header,
102 header_hash_by_number_cache: Arc::new(RwLock::new(Cache::new(
103 chain_state_capacity,
104 ))),
105 header_by_hash_cache,
106 block_by_hash_cache: Arc::new(RwLock::new(Cache::new(chain_state_capacity))),
107 raw_storage_value_cache: Arc::new(RwLock::new(Cache::new(1_024))),
108 state_call_cache: Arc::new(RwLock::new(Cache::new(1_024))),
109 }),
110 }
111 }
112
113 async fn get_or_insert_async<K: Clone + std::fmt::Debug + Eq + std::hash::Hash, V: Clone>(
115 &self,
116 cache: &Arc<RwLock<Cache<K, V>>>,
117 key: &K,
118 with: impl std::future::Future<Output = Result<V>>,
119 ) -> Result<V> {
120 {
122 let cache = cache.read().await;
123 if let Some(value) = cache.get(key) {
124 return Ok(value.clone())
125 }
126 }
127
128 let value = with.await?;
131
132 cache.write().await.insert(key.clone(), value.clone());
134 Ok(value)
135 }
136
137 async fn subscribe_finality_justifications<'a>(
139 &'a self,
140 maybe_broadcaster: &Mutex<Option<SubscriptionBroadcaster<Bytes>>>,
141 do_subscribe: impl Future<Output = Result<Subscription<Bytes>>> + 'a,
142 ) -> Result<Subscription<Bytes>> {
143 let mut maybe_broadcaster = maybe_broadcaster.lock().await;
144 let broadcaster = match maybe_broadcaster.as_ref() {
145 Some(justifications) => justifications,
146 None => {
147 let broadcaster = match SubscriptionBroadcaster::new(do_subscribe.await?) {
148 Ok(broadcaster) => broadcaster,
149 Err(subscription) => return Ok(subscription),
150 };
151 maybe_broadcaster.get_or_insert(broadcaster)
152 },
153 };
154
155 broadcaster.subscribe().await
156 }
157
158 async fn start_background_task(
160 backend: B,
161 best_header: Arc<RwLock<Option<HeaderOf<C>>>>,
162 best_finalized_header: Arc<RwLock<Option<HeaderOf<C>>>>,
163 header_by_hash_cache: SyncCache<HashOf<C>, HeaderOf<C>>,
164 ) -> JoinHandle<Result<()>> {
165 async_std::task::spawn(async move {
166 let mut last_finalized_header =
169 backend.header_by_hash(backend.best_finalized_header_hash().await?).await?;
170 *best_header.write().await = Some(backend.best_header().await?);
171 *best_finalized_header.write().await = Some(last_finalized_header.clone());
172
173 let mut best_headers = backend.subscribe_best_headers().await?;
175 let mut finalized_headers = backend.subscribe_finalized_headers().await?;
176 loop {
177 futures::select! {
178 new_best_header = best_headers.next().fuse() => {
179 let new_best_header = new_best_header
183 .ok_or_else(|| Error::ChannelError(format!("Mandatory best headers subscription for {} has finished", C::NAME)))?;
184 let new_best_header_hash = new_best_header.hash();
185 header_by_hash_cache.write().await.insert(new_best_header_hash, new_best_header.clone());
186 *best_header.write().await = Some(new_best_header);
187 },
188 new_finalized_header = finalized_headers.next().fuse() => {
189 let new_finalized_header = new_finalized_header.
191 ok_or_else(|| Error::ChannelError(format!("Finalized headers subscription for {} has finished", C::NAME)))?;
192 let new_finalized_header_number = *new_finalized_header.number();
193 let last_finalized_header_number = *last_finalized_header.number();
194 match new_finalized_header_number.cmp(&last_finalized_header_number) {
195 Ordering::Greater => {
196 let new_finalized_header_hash = new_finalized_header.hash();
197 header_by_hash_cache.write().await.insert(new_finalized_header_hash, new_finalized_header.clone());
198 *best_finalized_header.write().await = Some(new_finalized_header.clone());
199 last_finalized_header = new_finalized_header;
200 },
201 Ordering::Less => {
202 return Err(Error::unordered_finalized_headers::<C>(
203 new_finalized_header_number,
204 last_finalized_header_number,
205 ));
206 },
207 _ => (),
208 }
209 },
210 }
211 }
212 })
213 }
214
215 async fn ensure_background_task_active(&self) -> Result<()> {
217 let mut background_task_handle = self.data.background_task_handle.lock().await;
218 if let Poll::Ready(result) = futures::poll!(&mut *background_task_handle) {
219 return Err(Error::ChannelError(format!(
220 "Background task of {} client has exited with result: {:?}",
221 C::NAME,
222 result
223 )))
224 }
225
226 Ok(())
227 }
228
229 async fn read_header_from_background<'a>(
231 &'a self,
232 header: &Arc<RwLock<Option<HeaderOf<C>>>>,
233 read_header_from_backend: impl Future<Output = Result<HeaderOf<C>>> + 'a,
234 ) -> Result<HeaderOf<C>> {
235 self.ensure_background_task_active().await?;
237
238 match header.read().await.clone() {
241 Some(header) => Ok(header),
242 None => {
243 read_header_from_backend.await
246 },
247 }
248 }
249}
250
251impl<C: Chain, B: Client<C>> std::fmt::Debug for CachingClient<C, B> {
252 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
253 fmt.write_fmt(format_args!("CachingClient<{:?}>", self.backend))
254 }
255}
256
257#[async_trait]
258impl<C: Chain, B: Client<C>> Client<C> for CachingClient<C, B> {
259 async fn ensure_synced(&self) -> Result<()> {
260 self.backend.ensure_synced().await
261 }
262
263 async fn reconnect(&self) -> Result<()> {
264 self.backend.reconnect().await?;
265 *self.data.grandpa_justifications.lock().await = None;
267 *self.data.beefy_justifications.lock().await = None;
268 *self.data.best_header.write().await = None;
270 *self.data.best_finalized_header.write().await = None;
271 *self.data.background_task_handle.lock().await = Self::start_background_task(
272 self.backend.clone(),
273 self.data.best_header.clone(),
274 self.data.best_finalized_header.clone(),
275 self.data.header_by_hash_cache.clone(),
276 )
277 .await;
278 Ok(())
279 }
280
281 fn genesis_hash(&self) -> HashOf<C> {
282 self.backend.genesis_hash()
283 }
284
285 async fn header_hash_by_number(&self, number: BlockNumberOf<C>) -> Result<HashOf<C>> {
286 self.get_or_insert_async(
287 &self.data.header_hash_by_number_cache,
288 &number,
289 self.backend.header_hash_by_number(number),
290 )
291 .await
292 }
293
294 async fn header_by_hash(&self, hash: HashOf<C>) -> Result<HeaderOf<C>> {
295 self.get_or_insert_async(
296 &self.data.header_by_hash_cache,
297 &hash,
298 self.backend.header_by_hash(hash),
299 )
300 .await
301 }
302
303 async fn block_by_hash(&self, hash: HashOf<C>) -> Result<SignedBlockOf<C>> {
304 self.get_or_insert_async(
305 &self.data.block_by_hash_cache,
306 &hash,
307 self.backend.block_by_hash(hash),
308 )
309 .await
310 }
311
312 async fn best_finalized_header_hash(&self) -> Result<HashOf<C>> {
313 self.read_header_from_background(
314 &self.data.best_finalized_header,
315 self.backend.best_finalized_header(),
316 )
317 .await
318 .map(|h| h.hash())
319 }
320
321 async fn best_header(&self) -> Result<HeaderOf<C>> {
322 self.read_header_from_background(&self.data.best_header, self.backend.best_header())
323 .await
324 }
325
326 async fn subscribe_best_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
327 self.backend.subscribe_best_headers().await
329 }
330
331 async fn subscribe_finalized_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
332 self.backend.subscribe_finalized_headers().await
334 }
335
336 async fn subscribe_grandpa_finality_justifications(&self) -> Result<Subscription<Bytes>>
337 where
338 C: ChainWithGrandpa,
339 {
340 self.subscribe_finality_justifications(
341 &self.data.grandpa_justifications,
342 self.backend.subscribe_grandpa_finality_justifications(),
343 )
344 .await
345 }
346
347 async fn generate_grandpa_key_ownership_proof(
348 &self,
349 at: HashOf<C>,
350 set_id: SetId,
351 authority_id: AuthorityId,
352 ) -> Result<Option<OpaqueKeyOwnershipProof>> {
353 self.backend
354 .generate_grandpa_key_ownership_proof(at, set_id, authority_id)
355 .await
356 }
357
358 async fn subscribe_beefy_finality_justifications(&self) -> Result<Subscription<Bytes>> {
359 self.subscribe_finality_justifications(
360 &self.data.beefy_justifications,
361 self.backend.subscribe_beefy_finality_justifications(),
362 )
363 .await
364 }
365
366 async fn token_decimals(&self) -> Result<Option<u64>> {
367 self.backend.token_decimals().await
368 }
369
370 async fn runtime_version(&self) -> Result<RuntimeVersion> {
371 self.backend.runtime_version().await
372 }
373
374 async fn simple_runtime_version(&self) -> Result<SimpleRuntimeVersion> {
375 self.backend.simple_runtime_version().await
376 }
377
378 fn can_start_version_guard(&self) -> bool {
379 self.backend.can_start_version_guard()
380 }
381
382 async fn raw_storage_value(
383 &self,
384 at: HashOf<C>,
385 storage_key: StorageKey,
386 ) -> Result<Option<StorageData>> {
387 self.get_or_insert_async(
388 &self.data.raw_storage_value_cache,
389 &(at, storage_key.clone()),
390 self.backend.raw_storage_value(at, storage_key),
391 )
392 .await
393 }
394
395 async fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
396 self.backend.pending_extrinsics().await
397 }
398
399 async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<HashOf<C>> {
400 self.backend.submit_unsigned_extrinsic(transaction).await
401 }
402
403 async fn submit_signed_extrinsic(
404 &self,
405 signer: &AccountKeyPairOf<C>,
406 prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, NonceOf<C>) -> Result<UnsignedTransaction<C>>
407 + Send
408 + 'static,
409 ) -> Result<HashOf<C>>
410 where
411 C: ChainWithTransactions,
412 AccountIdOf<C>: From<<AccountKeyPairOf<C> as Pair>::Public>,
413 {
414 self.backend.submit_signed_extrinsic(signer, prepare_extrinsic).await
415 }
416
417 async fn submit_and_watch_signed_extrinsic(
418 &self,
419 signer: &AccountKeyPairOf<C>,
420 prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, NonceOf<C>) -> Result<UnsignedTransaction<C>>
421 + Send
422 + 'static,
423 ) -> Result<TransactionTracker<C, Self>>
424 where
425 C: ChainWithTransactions,
426 AccountIdOf<C>: From<<AccountKeyPairOf<C> as Pair>::Public>,
427 {
428 self.backend
429 .submit_and_watch_signed_extrinsic(signer, prepare_extrinsic)
430 .await
431 .map(|t| t.switch_environment(self.clone()))
432 }
433
434 async fn validate_transaction<SignedTransaction: Encode + Send + 'static>(
435 &self,
436 at: HashOf<C>,
437 transaction: SignedTransaction,
438 ) -> Result<TransactionValidity> {
439 self.backend.validate_transaction(at, transaction).await
440 }
441
442 async fn estimate_extrinsic_weight<SignedTransaction: Encode + Send + 'static>(
443 &self,
444 at: HashOf<C>,
445 transaction: SignedTransaction,
446 ) -> Result<Weight> {
447 self.backend.estimate_extrinsic_weight(at, transaction).await
448 }
449
450 async fn raw_state_call<Args: Encode + Send>(
451 &self,
452 at: HashOf<C>,
453 method: String,
454 arguments: Args,
455 ) -> Result<Bytes> {
456 let encoded_arguments = Bytes(arguments.encode());
457 self.get_or_insert_async(
458 &self.data.state_call_cache,
459 &(at, method.clone(), encoded_arguments),
460 self.backend.raw_state_call(at, method, arguments),
461 )
462 .await
463 }
464
465 async fn prove_storage(
466 &self,
467 at: HashOf<C>,
468 keys: Vec<StorageKey>,
469 ) -> Result<(StorageProof, HashOf<C>)> {
470 self.backend.prove_storage(at, keys).await
471 }
472}