use crate::{
client::{Client, SubscriptionBroadcaster},
error::{Error, Result},
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, ChainWithGrandpa, ChainWithTransactions,
HashOf, HeaderIdOf, HeaderOf, NonceOf, SignedBlockOf, SimpleRuntimeVersion, Subscription,
TransactionTracker, UnsignedTransaction, ANCIENT_BLOCK_THRESHOLD,
};
use std::{cmp::Ordering, future::Future, task::Poll};
use async_std::{
sync::{Arc, Mutex, RwLock},
task::JoinHandle,
};
use async_trait::async_trait;
use codec::Encode;
use frame_support::weights::Weight;
use futures::{FutureExt, StreamExt};
use quick_cache::unsync::Cache;
use sp_consensus_grandpa::{AuthorityId, OpaqueKeyOwnershipProof, SetId};
use sp_core::{
storage::{StorageData, StorageKey},
Bytes, Pair,
};
use sp_runtime::{traits::Header as _, transaction_validity::TransactionValidity};
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;
/// A `quick_cache` unsync cache behind an async `RwLock`, shareable between tasks via `Arc`.
type SyncCache<K, V> = Arc<RwLock<Cache<K, V>>>;
/// `Client` wrapper that serves repeated requests from in-memory caches and shares finality
/// justification subscriptions between callers.
///
/// `data` sits behind an `Arc`, so clones of this client share the same caches.
#[derive(Clone)]
pub struct CachingClient<C: Chain, B: Client<C>> {
/// Wrapped client that every cache miss and every non-cached call is forwarded to.
backend: B,
/// Caches, subscription broadcasters and background task state.
data: Arc<ClientData<C>>,
}
/// State owned by a `CachingClient`.
struct ClientData<C: Chain> {
/// Broadcaster for GRANDPA justifications; created on first subscription, cleared on reconnect.
grandpa_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
/// Broadcaster for BEEFY justifications; created on first subscription, cleared on reconnect.
beefy_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
/// Handle of the task spawned by `start_background_task`; replaced on reconnect.
background_task_handle: Arc<Mutex<JoinHandle<Result<()>>>>,
/// Best header as last written by the background task; `None` right after (re)connect.
best_header: Arc<RwLock<Option<HeaderOf<C>>>>,
/// Best finalized header as last written by the background task; `None` right after (re)connect.
best_finalized_header: Arc<RwLock<Option<HeaderOf<C>>>>,
/// Header hashes keyed by block number.
header_hash_by_number_cache: SyncCache<BlockNumberOf<C>, HashOf<C>>,
/// Headers keyed by hash; also filled by the background task.
header_by_hash_cache: SyncCache<HashOf<C>, HeaderOf<C>>,
/// Signed blocks keyed by hash.
block_by_hash_cache: SyncCache<HashOf<C>, SignedBlockOf<C>>,
/// Raw storage values keyed by (block hash, storage key); absent values are cached as `None`.
raw_storage_value_cache: SyncCache<(HashOf<C>, StorageKey), Option<StorageData>>,
/// Runtime call results keyed by (block hash, method name, encoded arguments).
state_call_cache: SyncCache<(HashOf<C>, String, Bytes), Bytes>,
}
impl<C: Chain, B: Client<C>> CachingClient<C, B> {
/// Wraps `backend` into a caching client.
///
/// Allocates all caches and spawns the background task that tracks the best and the best
/// finalized headers.
pub async fn new(backend: B) -> Self {
    // Header, hash and block caches are all sized by the same constant.
    let chain_state_capacity = ANCIENT_BLOCK_THRESHOLD as usize;

    // State that is shared with the background task.
    let best_header = Arc::new(RwLock::new(None));
    let best_finalized_header = Arc::new(RwLock::new(None));
    let header_by_hash_cache = Arc::new(RwLock::new(Cache::new(chain_state_capacity)));

    let task_handle = Self::start_background_task(
        backend.clone(),
        Arc::clone(&best_header),
        Arc::clone(&best_finalized_header),
        Arc::clone(&header_by_hash_cache),
    )
    .await;

    let data = ClientData {
        grandpa_justifications: Arc::new(Mutex::new(None)),
        beefy_justifications: Arc::new(Mutex::new(None)),
        background_task_handle: Arc::new(Mutex::new(task_handle)),
        best_header,
        best_finalized_header,
        header_hash_by_number_cache: Arc::new(RwLock::new(Cache::new(chain_state_capacity))),
        header_by_hash_cache,
        block_by_hash_cache: Arc::new(RwLock::new(Cache::new(chain_state_capacity))),
        raw_storage_value_cache: Arc::new(RwLock::new(Cache::new(1_024))),
        state_call_cache: Arc::new(RwLock::new(Cache::new(1_024))),
    };

    Self { backend, data: Arc::new(data) }
}
/// Returns the value cached under `key`; on a miss, awaits `with` and caches its output.
///
/// An error from `with` is returned as is and nothing is cached. No cache lock is held while
/// `with` is awaited.
async fn get_or_insert_async<K: Clone + std::fmt::Debug + Eq + std::hash::Hash, V: Clone>(
    &self,
    cache: &Arc<RwLock<Cache<K, V>>>,
    key: &K,
    with: impl std::future::Future<Output = Result<V>>,
) -> Result<V> {
    // The read guard is a temporary of this statement, so it is released before `with` runs.
    let cached = cache.read().await.get(key).cloned();
    if let Some(hit) = cached {
        return Ok(hit);
    }

    let fetched = with.await?;
    cache.write().await.insert(key.clone(), fetched.clone());
    Ok(fetched)
}
/// Subscribes to justifications through the broadcaster stored in `maybe_broadcaster`.
///
/// If no broadcaster exists yet, `do_subscribe` is awaited and a broadcaster is built from its
/// subscription. When `SubscriptionBroadcaster::new` hands the subscription back instead, that
/// subscription is returned to the caller directly and nothing is stored.
async fn subscribe_finality_justifications<'a>(
    &'a self,
    maybe_broadcaster: &Mutex<Option<SubscriptionBroadcaster<Bytes>>>,
    do_subscribe: impl Future<Output = Result<Subscription<Bytes>>> + 'a,
) -> Result<Subscription<Bytes>> {
    // The mutex stays locked until we return, including while `do_subscribe` is awaited.
    let mut slot = maybe_broadcaster.lock().await;
    let broadcaster = if let Some(existing) = slot.as_ref() {
        existing
    } else {
        let created = match SubscriptionBroadcaster::new(do_subscribe.await?) {
            Ok(created) => created,
            Err(subscription) => return Ok(subscription),
        };
        slot.get_or_insert(created)
    };

    broadcaster.subscribe().await
}
/// Spawns the task that keeps `best_header`, `best_finalized_header` and
/// `header_by_hash_cache` up to date from the backend's header subscriptions.
///
/// The function itself awaits nothing: it returns the `JoinHandle` right after spawning. The
/// task only finishes with an error: a failed backend call, a finished subscription, or a
/// finalized header whose number is lower than the previous one.
async fn start_background_task(
backend: B,
best_header: Arc<RwLock<Option<HeaderOf<C>>>>,
best_finalized_header: Arc<RwLock<Option<HeaderOf<C>>>>,
header_by_hash_cache: SyncCache<HashOf<C>, HeaderOf<C>>,
) -> JoinHandle<Result<()>> {
async_std::task::spawn(async move {
// Seed both shared slots by asking the backend directly.
let mut last_finalized_header =
backend.header_by_hash(backend.best_finalized_header_hash().await?).await?;
*best_header.write().await = Some(backend.best_header().await?);
*best_finalized_header.write().await = Some(last_finalized_header.clone());
// From here on, updates come from the two subscriptions.
let mut best_headers = backend.subscribe_best_headers().await?;
let mut finalized_headers = backend.subscribe_finalized_headers().await?;
loop {
futures::select! {
new_best_header = best_headers.next().fuse() => {
// Every new best header replaces the previous one; its number is not compared.
let new_best_header = new_best_header
.ok_or_else(|| Error::ChannelError(format!("Mandatory best headers subscription for {} has finished", C::NAME)))?;
let new_best_header_hash = new_best_header.hash();
header_by_hash_cache.write().await.insert(new_best_header_hash, new_best_header.clone());
*best_header.write().await = Some(new_best_header);
},
new_finalized_header = finalized_headers.next().fuse() => {
let new_finalized_header = new_finalized_header.
ok_or_else(|| Error::ChannelError(format!("Finalized headers subscription for {} has finished", C::NAME)))?;
let new_finalized_header_number = *new_finalized_header.number();
let last_finalized_header_number = *last_finalized_header.number();
// Finalized numbers must not go backwards; a repeated number is ignored.
match new_finalized_header_number.cmp(&last_finalized_header_number) {
Ordering::Greater => {
let new_finalized_header_hash = new_finalized_header.hash();
header_by_hash_cache.write().await.insert(new_finalized_header_hash, new_finalized_header.clone());
*best_finalized_header.write().await = Some(new_finalized_header.clone());
last_finalized_header = new_finalized_header;
},
Ordering::Less => {
return Err(Error::unordered_finalized_headers::<C>(
new_finalized_header_number,
last_finalized_header_number,
));
},
_ => (),
}
},
}
}
})
}
async fn ensure_background_task_active(&self) -> Result<()> {
let mut background_task_handle = self.data.background_task_handle.lock().await;
if let Poll::Ready(result) = futures::poll!(&mut *background_task_handle) {
return Err(Error::ChannelError(format!(
"Background task of {} client has exited with result: {:?}",
C::NAME,
result
)))
}
Ok(())
}
/// Returns the header stored in `header` by the background task.
///
/// Fails if the background task is no longer running. If the slot is still empty,
/// `read_header_from_backend` is awaited and its result is returned instead (the slot itself
/// is not updated here).
async fn read_header_from_background<'a>(
    &'a self,
    header: &Arc<RwLock<Option<HeaderOf<C>>>>,
    read_header_from_backend: impl Future<Output = Result<HeaderOf<C>>> + 'a,
) -> Result<HeaderOf<C>> {
    self.ensure_background_task_active().await?;

    // Clone the slot in its own statement so the read guard is released here. As a `match`
    // scrutinee temporary it would stay alive for the whole `match` and keep the lock held
    // (blocking writers) while the backend request is in flight.
    let cached_header = header.read().await.clone();
    match cached_header {
        Some(header) => Ok(header),
        None => read_header_from_backend.await,
    }
}
}
// Debug output names the wrapper and the wrapped backend only; cached state is not printed.
impl<C: Chain, B: Client<C>> std::fmt::Debug for CachingClient<C, B> {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(fmt, "CachingClient<{:?}>", self.backend)
    }
}
#[async_trait]
impl<C: Chain, B: Client<C>> Client<C> for CachingClient<C, B> {
/// Forwards the sync check to the backend; nothing is cached.
async fn ensure_synced(&self) -> Result<()> {
    let backend = &self.backend;
    backend.ensure_synced().await
}
/// Reconnects the backend, then resets everything that is tied to the old connection.
///
/// Both justification broadcasters and both tracked headers are cleared, and a new background
/// task is started. The caches keyed by hash or number are left untouched.
async fn reconnect(&self) -> Result<()> {
    self.backend.reconnect().await?;

    let data = &self.data;

    // The next subscriber will create a fresh broadcaster.
    *data.grandpa_justifications.lock().await = None;
    *data.beefy_justifications.lock().await = None;

    // The new background task fills these slots again.
    *data.best_header.write().await = None;
    *data.best_finalized_header.write().await = None;

    // Spawn the new task first, then swap its handle in.
    let new_task_handle = Self::start_background_task(
        self.backend.clone(),
        data.best_header.clone(),
        data.best_finalized_header.clone(),
        data.header_by_hash_cache.clone(),
    )
    .await;
    *data.background_task_handle.lock().await = new_task_handle;

    Ok(())
}
/// Returns the genesis hash reported by the backend.
fn genesis_hash(&self) -> HashOf<C> {
    let backend = &self.backend;
    backend.genesis_hash()
}
/// Returns the hash of the header with the given `number`, using the by-number cache.
///
/// NOTE(review): entries are keyed by number alone; confirm that callers never ask for numbers
/// whose hash may still change.
async fn header_hash_by_number(&self, number: BlockNumberOf<C>) -> Result<HashOf<C>> {
    let fetch = self.backend.header_hash_by_number(number);
    self.get_or_insert_async(&self.data.header_hash_by_number_cache, &number, fetch)
        .await
}
/// Returns the header with the given `hash`, using the by-hash header cache.
async fn header_by_hash(&self, hash: HashOf<C>) -> Result<HeaderOf<C>> {
    let fetch = self.backend.header_by_hash(hash);
    self.get_or_insert_async(&self.data.header_by_hash_cache, &hash, fetch)
        .await
}
/// Returns the signed block with the given `hash`, using the by-hash block cache.
async fn block_by_hash(&self, hash: HashOf<C>) -> Result<SignedBlockOf<C>> {
    let fetch = self.backend.block_by_hash(hash);
    self.get_or_insert_async(&self.data.block_by_hash_cache, &hash, fetch)
        .await
}
/// Returns the hash of the best finalized header tracked by the background task, asking the
/// backend for the header if none has been recorded yet.
async fn best_finalized_header_hash(&self) -> Result<HashOf<C>> {
    let fallback = self.backend.best_finalized_header();
    let header = self
        .read_header_from_background(&self.data.best_finalized_header, fallback)
        .await?;
    Ok(header.hash())
}
/// Returns the best header tracked by the background task, asking the backend if none has been
/// recorded yet.
async fn best_header(&self) -> Result<HeaderOf<C>> {
    let fallback = self.backend.best_header();
    self.read_header_from_background(&self.data.best_header, fallback).await
}
/// Opens a best headers subscription directly on the backend.
async fn subscribe_best_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
    let backend = &self.backend;
    backend.subscribe_best_headers().await
}
/// Opens a finalized headers subscription directly on the backend.
async fn subscribe_finalized_headers(&self) -> Result<Subscription<HeaderOf<C>>> {
    let backend = &self.backend;
    backend.subscribe_finalized_headers().await
}
/// Subscribes to GRANDPA justifications through the shared GRANDPA broadcaster.
async fn subscribe_grandpa_finality_justifications(&self) -> Result<Subscription<Bytes>>
where
    C: ChainWithGrandpa,
{
    // Only awaited if no broadcaster exists yet.
    let do_subscribe = self.backend.subscribe_grandpa_finality_justifications();
    self.subscribe_finality_justifications(&self.data.grandpa_justifications, do_subscribe)
        .await
}
/// Forwards the key ownership proof request to the backend; nothing is cached.
async fn generate_grandpa_key_ownership_proof(
    &self,
    at: HashOf<C>,
    set_id: SetId,
    authority_id: AuthorityId,
) -> Result<Option<OpaqueKeyOwnershipProof>> {
    let backend = &self.backend;
    backend.generate_grandpa_key_ownership_proof(at, set_id, authority_id).await
}
/// Subscribes to BEEFY justifications through the shared BEEFY broadcaster.
async fn subscribe_beefy_finality_justifications(&self) -> Result<Subscription<Bytes>> {
    // Only awaited if no broadcaster exists yet.
    let do_subscribe = self.backend.subscribe_beefy_finality_justifications();
    self.subscribe_finality_justifications(&self.data.beefy_justifications, do_subscribe)
        .await
}
/// Forwards the token decimals request to the backend; nothing is cached.
async fn token_decimals(&self) -> Result<Option<u64>> {
    let backend = &self.backend;
    backend.token_decimals().await
}
/// Forwards the runtime version request to the backend; nothing is cached.
async fn runtime_version(&self) -> Result<RuntimeVersion> {
    let backend = &self.backend;
    backend.runtime_version().await
}
/// Forwards the simple runtime version request to the backend; nothing is cached.
async fn simple_runtime_version(&self) -> Result<SimpleRuntimeVersion> {
    let backend = &self.backend;
    backend.simple_runtime_version().await
}
/// Returns whatever the backend reports for `can_start_version_guard`.
fn can_start_version_guard(&self) -> bool {
    let backend = &self.backend;
    backend.can_start_version_guard()
}
/// Returns the raw storage value under `storage_key` at block `at`, using the storage cache.
///
/// Results are cached per (block hash, storage key), including `None` results.
async fn raw_storage_value(
    &self,
    at: HashOf<C>,
    storage_key: StorageKey,
) -> Result<Option<StorageData>> {
    let cache_key = (at, storage_key.clone());
    let fetch = self.backend.raw_storage_value(at, storage_key);
    self.get_or_insert_async(&self.data.raw_storage_value_cache, &cache_key, fetch)
        .await
}
/// Forwards the pending extrinsics request to the backend; nothing is cached.
async fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
    let backend = &self.backend;
    backend.pending_extrinsics().await
}
/// Submits the unsigned `transaction` through the backend and returns its result.
async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<HashOf<C>> {
    let backend = &self.backend;
    backend.submit_unsigned_extrinsic(transaction).await
}
/// Submits a signed extrinsic through the backend; `signer` and `prepare_extrinsic` are passed
/// on unchanged.
async fn submit_signed_extrinsic(
    &self,
    signer: &AccountKeyPairOf<C>,
    prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, NonceOf<C>) -> Result<UnsignedTransaction<C>>
        + Send
        + 'static,
) -> Result<HashOf<C>>
where
    C: ChainWithTransactions,
    AccountIdOf<C>: From<<AccountKeyPairOf<C> as Pair>::Public>,
{
    let backend = &self.backend;
    backend.submit_signed_extrinsic(signer, prepare_extrinsic).await
}
/// Submits a signed extrinsic through the backend and returns a tracker for it.
///
/// The tracker produced by the backend is switched to a clone of this caching client.
async fn submit_and_watch_signed_extrinsic(
    &self,
    signer: &AccountKeyPairOf<C>,
    prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, NonceOf<C>) -> Result<UnsignedTransaction<C>>
        + Send
        + 'static,
) -> Result<TransactionTracker<C, Self>>
where
    C: ChainWithTransactions,
    AccountIdOf<C>: From<<AccountKeyPairOf<C> as Pair>::Public>,
{
    let backend_tracker = self
        .backend
        .submit_and_watch_signed_extrinsic(signer, prepare_extrinsic)
        .await?;
    Ok(backend_tracker.switch_environment(self.clone()))
}
/// Forwards the transaction validation request to the backend; nothing is cached.
async fn validate_transaction<SignedTransaction: Encode + Send + 'static>(
    &self,
    at: HashOf<C>,
    transaction: SignedTransaction,
) -> Result<TransactionValidity> {
    let backend = &self.backend;
    backend.validate_transaction(at, transaction).await
}
/// Forwards the weight estimation request to the backend; nothing is cached.
async fn estimate_extrinsic_weight<SignedTransaction: Encode + Send + 'static>(
    &self,
    at: HashOf<C>,
    transaction: SignedTransaction,
) -> Result<Weight> {
    let backend = &self.backend;
    backend.estimate_extrinsic_weight(at, transaction).await
}
/// Performs the runtime call `method` at block `at`, using the state call cache.
///
/// Results are cached per (block hash, method name, SCALE-encoded arguments).
async fn raw_state_call<Args: Encode + Send>(
    &self,
    at: HashOf<C>,
    method: String,
    arguments: Args,
) -> Result<Bytes> {
    // The arguments are encoded here only to build the cache key; the backend receives them
    // unencoded.
    let cache_key = (at, method.clone(), Bytes(arguments.encode()));
    let fetch = self.backend.raw_state_call(at, method, arguments);
    self.get_or_insert_async(&self.data.state_call_cache, &cache_key, fetch)
        .await
}
/// Forwards the storage proof request to the backend; nothing is cached.
async fn prove_storage(
    &self,
    at: HashOf<C>,
    keys: Vec<StorageKey>,
) -> Result<(StorageProof, HashOf<C>)> {
    let backend = &self.backend;
    backend.prove_storage(at, keys).await
}
}