referrerpolicy=no-referrer-when-downgrade

cumulus_relay_chain_rpc_interface/
reconnecting_ws_client.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use cumulus_primitives_core::relay_chain::{
19	Block as RelayBlock, BlockNumber as RelayNumber, Hash as RelayHash, Header as RelayHeader,
20};
21use futures::{
22	channel::{mpsc::Sender, oneshot::Sender as OneshotSender},
23	future::BoxFuture,
24	stream::FuturesUnordered,
25	FutureExt, StreamExt,
26};
27use jsonrpsee::{
28	core::{
29		client::{Client as JsonRpcClient, ClientT, Subscription},
30		params::ArrayParams,
31		ClientError as JsonRpseeError, JsonValue,
32	},
33	ws_client::WsClientBuilder,
34};
35use sc_rpc_api::chain::ChainApiClient;
36use schnellru::{ByLength, LruMap};
37use sp_runtime::generic::SignedBlock;
38use std::{sync::Arc, time::Duration};
39use tokio::sync::mpsc::{
40	channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender,
41};
42use url::Url;
43
44use crate::rpc_client::{distribute_header, RpcDispatcherMessage};
45
/// Target used for every `tracing` event emitted by this module.
const LOG_TARGET: &str = "reconnecting-websocket-client";
/// Number of full passes over the URL list attempted before giving up on connecting.
const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5;
/// Base delay, in milliseconds, slept between two passes over the URL list.
const DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES: u64 = 1_000;
/// Base of the exponential backoff: the base delay is multiplied by this value raised to the
/// index of the pass that just finished.
const DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES: i32 = 2;
50
51/// Worker that should be used in combination with [`RelayChainRpcClient`].
52///
53/// Must be polled to distribute header notifications to listeners.
54pub struct ReconnectingWebsocketWorker {
55	ws_urls: Vec<String>,
56	/// Communication channel with the RPC client
57	client_receiver: TokioReceiver<RpcDispatcherMessage>,
58
59	/// Senders to distribute incoming header notifications to
60	imported_header_listeners: Vec<Sender<RelayHeader>>,
61	finalized_header_listeners: Vec<Sender<RelayHeader>>,
62	best_header_listeners: Vec<Sender<RelayHeader>>,
63}
64
65/// Format url and force addition of a port
66fn url_to_string_with_port(url: Url) -> Option<String> {
67	// This is already validated on CLI side, just defensive here
68	if (url.scheme() != "ws" && url.scheme() != "wss") || url.host_str().is_none() {
69		tracing::warn!(target: LOG_TARGET, ?url, "Non-WebSocket URL or missing host.");
70		return None
71	}
72
73	// Either we have a user-supplied port or use the default for 'ws' or 'wss' here
74	Some(format!(
75		"{}://{}:{}{}{}",
76		url.scheme(),
77		url.host_str()?,
78		url.port_or_known_default()?,
79		url.path(),
80		url.query().map(|query| format!("?{}", query)).unwrap_or_default()
81	))
82}
83
84/// Manages the active websocket client.
85/// Responsible for creating request futures, subscription streams
86/// and reconnections.
87#[derive(Debug)]
88struct ClientManager {
89	urls: Vec<String>,
90	active_client: Arc<JsonRpcClient>,
91	active_index: usize,
92}
93
94struct RelayChainSubscriptions {
95	import_subscription: Subscription<RelayHeader>,
96	finalized_subscription: Subscription<RelayHeader>,
97	best_subscription: Subscription<RelayHeader>,
98}
99
100/// Try to find a new RPC server to connect to. Uses a naive retry
101/// logic that does an exponential backoff in between iterations
102/// through all URLs from the list. It uses a constant to tell how
103/// many iterations of connection attempts to all URLs we allow. We
104/// return early when a connection is made.
105async fn connect_next_available_rpc_server(
106	urls: &Vec<String>,
107	starting_position: usize,
108) -> Result<(usize, Arc<JsonRpcClient>), ()> {
109	tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server.");
110
111	let mut prev_iteration: u32 = 0;
112	for (counter, url) in urls
113		.iter()
114		.cycle()
115		.skip(starting_position)
116		.take(urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES)
117		.enumerate()
118	{
119		// If we reached the end of the urls list, backoff before retrying
120		// connections to the entire list once more.
121		let Ok(current_iteration) = (counter / urls.len()).try_into() else {
122			tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting...");
123			break;
124		};
125		if current_iteration > prev_iteration {
126			// Safe conversion given we convert positive i32s which are lower than u64::MAX.
127			tokio::time::sleep(Duration::from_millis(
128				DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES *
129					DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES.pow(prev_iteration) as u64,
130			))
131			.await;
132			prev_iteration = current_iteration;
133		}
134
135		let index = (starting_position + counter) % urls.len();
136		tracing::info!(
137			target: LOG_TARGET,
138			attempt = current_iteration,
139			index,
140			url,
141			"Trying to connect to next external relaychain node.",
142		);
143		match WsClientBuilder::default().build(&url).await {
144			Ok(ws_client) => return Ok((index, Arc::new(ws_client))),
145			Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."),
146		};
147	}
148
149	tracing::error!(target: LOG_TARGET, "Retrying to connect to any external relaychain node failed.");
150	Err(())
151}
152
153impl ClientManager {
154	pub async fn new(urls: Vec<String>) -> Result<Self, ()> {
155		if urls.is_empty() {
156			return Err(())
157		}
158		let active_client = connect_next_available_rpc_server(&urls, 0).await?;
159		Ok(Self { urls, active_client: active_client.1, active_index: active_client.0 })
160	}
161
162	pub async fn connect_to_new_rpc_server(&mut self) -> Result<(), ()> {
163		let new_active =
164			connect_next_available_rpc_server(&self.urls, self.active_index + 1).await?;
165		self.active_client = new_active.1;
166		self.active_index = new_active.0;
167		Ok(())
168	}
169
170	async fn get_subscriptions(&self) -> Result<RelayChainSubscriptions, JsonRpseeError> {
171		let import_subscription = <JsonRpcClient as ChainApiClient<
172			RelayNumber,
173			RelayHash,
174			RelayHeader,
175			SignedBlock<RelayBlock>,
176		>>::subscribe_all_heads(&self.active_client)
177		.await
178		.map_err(|e| {
179			tracing::error!(
180				target: LOG_TARGET,
181				?e,
182				"Unable to open `chain_subscribeAllHeads` subscription."
183			);
184			e
185		})?;
186
187		let best_subscription = <JsonRpcClient as ChainApiClient<
188			RelayNumber,
189			RelayHash,
190			RelayHeader,
191			SignedBlock<RelayBlock>,
192		>>::subscribe_new_heads(&self.active_client)
193		.await
194		.map_err(|e| {
195			tracing::error!(
196				target: LOG_TARGET,
197				?e,
198				"Unable to open `chain_subscribeNewHeads` subscription."
199			);
200			e
201		})?;
202
203		let finalized_subscription = <JsonRpcClient as ChainApiClient<
204			RelayNumber,
205			RelayHash,
206			RelayHeader,
207			SignedBlock<RelayBlock>,
208		>>::subscribe_finalized_heads(&self.active_client)
209		.await
210		.map_err(|e| {
211			tracing::error!(
212				target: LOG_TARGET,
213				?e,
214				"Unable to open `chain_subscribeFinalizedHeads` subscription."
215			);
216			e
217		})?;
218
219		Ok(RelayChainSubscriptions {
220			import_subscription,
221			best_subscription,
222			finalized_subscription,
223		})
224	}
225
226	/// Create a request future that performs an RPC request and sends the results to the caller.
227	/// In case of a dead websocket connection, it returns the original request parameters to
228	/// enable retries.
229	fn create_request(
230		&self,
231		method: String,
232		params: ArrayParams,
233		response_sender: OneshotSender<Result<JsonValue, JsonRpseeError>>,
234	) -> BoxFuture<'static, Result<(), RpcDispatcherMessage>> {
235		let future_client = self.active_client.clone();
236		async move {
237			let resp = future_client.request(&method, params.clone()).await;
238
239			// We should only return the original request in case
240			// the websocket connection is dead and requires a restart.
241			// Other errors should be forwarded to the request caller.
242			if let Err(JsonRpseeError::RestartNeeded(_)) = resp {
243				return Err(RpcDispatcherMessage::Request(method, params, response_sender))
244			}
245
246			if let Err(err) = response_sender.send(resp) {
247				tracing::debug!(
248					target: LOG_TARGET,
249					?err,
250					"Recipient no longer interested in request result"
251				);
252			}
253			Ok(())
254		}
255		.boxed()
256	}
257}
258
259enum ConnectionStatus {
260	Connected,
261	ReconnectRequired(Option<RpcDispatcherMessage>),
262}
263
264impl ReconnectingWebsocketWorker {
265	/// Create new worker. Returns the worker and a channel to register new listeners.
266	pub async fn new(
267		urls: Vec<Url>,
268	) -> (ReconnectingWebsocketWorker, TokioSender<RpcDispatcherMessage>) {
269		let urls = urls.into_iter().filter_map(url_to_string_with_port).collect();
270
271		let (tx, rx) = tokio_channel(100);
272		let worker = ReconnectingWebsocketWorker {
273			ws_urls: urls,
274			client_receiver: rx,
275			imported_header_listeners: Vec::new(),
276			finalized_header_listeners: Vec::new(),
277			best_header_listeners: Vec::new(),
278		};
279		(worker, tx)
280	}
281
282	/// Reconnect via [`ClientManager`] and provide new notification streams.
283	async fn handle_reconnect(
284		&mut self,
285		client_manager: &mut ClientManager,
286		pending_requests: &mut FuturesUnordered<
287			BoxFuture<'static, Result<(), RpcDispatcherMessage>>,
288		>,
289		first_failed_request: Option<RpcDispatcherMessage>,
290	) -> Result<RelayChainSubscriptions, String> {
291		let mut requests_to_retry = Vec::new();
292		if let Some(req @ RpcDispatcherMessage::Request(_, _, _)) = first_failed_request {
293			requests_to_retry.push(req);
294		}
295
296		// At this point, all pending requests will return an error since the
297		// websocket connection is dead. So draining the pending requests should be fast.
298		while !pending_requests.is_empty() {
299			if let Some(Err(req)) = pending_requests.next().await {
300				requests_to_retry.push(req);
301			}
302		}
303
304		if client_manager.connect_to_new_rpc_server().await.is_err() {
305			return Err("Unable to find valid external RPC server, shutting down.".to_string())
306		};
307
308		for item in requests_to_retry.into_iter() {
309			if let RpcDispatcherMessage::Request(method, params, response_sender) = item {
310				pending_requests.push(client_manager.create_request(
311					method,
312					params,
313					response_sender,
314				));
315			};
316		}
317
318		client_manager.get_subscriptions().await.map_err(|e| {
319			format!("Not able to create streams from newly connected RPC server, shutting down. err: {:?}", e)
320		})
321	}
322
323	/// Run this worker to drive notification streams.
324	/// The worker does the following:
325	/// - Listen for [`RpcDispatcherMessage`], perform requests and register new listeners for the
326	///   notification streams
327	/// - Distribute incoming import, best head and finalization notifications to registered
328	///   listeners. If an error occurs during sending, the receiver has been closed and we remove
329	///   the sender from the list.
330	/// - Find a new valid RPC server to connect to in case the websocket connection is terminated.
331	///   If the worker is not able to connect to an RPC server from the list, the worker shuts
332	///   down.
333	pub async fn run(mut self) {
334		let mut pending_requests = FuturesUnordered::new();
335
336		let urls = std::mem::take(&mut self.ws_urls);
337		let Ok(mut client_manager) = ClientManager::new(urls).await else {
338			tracing::error!(target: LOG_TARGET, "No valid RPC url found. Stopping RPC worker.");
339			return
340		};
341		let Ok(mut subscriptions) = client_manager.get_subscriptions().await else {
342			tracing::error!(target: LOG_TARGET, "Unable to fetch subscriptions on initial connection.");
343			return
344		};
345
346		let mut imported_blocks_cache = LruMap::new(ByLength::new(40));
347		let mut should_reconnect = ConnectionStatus::Connected;
348		let mut last_seen_finalized_num: RelayNumber = 0;
349		loop {
350			// This branch is taken if the websocket connection to the current RPC server is closed.
351			if let ConnectionStatus::ReconnectRequired(maybe_failed_request) = should_reconnect {
352				match self
353					.handle_reconnect(
354						&mut client_manager,
355						&mut pending_requests,
356						maybe_failed_request,
357					)
358					.await
359				{
360					Ok(new_subscriptions) => {
361						subscriptions = new_subscriptions;
362					},
363					Err(message) => {
364						tracing::error!(
365							target: LOG_TARGET,
366							message,
367							"Unable to reconnect, stopping worker."
368						);
369						return
370					},
371				}
372				should_reconnect = ConnectionStatus::Connected;
373			}
374
375			tokio::select! {
376				evt = self.client_receiver.recv() => match evt {
377					Some(RpcDispatcherMessage::RegisterBestHeadListener(tx)) => {
378						self.best_header_listeners.push(tx);
379					},
380					Some(RpcDispatcherMessage::RegisterImportListener(tx)) => {
381						self.imported_header_listeners.push(tx)
382					},
383					Some(RpcDispatcherMessage::RegisterFinalizationListener(tx)) => {
384						self.finalized_header_listeners.push(tx)
385					},
386					Some(RpcDispatcherMessage::Request(method, params, response_sender)) => {
387						pending_requests.push(client_manager.create_request(method, params, response_sender));
388					},
389					None => {
390						tracing::error!(target: LOG_TARGET, "RPC client receiver closed. Stopping RPC Worker.");
391						return;
392					}
393				},
394				should_retry = pending_requests.next(), if !pending_requests.is_empty() => {
395					if let Some(Err(req)) = should_retry {
396						should_reconnect = ConnectionStatus::ReconnectRequired(Some(req));
397					}
398				},
399				import_event = subscriptions.import_subscription.next() => {
400					match import_event {
401						Some(Ok(header)) => {
402							let hash = header.hash();
403							if imported_blocks_cache.peek(&hash).is_some() {
404								tracing::debug!(
405									target: LOG_TARGET,
406									number = header.number,
407									?hash,
408									"Duplicate imported block header. This might happen after switching to a new RPC node. Skipping distribution."
409								);
410								continue;
411							}
412							imported_blocks_cache.insert(hash, ());
413							distribute_header(header, &mut self.imported_header_listeners);
414						},
415						None => {
416							tracing::error!(target: LOG_TARGET, "Subscription closed.");
417							should_reconnect = ConnectionStatus::ReconnectRequired(None);
418						},
419						Some(Err(error)) => {
420							tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription.");
421							should_reconnect = ConnectionStatus::ReconnectRequired(None);
422						},
423					}
424				},
425				best_header_event = subscriptions.best_subscription.next() => {
426					match best_header_event {
427						Some(Ok(header)) => distribute_header(header, &mut self.best_header_listeners),
428						None => {
429							tracing::error!(target: LOG_TARGET, "Subscription closed.");
430							should_reconnect = ConnectionStatus::ReconnectRequired(None);
431						},
432						Some(Err(error)) => {
433							tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription.");
434							should_reconnect = ConnectionStatus::ReconnectRequired(None);
435						},
436					}
437				}
438				finalized_event = subscriptions.finalized_subscription.next() => {
439					match finalized_event {
440						Some(Ok(header)) if header.number > last_seen_finalized_num => {
441							last_seen_finalized_num = header.number;
442							distribute_header(header, &mut self.finalized_header_listeners);
443						},
444						Some(Ok(header)) => {
445							tracing::debug!(
446								target: LOG_TARGET,
447								number = header.number,
448								last_seen_finalized_num,
449								"Duplicate finalized block header. This might happen after switching to a new RPC node. Skipping distribution."
450							);
451						},
452						None => {
453							tracing::error!(target: LOG_TARGET, "Subscription closed.");
454							should_reconnect = ConnectionStatus::ReconnectRequired(None);
455						},
456						Some(Err(error)) => {
457							tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription.");
458							should_reconnect = ConnectionStatus::ReconnectRequired(None);
459						},
460					}
461				}
462			}
463		}
464	}
465}
466
#[cfg(test)]
mod test {
	use std::time::Duration;

	use super::{url_to_string_with_port, ClientManager};
	use jsonrpsee::Methods;
	use url::Url;

	const SERVER_STARTUP_DELAY_SECONDS: u64 = 10;

	#[test]
	fn url_to_string_works() {
		// Default ports are filled in per scheme.
		let url = Url::parse("wss://something/path").unwrap();
		assert_eq!(Some("wss://something:443/path".to_string()), url_to_string_with_port(url));

		let url = Url::parse("ws://something/path").unwrap();
		assert_eq!(Some("ws://something:80/path".to_string()), url_to_string_with_port(url));

		// Explicit ports are kept for both schemes.
		let url = Url::parse("wss://something:100/path").unwrap();
		assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url));

		let url = Url::parse("ws://something:100/path").unwrap();
		assert_eq!(Some("ws://something:100/path".to_string()), url_to_string_with_port(url));

		// Query strings are preserved.
		let url = Url::parse("wss://something/path?query=yes").unwrap();
		assert_eq!(
			Some("wss://something:443/path?query=yes".to_string()),
			url_to_string_with_port(url)
		);

		let url = Url::parse("wss://something:9090/path?query=yes").unwrap();
		assert_eq!(
			Some("wss://something:9090/path?query=yes".to_string()),
			url_to_string_with_port(url)
		);

		// IPv6 hosts keep their brackets so the port stays unambiguous.
		let url = Url::parse("ws://[::1]:9944").unwrap();
		assert_eq!(Some("ws://[::1]:9944/".to_string()), url_to_string_with_port(url));

		// Non-WebSocket schemes are rejected.
		let url = Url::parse("http://something/path").unwrap();
		assert_eq!(None, url_to_string_with_port(url));
	}

	#[tokio::test]
	// Testing the retry logic at full means increasing CI with half a minute according
	// to the current logic, so lets test it best effort.
	async fn client_manager_retry_logic() {
		let port = portpicker::pick_unused_port().unwrap();
		let server = jsonrpsee::server::Server::builder()
			.build(format!("0.0.0.0:{}", port))
			.await
			.unwrap();

		// Start the server.
		let server = tokio::spawn(async {
			tokio::time::sleep(Duration::from_secs(SERVER_STARTUP_DELAY_SECONDS)).await;
			server.start(Methods::default())
		});

		// Start the client. Not exiting right away with an error means it
		// is handling gracefully failed connection attempts while the server
		// is starting.
		let res = ClientManager::new(vec![format!("ws://127.0.0.1:{}", port)]).await;
		assert!(res.is_ok());

		server.await.unwrap();
	}
}