substrate_rpc_client/
lib.rs1use async_trait::async_trait;
41use serde::de::DeserializeOwned;
42use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
43use std::collections::VecDeque;
44
45pub use jsonrpsee::{
46 core::{
47 client::{ClientT, Error, Subscription, SubscriptionClientT},
48 params::BatchRequestBuilder,
49 RpcResult,
50 },
51 rpc_params,
52 ws_client::{WsClient, WsClientBuilder},
53};
54pub use sc_rpc_api::{
55 author::AuthorApiClient as AuthorApi, chain::ChainApiClient as ChainApi,
56 child_state::ChildStateApiClient as ChildStateApi, dev::DevApiClient as DevApi,
57 offchain::OffchainApiClient as OffchainApi, state::StateApiClient as StateApi,
58 system::SystemApiClient as SystemApi,
59};
60
61pub async fn ws_client(uri: impl AsRef<str>) -> Result<WsClient, String> {
63 WsClientBuilder::default()
64 .max_request_size(u32::MAX)
65 .max_response_size(u32::MAX)
66 .request_timeout(std::time::Duration::from_secs(60 * 10))
67 .connection_timeout(std::time::Duration::from_secs(60))
68 .max_buffer_capacity_per_subscription(1024)
69 .build(uri)
70 .await
71 .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
72}
73
74#[async_trait]
76pub trait HeaderProvider<Block: BlockT>
77where
78 Block::Header: HeaderT,
79{
80 async fn get_header(&self, hash: Block::Hash) -> Block::Header;
82}
83
84#[async_trait]
85impl<Block: BlockT> HeaderProvider<Block> for WsClient
86where
87 Block::Header: DeserializeOwned,
88{
89 async fn get_header(&self, hash: Block::Hash) -> Block::Header {
90 ChainApi::<(), Block::Hash, Block::Header, ()>::header(self, Some(hash))
91 .await
92 .unwrap()
93 .unwrap()
94 }
95}
96
97#[async_trait]
99pub trait HeaderSubscription<Block: BlockT>
100where
101 Block::Header: HeaderT,
102{
103 async fn next_header(&mut self) -> Option<Block::Header>;
108}
109
110#[async_trait]
111impl<Block: BlockT> HeaderSubscription<Block> for Subscription<Block::Header>
112where
113 Block::Header: DeserializeOwned,
114{
115 async fn next_header(&mut self) -> Option<Block::Header> {
116 match self.next().await {
117 Some(Ok(header)) => Some(header),
118 None => {
119 log::warn!("subscription closed");
120 None
121 },
122 Some(Err(why)) => {
123 log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why);
124 None
125 },
126 }
127 }
128}
129
130pub struct FinalizedHeaders<
135 'a,
136 Block: BlockT,
137 HP: HeaderProvider<Block>,
138 HS: HeaderSubscription<Block>,
139> {
140 header_provider: &'a HP,
141 subscription: HS,
142 fetched_headers: VecDeque<Block::Header>,
143 last_returned: Option<<Block::Header as HeaderT>::Hash>,
144}
145
146impl<'a, Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>>
147 FinalizedHeaders<'a, Block, HP, HS>
148where
149 <Block as BlockT>::Header: DeserializeOwned,
150{
151 pub fn new(header_provider: &'a HP, subscription: HS) -> Self {
152 Self {
153 header_provider,
154 subscription,
155 fetched_headers: VecDeque::new(),
156 last_returned: None,
157 }
158 }
159
160 async fn fetch(&mut self) -> usize {
165 let last_finalized = match self.subscription.next_header().await {
166 Some(header) => header,
167 None => return 0,
168 };
169
170 self.fetched_headers.push_front(last_finalized.clone());
171
172 let mut last_finalized_parent = *last_finalized.parent_hash();
173 let last_returned = self.last_returned.unwrap_or(last_finalized_parent);
174
175 while last_finalized_parent != last_returned {
176 let parent_header = self.header_provider.get_header(last_finalized_parent).await;
177 self.fetched_headers.push_front(parent_header.clone());
178 last_finalized_parent = *parent_header.parent_hash();
179 }
180
181 self.fetched_headers.len()
182 }
183
184 pub async fn next(&mut self) -> Option<Block::Header> {
186 if self.fetched_headers.is_empty() {
187 self.fetch().await;
188 }
189
190 if let Some(header) = self.fetched_headers.pop_front() {
191 self.last_returned = Some(header.hash());
192 Some(header)
193 } else {
194 None
195 }
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header, H256};
    use std::sync::Arc;
    use tokio::sync::Mutex;

    type Block = TBlock<ExtrinsicWrapper<()>>;
    type BlockNumber = u64;
    type Hash = H256;

    /// Header provider mock: ignores the requested hash and serves the headers
    /// of the queued heights in order.
    struct MockHeaderProvider(pub Arc<Mutex<VecDeque<BlockNumber>>>);

    /// Builds a chain of headers numbered 0 through 10 in which every header
    /// points to its predecessor through `parent_hash`.
    fn headers() -> Vec<Header> {
        let mut chain = Vec::with_capacity(11);
        chain.push(Header::new_from_number(0));
        for number in 1..=10 {
            let parent_hash = chain.last().unwrap().hash();
            chain.push(Header { parent_hash, ..Header::new_from_number(number) });
        }
        chain
    }

    #[async_trait]
    impl HeaderProvider<Block> for MockHeaderProvider {
        async fn get_header(&self, _hash: Hash) -> Header {
            // Release the lock as soon as the next height has been taken.
            let height = {
                let mut pending = self.0.lock().await;
                pending.pop_front().unwrap()
            };
            let chain = headers();
            chain[height as usize].clone()
        }
    }

    /// Header subscription mock: yields the headers of the queued heights in
    /// order and then ends.
    struct MockHeaderSubscription(pub VecDeque<BlockNumber>);

    #[async_trait]
    impl HeaderSubscription<Block> for MockHeaderSubscription {
        async fn next_header(&mut self) -> Option<Header> {
            let height = self.0.pop_front()?;
            let chain = headers();
            Some(chain[height as usize].clone())
        }
    }

    #[tokio::test]
    async fn finalized_headers_works_when_every_block_comes_from_subscription() {
        let expected: Vec<BlockNumber> = vec![4, 5, 6, 7];

        let provider = MockHeaderProvider(Default::default());
        let subscription = MockHeaderSubscription(VecDeque::from(expected.clone()));
        let mut finalized = FinalizedHeaders::new(&provider, subscription);

        for height in expected {
            let header = finalized.next().await.unwrap();
            assert_eq!(height, header.number);
        }
        assert_eq!(None, finalized.next().await);
    }

    #[tokio::test]
    async fn finalized_headers_come_from_subscription_and_provider_if_in_need() {
        let expected = 3..11;
        let from_subscription = vec![3, 4, 6, 10];
        // Listed in the order the provider is asked for them: 5 fills the gap
        // before 6, then 9, 8 and 7 are walked backwards from 10.
        let from_provider = vec![5, 9, 8, 7];

        let provider = MockHeaderProvider(Arc::new(Mutex::new(from_provider.into())));
        let subscription = MockHeaderSubscription(from_subscription.into());
        let mut finalized = FinalizedHeaders::new(&provider, subscription);

        for height in expected {
            let header = finalized.next().await.unwrap();
            assert_eq!(height, header.number);
        }
        assert_eq!(None, finalized.next().await);
    }
}