use async_trait::async_trait;
use serde::de::DeserializeOwned;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::collections::VecDeque;
pub use jsonrpsee::{
core::{
client::{ClientT, Error, Subscription, SubscriptionClientT},
params::BatchRequestBuilder,
RpcResult,
},
rpc_params,
ws_client::{WsClient, WsClientBuilder},
};
pub use sc_rpc_api::{
author::AuthorApiClient as AuthorApi, chain::ChainApiClient as ChainApi,
child_state::ChildStateApiClient as ChildStateApi, dev::DevApiClient as DevApi,
offchain::OffchainApiClient as OffchainApi, state::StateApiClient as StateApi,
system::SystemApiClient as SystemApi,
};
pub async fn ws_client(uri: impl AsRef<str>) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_size(u32::MAX)
.max_response_size(u32::MAX)
.request_timeout(std::time::Duration::from_secs(60 * 10))
.connection_timeout(std::time::Duration::from_secs(60))
.max_buffer_capacity_per_subscription(1024)
.build(uri)
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
}
#[async_trait]
pub trait HeaderProvider<Block: BlockT>
where
Block::Header: HeaderT,
{
async fn get_header(&self, hash: Block::Hash) -> Block::Header;
}
#[async_trait]
impl<Block: BlockT> HeaderProvider<Block> for WsClient
where
Block::Header: DeserializeOwned,
{
async fn get_header(&self, hash: Block::Hash) -> Block::Header {
ChainApi::<(), Block::Hash, Block::Header, ()>::header(self, Some(hash))
.await
.unwrap()
.unwrap()
}
}
#[async_trait]
pub trait HeaderSubscription<Block: BlockT>
where
Block::Header: HeaderT,
{
async fn next_header(&mut self) -> Option<Block::Header>;
}
#[async_trait]
impl<Block: BlockT> HeaderSubscription<Block> for Subscription<Block::Header>
where
Block::Header: DeserializeOwned,
{
async fn next_header(&mut self) -> Option<Block::Header> {
match self.next().await {
Some(Ok(header)) => Some(header),
None => {
log::warn!("subscription closed");
None
},
Some(Err(why)) => {
log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why);
None
},
}
}
}
pub struct FinalizedHeaders<
'a,
Block: BlockT,
HP: HeaderProvider<Block>,
HS: HeaderSubscription<Block>,
> {
header_provider: &'a HP,
subscription: HS,
fetched_headers: VecDeque<Block::Header>,
last_returned: Option<<Block::Header as HeaderT>::Hash>,
}
impl<'a, Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>>
FinalizedHeaders<'a, Block, HP, HS>
where
<Block as BlockT>::Header: DeserializeOwned,
{
pub fn new(header_provider: &'a HP, subscription: HS) -> Self {
Self {
header_provider,
subscription,
fetched_headers: VecDeque::new(),
last_returned: None,
}
}
async fn fetch(&mut self) -> usize {
let last_finalized = match self.subscription.next_header().await {
Some(header) => header,
None => return 0,
};
self.fetched_headers.push_front(last_finalized.clone());
let mut last_finalized_parent = *last_finalized.parent_hash();
let last_returned = self.last_returned.unwrap_or(last_finalized_parent);
while last_finalized_parent != last_returned {
let parent_header = self.header_provider.get_header(last_finalized_parent).await;
self.fetched_headers.push_front(parent_header.clone());
last_finalized_parent = *parent_header.parent_hash();
}
self.fetched_headers.len()
}
pub async fn next(&mut self) -> Option<Block::Header> {
if self.fetched_headers.is_empty() {
self.fetch().await;
}
if let Some(header) = self.fetched_headers.pop_front() {
self.last_returned = Some(header.hash());
Some(header)
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::testing::{Block as TBlock, Header, MockCallU64, TestXt, H256};
use std::sync::Arc;
use tokio::sync::Mutex;
type UncheckedXt = TestXt<MockCallU64, ()>;
type Block = TBlock<UncheckedXt>;
type BlockNumber = u64;
type Hash = H256;
struct MockHeaderProvider(pub Arc<Mutex<VecDeque<BlockNumber>>>);
fn headers() -> Vec<Header> {
let mut headers = vec![Header::new_from_number(0)];
for n in 1..11 {
headers.push(Header {
parent_hash: headers.last().unwrap().hash(),
..Header::new_from_number(n)
})
}
headers
}
#[async_trait]
impl HeaderProvider<Block> for MockHeaderProvider {
async fn get_header(&self, _hash: Hash) -> Header {
let height = self.0.lock().await.pop_front().unwrap();
headers()[height as usize].clone()
}
}
struct MockHeaderSubscription(pub VecDeque<BlockNumber>);
#[async_trait]
impl HeaderSubscription<Block> for MockHeaderSubscription {
async fn next_header(&mut self) -> Option<Header> {
self.0.pop_front().map(|h| headers()[h as usize].clone())
}
}
#[tokio::test]
async fn finalized_headers_works_when_every_block_comes_from_subscription() {
let heights = vec![4, 5, 6, 7];
let provider = MockHeaderProvider(Default::default());
let subscription = MockHeaderSubscription(heights.clone().into());
let mut headers = FinalizedHeaders::new(&provider, subscription);
for h in heights {
assert_eq!(h, headers.next().await.unwrap().number);
}
assert_eq!(None, headers.next().await);
}
#[tokio::test]
async fn finalized_headers_come_from_subscription_and_provider_if_in_need() {
let all_heights = 3..11;
let heights_in_subscription = vec![3, 4, 6, 10];
let heights_not_in_subscription = vec![5, 9, 8, 7];
let provider = MockHeaderProvider(Arc::new(Mutex::new(heights_not_in_subscription.into())));
let subscription = MockHeaderSubscription(heights_in_subscription.into());
let mut headers = FinalizedHeaders::new(&provider, subscription);
for h in all_heights {
assert_eq!(h, headers.next().await.unwrap().number);
}
assert_eq!(None, headers.next().await);
}
}