1use cumulus_primitives_core::relay_chain::{
19 Block as RelayBlock, BlockNumber as RelayNumber, Hash as RelayHash, Header as RelayHeader,
20};
21use futures::{
22 channel::{mpsc::Sender, oneshot::Sender as OneshotSender},
23 future::BoxFuture,
24 stream::FuturesUnordered,
25 FutureExt, StreamExt,
26};
27use jsonrpsee::{
28 core::{
29 client::{Client as JsonRpcClient, ClientT, Subscription},
30 params::ArrayParams,
31 ClientError as JsonRpseeError, JsonValue,
32 },
33 ws_client::WsClientBuilder,
34};
35use sc_rpc_api::chain::ChainApiClient;
36use schnellru::{ByLength, LruMap};
37use sp_runtime::generic::SignedBlock;
38use std::{sync::Arc, time::Duration};
39use tokio::sync::mpsc::{
40 channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender,
41};
42use url::Url;
43
44use crate::rpc_client::{distribute_header, RpcDispatcherMessage};
45
/// Tracing target attached to every log line emitted by this module.
const LOG_TARGET: &str = "reconnecting-websocket-client";
/// Number of passes made over the complete URL list before connecting is given up.
const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5;
/// Base delay in milliseconds that is slept before a new pass over the URL list starts.
const DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES: u64 = 1000;
/// Base of the exponential backoff: the base delay is multiplied by this value raised to the
/// number of passes that were already completed before the previous one.
const DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES: i32 = 2;
50
/// Worker that receives [`RpcDispatcherMessage`]s, forwards requests to an external RPC server
/// and hands incoming relay chain headers to the registered listeners.
///
/// Nothing happens until `run` is polled.
pub struct ReconnectingWebsocketWorker {
    /// Normalised websocket URLs; moved into the `ClientManager` when `run` starts.
    ws_urls: Vec<String>,
    /// Receiving end of the channel whose sender is returned by `new`.
    client_receiver: TokioReceiver<RpcDispatcherMessage>,

    /// Senders registered through `RpcDispatcherMessage::RegisterImportListener`.
    imported_header_listeners: Vec<Sender<RelayHeader>>,
    /// Senders registered through `RpcDispatcherMessage::RegisterFinalizationListener`.
    finalized_header_listeners: Vec<Sender<RelayHeader>>,
    /// Senders registered through `RpcDispatcherMessage::RegisterBestHeadListener`.
    best_header_listeners: Vec<Sender<RelayHeader>>,
}
64
65fn url_to_string_with_port(url: Url) -> Option<String> {
67 if (url.scheme() != "ws" && url.scheme() != "wss") || url.host_str().is_none() {
69 tracing::warn!(target: LOG_TARGET, ?url, "Non-WebSocket URL or missing host.");
70 return None
71 }
72
73 Some(format!(
75 "{}://{}:{}{}{}",
76 url.scheme(),
77 url.host_str()?,
78 url.port_or_known_default()?,
79 url.path(),
80 url.query().map(|query| format!("?{}", query)).unwrap_or_default()
81 ))
82}
83
/// Holds the list of RPC server URLs together with the client that is currently in use.
///
/// Creates request futures and header subscriptions on the active client and switches to
/// another server from the list when asked to reconnect.
#[derive(Debug)]
struct ClientManager {
    /// URLs of all RPC servers that may be connected to.
    urls: Vec<String>,
    /// Client used for all requests and subscriptions.
    active_client: Arc<JsonRpcClient>,
    /// Index into `urls` of the server that `active_client` was connected to.
    active_index: usize,
}
93
/// The three header subscriptions that are opened on the active RPC client.
struct RelayChainSubscriptions {
    /// Subscription opened with `subscribe_all_heads`.
    import_subscription: Subscription<RelayHeader>,
    /// Subscription opened with `subscribe_finalized_heads`.
    finalized_subscription: Subscription<RelayHeader>,
    /// Subscription opened with `subscribe_new_heads`.
    best_subscription: Subscription<RelayHeader>,
}
99
100async fn connect_next_available_rpc_server(
106 urls: &Vec<String>,
107 starting_position: usize,
108) -> Result<(usize, Arc<JsonRpcClient>), ()> {
109 tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server.");
110
111 let mut prev_iteration: u32 = 0;
112 for (counter, url) in urls
113 .iter()
114 .cycle()
115 .skip(starting_position)
116 .take(urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES)
117 .enumerate()
118 {
119 let Ok(current_iteration) = (counter / urls.len()).try_into() else {
122 tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting...");
123 break;
124 };
125 if current_iteration > prev_iteration {
126 tokio::time::sleep(Duration::from_millis(
128 DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES *
129 DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES.pow(prev_iteration) as u64,
130 ))
131 .await;
132 prev_iteration = current_iteration;
133 }
134
135 let index = (starting_position + counter) % urls.len();
136 tracing::info!(
137 target: LOG_TARGET,
138 attempt = current_iteration,
139 index,
140 url,
141 "Trying to connect to next external relaychain node.",
142 );
143 match WsClientBuilder::default().build(&url).await {
144 Ok(ws_client) => return Ok((index, Arc::new(ws_client))),
145 Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."),
146 };
147 }
148
149 tracing::error!(target: LOG_TARGET, "Retrying to connect to any external relaychain node failed.");
150 Err(())
151}
152
153impl ClientManager {
154 pub async fn new(urls: Vec<String>) -> Result<Self, ()> {
155 if urls.is_empty() {
156 return Err(())
157 }
158 let active_client = connect_next_available_rpc_server(&urls, 0).await?;
159 Ok(Self { urls, active_client: active_client.1, active_index: active_client.0 })
160 }
161
162 pub async fn connect_to_new_rpc_server(&mut self) -> Result<(), ()> {
163 let new_active =
164 connect_next_available_rpc_server(&self.urls, self.active_index + 1).await?;
165 self.active_client = new_active.1;
166 self.active_index = new_active.0;
167 Ok(())
168 }
169
170 async fn get_subscriptions(&self) -> Result<RelayChainSubscriptions, JsonRpseeError> {
171 let import_subscription = <JsonRpcClient as ChainApiClient<
172 RelayNumber,
173 RelayHash,
174 RelayHeader,
175 SignedBlock<RelayBlock>,
176 >>::subscribe_all_heads(&self.active_client)
177 .await
178 .map_err(|e| {
179 tracing::error!(
180 target: LOG_TARGET,
181 ?e,
182 "Unable to open `chain_subscribeAllHeads` subscription."
183 );
184 e
185 })?;
186
187 let best_subscription = <JsonRpcClient as ChainApiClient<
188 RelayNumber,
189 RelayHash,
190 RelayHeader,
191 SignedBlock<RelayBlock>,
192 >>::subscribe_new_heads(&self.active_client)
193 .await
194 .map_err(|e| {
195 tracing::error!(
196 target: LOG_TARGET,
197 ?e,
198 "Unable to open `chain_subscribeNewHeads` subscription."
199 );
200 e
201 })?;
202
203 let finalized_subscription = <JsonRpcClient as ChainApiClient<
204 RelayNumber,
205 RelayHash,
206 RelayHeader,
207 SignedBlock<RelayBlock>,
208 >>::subscribe_finalized_heads(&self.active_client)
209 .await
210 .map_err(|e| {
211 tracing::error!(
212 target: LOG_TARGET,
213 ?e,
214 "Unable to open `chain_subscribeFinalizedHeads` subscription."
215 );
216 e
217 })?;
218
219 Ok(RelayChainSubscriptions {
220 import_subscription,
221 best_subscription,
222 finalized_subscription,
223 })
224 }
225
226 fn create_request(
230 &self,
231 method: String,
232 params: ArrayParams,
233 response_sender: OneshotSender<Result<JsonValue, JsonRpseeError>>,
234 ) -> BoxFuture<'static, Result<(), RpcDispatcherMessage>> {
235 let future_client = self.active_client.clone();
236 async move {
237 let resp = future_client.request(&method, params.clone()).await;
238
239 if let Err(JsonRpseeError::RestartNeeded(_)) = resp {
243 return Err(RpcDispatcherMessage::Request(method, params, response_sender))
244 }
245
246 if let Err(err) = response_sender.send(resp) {
247 tracing::debug!(
248 target: LOG_TARGET,
249 ?err,
250 "Recipient no longer interested in request result"
251 );
252 }
253 Ok(())
254 }
255 .boxed()
256 }
257}
258
/// Tells the worker loop whether it has to reconnect before polling for further events.
enum ConnectionStatus {
    /// The current connection can be used as is.
    Connected,
    /// A reconnect is required. Optionally carries the request whose failure triggered the
    /// reconnect so that it can be retried afterwards.
    ReconnectRequired(Option<RpcDispatcherMessage>),
}
263
264impl ReconnectingWebsocketWorker {
265 pub async fn new(
267 urls: Vec<Url>,
268 ) -> (ReconnectingWebsocketWorker, TokioSender<RpcDispatcherMessage>) {
269 let urls = urls.into_iter().filter_map(url_to_string_with_port).collect();
270
271 let (tx, rx) = tokio_channel(100);
272 let worker = ReconnectingWebsocketWorker {
273 ws_urls: urls,
274 client_receiver: rx,
275 imported_header_listeners: Vec::new(),
276 finalized_header_listeners: Vec::new(),
277 best_header_listeners: Vec::new(),
278 };
279 (worker, tx)
280 }
281
282 async fn handle_reconnect(
284 &mut self,
285 client_manager: &mut ClientManager,
286 pending_requests: &mut FuturesUnordered<
287 BoxFuture<'static, Result<(), RpcDispatcherMessage>>,
288 >,
289 first_failed_request: Option<RpcDispatcherMessage>,
290 ) -> Result<RelayChainSubscriptions, String> {
291 let mut requests_to_retry = Vec::new();
292 if let Some(req @ RpcDispatcherMessage::Request(_, _, _)) = first_failed_request {
293 requests_to_retry.push(req);
294 }
295
296 while !pending_requests.is_empty() {
299 if let Some(Err(req)) = pending_requests.next().await {
300 requests_to_retry.push(req);
301 }
302 }
303
304 if client_manager.connect_to_new_rpc_server().await.is_err() {
305 return Err("Unable to find valid external RPC server, shutting down.".to_string())
306 };
307
308 for item in requests_to_retry.into_iter() {
309 if let RpcDispatcherMessage::Request(method, params, response_sender) = item {
310 pending_requests.push(client_manager.create_request(
311 method,
312 params,
313 response_sender,
314 ));
315 };
316 }
317
318 client_manager.get_subscriptions().await.map_err(|e| {
319 format!("Not able to create streams from newly connected RPC server, shutting down. err: {:?}", e)
320 })
321 }
322
323 pub async fn run(mut self) {
334 let mut pending_requests = FuturesUnordered::new();
335
336 let urls = std::mem::take(&mut self.ws_urls);
337 let Ok(mut client_manager) = ClientManager::new(urls).await else {
338 tracing::error!(target: LOG_TARGET, "No valid RPC url found. Stopping RPC worker.");
339 return
340 };
341 let Ok(mut subscriptions) = client_manager.get_subscriptions().await else {
342 tracing::error!(target: LOG_TARGET, "Unable to fetch subscriptions on initial connection.");
343 return
344 };
345
346 let mut imported_blocks_cache = LruMap::new(ByLength::new(40));
347 let mut should_reconnect = ConnectionStatus::Connected;
348 let mut last_seen_finalized_num: RelayNumber = 0;
349 loop {
350 if let ConnectionStatus::ReconnectRequired(maybe_failed_request) = should_reconnect {
352 match self
353 .handle_reconnect(
354 &mut client_manager,
355 &mut pending_requests,
356 maybe_failed_request,
357 )
358 .await
359 {
360 Ok(new_subscriptions) => {
361 subscriptions = new_subscriptions;
362 },
363 Err(message) => {
364 tracing::error!(
365 target: LOG_TARGET,
366 message,
367 "Unable to reconnect, stopping worker."
368 );
369 return
370 },
371 }
372 should_reconnect = ConnectionStatus::Connected;
373 }
374
375 tokio::select! {
376 evt = self.client_receiver.recv() => match evt {
377 Some(RpcDispatcherMessage::RegisterBestHeadListener(tx)) => {
378 self.best_header_listeners.push(tx);
379 },
380 Some(RpcDispatcherMessage::RegisterImportListener(tx)) => {
381 self.imported_header_listeners.push(tx)
382 },
383 Some(RpcDispatcherMessage::RegisterFinalizationListener(tx)) => {
384 self.finalized_header_listeners.push(tx)
385 },
386 Some(RpcDispatcherMessage::Request(method, params, response_sender)) => {
387 pending_requests.push(client_manager.create_request(method, params, response_sender));
388 },
389 None => {
390 tracing::error!(target: LOG_TARGET, "RPC client receiver closed. Stopping RPC Worker.");
391 return;
392 }
393 },
394 should_retry = pending_requests.next(), if !pending_requests.is_empty() => {
395 if let Some(Err(req)) = should_retry {
396 should_reconnect = ConnectionStatus::ReconnectRequired(Some(req));
397 }
398 },
399 import_event = subscriptions.import_subscription.next() => {
400 match import_event {
401 Some(Ok(header)) => {
402 let hash = header.hash();
403 if imported_blocks_cache.peek(&hash).is_some() {
404 tracing::debug!(
405 target: LOG_TARGET,
406 number = header.number,
407 ?hash,
408 "Duplicate imported block header. This might happen after switching to a new RPC node. Skipping distribution."
409 );
410 continue;
411 }
412 imported_blocks_cache.insert(hash, ());
413 distribute_header(header, &mut self.imported_header_listeners);
414 },
415 None => {
416 tracing::error!(target: LOG_TARGET, "Subscription closed.");
417 should_reconnect = ConnectionStatus::ReconnectRequired(None);
418 },
419 Some(Err(error)) => {
420 tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription.");
421 should_reconnect = ConnectionStatus::ReconnectRequired(None);
422 },
423 }
424 },
425 best_header_event = subscriptions.best_subscription.next() => {
426 match best_header_event {
427 Some(Ok(header)) => distribute_header(header, &mut self.best_header_listeners),
428 None => {
429 tracing::error!(target: LOG_TARGET, "Subscription closed.");
430 should_reconnect = ConnectionStatus::ReconnectRequired(None);
431 },
432 Some(Err(error)) => {
433 tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription.");
434 should_reconnect = ConnectionStatus::ReconnectRequired(None);
435 },
436 }
437 }
438 finalized_event = subscriptions.finalized_subscription.next() => {
439 match finalized_event {
440 Some(Ok(header)) if header.number > last_seen_finalized_num => {
441 last_seen_finalized_num = header.number;
442 distribute_header(header, &mut self.finalized_header_listeners);
443 },
444 Some(Ok(header)) => {
445 tracing::debug!(
446 target: LOG_TARGET,
447 number = header.number,
448 last_seen_finalized_num,
449 "Duplicate finalized block header. This might happen after switching to a new RPC node. Skipping distribution."
450 );
451 },
452 None => {
453 tracing::error!(target: LOG_TARGET, "Subscription closed.");
454 should_reconnect = ConnectionStatus::ReconnectRequired(None);
455 },
456 Some(Err(error)) => {
457 tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription.");
458 should_reconnect = ConnectionStatus::ReconnectRequired(None);
459 },
460 }
461 }
462 }
463 }
464 }
465}
466
#[cfg(test)]
mod test {
    use std::time::Duration;

    use super::{url_to_string_with_port, ClientManager};
    use jsonrpsee::Methods;
    use url::Url;

    /// How long the test server waits before it starts serving.
    const SERVER_STARTUP_DELAY_SECONDS: u64 = 10;

    #[test]
    fn url_to_string_works() {
        // Default ports are filled in for both websocket schemes.
        let url = Url::parse("wss://something/path").unwrap();
        assert_eq!(Some("wss://something:443/path".to_string()), url_to_string_with_port(url));

        let url = Url::parse("ws://something/path").unwrap();
        assert_eq!(Some("ws://something:80/path".to_string()), url_to_string_with_port(url));

        // Explicit ports are kept for both websocket schemes.
        let url = Url::parse("wss://something:100/path").unwrap();
        assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url));

        let url = Url::parse("ws://something:100/path").unwrap();
        assert_eq!(Some("ws://something:100/path".to_string()), url_to_string_with_port(url));

        // The query string survives, with and without an explicit port.
        let url = Url::parse("wss://something/path?query=yes").unwrap();
        assert_eq!(
            Some("wss://something:443/path?query=yes".to_string()),
            url_to_string_with_port(url)
        );

        let url = Url::parse("wss://something:9090/path?query=yes").unwrap();
        assert_eq!(
            Some("wss://something:9090/path?query=yes".to_string()),
            url_to_string_with_port(url)
        );

        // Anything that is not a websocket scheme is rejected.
        let url = Url::parse("http://something/path").unwrap();
        assert_eq!(None, url_to_string_with_port(url));
    }

    #[tokio::test]
    async fn client_manager_retry_logic() {
        let port = portpicker::pick_unused_port().unwrap();
        let server = jsonrpsee::server::Server::builder()
            .build(format!("0.0.0.0:{}", port))
            .await
            .unwrap();

        let server = tokio::spawn(async {
            // Start serving only after a delay so the first connection attempts cannot succeed.
            tokio::time::sleep(Duration::from_secs(SERVER_STARTUP_DELAY_SECONDS)).await;
            server.start(Methods::default())
        });

        // The backoff between the five passes (1s, 2s, 4s, 8s) outlasts the startup delay, so
        // the manager is expected to connect eventually.
        let res = ClientManager::new(vec![format!("ws://127.0.0.1:{}", port)]).await;
        assert!(res.is_ok());

        server.await.unwrap();
    }
}
528}