substrate_rpc_client/
lib.rs
1use async_trait::async_trait;
41use serde::de::DeserializeOwned;
42use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
43use std::collections::VecDeque;
44
45pub use jsonrpsee::{
46 core::{
47 client::{ClientT, Error, Subscription, SubscriptionClientT},
48 params::BatchRequestBuilder,
49 RpcResult,
50 },
51 rpc_params,
52 ws_client::{WsClient, WsClientBuilder},
53};
54pub use sc_rpc_api::{
55 author::AuthorApiClient as AuthorApi, chain::ChainApiClient as ChainApi,
56 child_state::ChildStateApiClient as ChildStateApi, dev::DevApiClient as DevApi,
57 offchain::OffchainApiClient as OffchainApi, state::StateApiClient as StateApi,
58 system::SystemApiClient as SystemApi,
59};
60
61pub async fn ws_client(uri: impl AsRef<str>) -> Result<WsClient, String> {
63 WsClientBuilder::default()
64 .max_request_size(u32::MAX)
65 .max_response_size(u32::MAX)
66 .request_timeout(std::time::Duration::from_secs(60 * 10))
67 .connection_timeout(std::time::Duration::from_secs(60))
68 .max_buffer_capacity_per_subscription(1024)
69 .build(uri)
70 .await
71 .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
72}
73
74#[async_trait]
76pub trait HeaderProvider<Block: BlockT>
77where
78 Block::Header: HeaderT,
79{
80 async fn get_header(&self, hash: Block::Hash) -> Block::Header;
82}
83
84#[async_trait]
85impl<Block: BlockT> HeaderProvider<Block> for WsClient
86where
87 Block::Header: DeserializeOwned,
88{
89 async fn get_header(&self, hash: Block::Hash) -> Block::Header {
90 ChainApi::<(), Block::Hash, Block::Header, ()>::header(self, Some(hash))
91 .await
92 .unwrap()
93 .unwrap()
94 }
95}
96
97#[async_trait]
99pub trait HeaderSubscription<Block: BlockT>
100where
101 Block::Header: HeaderT,
102{
103 async fn next_header(&mut self) -> Option<Block::Header>;
108}
109
110#[async_trait]
111impl<Block: BlockT> HeaderSubscription<Block> for Subscription<Block::Header>
112where
113 Block::Header: DeserializeOwned,
114{
115 async fn next_header(&mut self) -> Option<Block::Header> {
116 match self.next().await {
117 Some(Ok(header)) => Some(header),
118 None => {
119 log::warn!("subscription closed");
120 None
121 },
122 Some(Err(why)) => {
123 log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why);
124 None
125 },
126 }
127 }
128}
129
130pub struct FinalizedHeaders<
135 'a,
136 Block: BlockT,
137 HP: HeaderProvider<Block>,
138 HS: HeaderSubscription<Block>,
139> {
140 header_provider: &'a HP,
141 subscription: HS,
142 fetched_headers: VecDeque<Block::Header>,
143 last_returned: Option<<Block::Header as HeaderT>::Hash>,
144}
145
146impl<'a, Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>>
147 FinalizedHeaders<'a, Block, HP, HS>
148where
149 <Block as BlockT>::Header: DeserializeOwned,
150{
151 pub fn new(header_provider: &'a HP, subscription: HS) -> Self {
152 Self {
153 header_provider,
154 subscription,
155 fetched_headers: VecDeque::new(),
156 last_returned: None,
157 }
158 }
159
160 async fn fetch(&mut self) -> usize {
165 let last_finalized = match self.subscription.next_header().await {
166 Some(header) => header,
167 None => return 0,
168 };
169
170 self.fetched_headers.push_front(last_finalized.clone());
171
172 let mut last_finalized_parent = *last_finalized.parent_hash();
173 let last_returned = self.last_returned.unwrap_or(last_finalized_parent);
174
175 while last_finalized_parent != last_returned {
176 let parent_header = self.header_provider.get_header(last_finalized_parent).await;
177 self.fetched_headers.push_front(parent_header.clone());
178 last_finalized_parent = *parent_header.parent_hash();
179 }
180
181 self.fetched_headers.len()
182 }
183
184 pub async fn next(&mut self) -> Option<Block::Header> {
186 if self.fetched_headers.is_empty() {
187 self.fetch().await;
188 }
189
190 if let Some(header) = self.fetched_headers.pop_front() {
191 self.last_returned = Some(header.hash());
192 Some(header)
193 } else {
194 None
195 }
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use sp_runtime::testing::{Block as TBlock, Header, MockCallU64, TestXt, H256};
    use std::sync::Arc;
    use tokio::sync::Mutex;

    type UncheckedXt = TestXt<MockCallU64, ()>;
    type Block = TBlock<UncheckedXt>;
    type BlockNumber = u64;
    type Hash = H256;

    /// Provider that ignores the requested hash and serves headers for the queued
    /// heights, in queue order.
    struct MockHeaderProvider(pub Arc<Mutex<VecDeque<BlockNumber>>>);

    /// Builds headers numbered 0 through 10, each one pointing at its predecessor
    /// through `parent_hash`.
    fn headers() -> Vec<Header> {
        let mut chain = vec![Header::new_from_number(0)];
        for number in 1..11 {
            let parent_hash = chain.last().unwrap().hash();
            chain.push(Header { parent_hash, ..Header::new_from_number(number) });
        }
        chain
    }

    /// Returns a copy of the header at `height` in the chain built by `headers`.
    fn header_at(height: BlockNumber) -> Header {
        headers()[height as usize].clone()
    }

    #[async_trait]
    impl HeaderProvider<Block> for MockHeaderProvider {
        async fn get_header(&self, _hash: Hash) -> Header {
            let height = {
                let mut queued = self.0.lock().await;
                queued.pop_front().unwrap()
            };
            header_at(height)
        }
    }

    /// Subscription that yields the headers at the queued heights until the queue is empty.
    struct MockHeaderSubscription(pub VecDeque<BlockNumber>);

    #[async_trait]
    impl HeaderSubscription<Block> for MockHeaderSubscription {
        async fn next_header(&mut self) -> Option<Header> {
            let height = self.0.pop_front();
            height.map(header_at)
        }
    }

    #[tokio::test]
    async fn finalized_headers_works_when_every_block_comes_from_subscription() {
        let heights = vec![4, 5, 6, 7];

        // The provider has nothing queued: it must never be consulted here.
        let provider = MockHeaderProvider(Default::default());
        let subscription = MockHeaderSubscription(heights.clone().into());
        let mut stream = FinalizedHeaders::new(&provider, subscription);

        for expected in heights {
            let header = stream.next().await.unwrap();
            assert_eq!(expected, header.number);
        }
        assert_eq!(None, stream.next().await);
    }

    #[tokio::test]
    async fn finalized_headers_come_from_subscription_and_provider_if_in_need() {
        let all_heights = 3..11;
        let heights_in_subscription = vec![3, 4, 6, 10];
        // Missing headers are requested walking back from the newest one, hence 9, 8, 7.
        let heights_not_in_subscription = vec![5, 9, 8, 7];

        let queued = Arc::new(Mutex::new(heights_not_in_subscription.into()));
        let provider = MockHeaderProvider(queued);
        let subscription = MockHeaderSubscription(heights_in_subscription.into());
        let mut stream = FinalizedHeaders::new(&provider, subscription);

        for expected in all_heights {
            let header = stream.next().await.unwrap();
            assert_eq!(expected, header.number);
        }
        assert_eq!(None, stream.next().await);
    }
}
271}