referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
cli.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17//! The Ethereum JSON-RPC server.
18use crate::{
19	DbContext, DebugRpcServer, DebugRpcServerImpl, EthRpcServer, EthRpcServerImpl, LOG_TARGET,
20	PolkadotRpcServer, PolkadotRpcServerImpl, ReceiptExtractor, ReceiptProvider,
21	SubxtBlockInfoProvider, SystemHealthRpcServer, SystemHealthRpcServerImpl,
22	client::{Client, ClientError, SubscriptionGapQueue, SubscriptionType, connect},
23};
24use clap::{CommandFactory, FromArgMatches, Parser};
25use futures::{FutureExt, future::BoxFuture, pin_mut};
26use jsonrpsee::server::RpcModule;
27use sc_cli::{PrometheusParams, RpcParams, SharedParams, Signals};
28use sc_service::{
29	TaskManager,
30	config::{BasePath, PrometheusConfig, RpcConfiguration},
31	create_rpc_runtime, start_rpc_servers,
32};
33use sqlx::{
34	SqlitePool,
35	sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
36};
37use std::path::PathBuf;
38
39/// Query the maximum number of bound parameters SQLite allows per query
40async fn sqlite_db_query_max_variable_number(pool: &SqlitePool) -> usize {
41	let limit = async {
42		let mut conn = pool
43			.acquire()
44			.await
45			.inspect_err(|e| log::warn!(target: LOG_TARGET, "๐Ÿ’พ Failed to acquire connection: {e}"))
46			.ok()?;
47		let mut handle = conn
48			.lock_handle()
49			.await
50			.inspect_err(|e| log::warn!(target: LOG_TARGET, "๐Ÿ’พ Failed to lock handle: {e}"))
51			.ok()?;
52		// SAFETY: `lock_handle` guarantees the raw pointer is valid for
53		// the lifetime of the guard, and passing -1 only queries the limit.
54		let raw = unsafe {
55			libsqlite3_sys::sqlite3_limit(
56				handle.as_raw_handle().as_ptr(),
57				libsqlite3_sys::SQLITE_LIMIT_VARIABLE_NUMBER,
58				-1,
59			)
60		};
61		raw.try_into().ok()
62	}
63	.await;
64
65	let default = DbContext::DEFAULT_MAX_VARIABLE_NUMBER;
66	limit.inspect(|n| log::info!(target: LOG_TARGET, "๐Ÿ’พ SQLite db_query_max_variable_number: {n}"))
67		.unwrap_or_else(|| {
68			log::warn!(target: LOG_TARGET, "๐Ÿ’พ Failed to query SQLite variable limit, falling back to {default}");
69			default
70		})
71}
72
/// Specifies the eth-rpc pruning mode.
///
/// The textual form is `archive` for [`EthPruningMode::Archive`] and the bare block count
/// for [`EthPruningMode::KeepLatest`]; the `FromStr` implementation accepts the same two
/// forms.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display)]
pub enum EthPruningMode {
	/// Persistent on-disk database with backward historical sync of all blocks.
	#[display(fmt = "archive")]
	Archive,
	/// In-memory database keeping only the latest N blocks.
	///
	/// The wrapped value is N, the number of most recent blocks to keep.
	#[display(fmt = "{_0}")]
	KeepLatest(usize),
}
83
84impl EthPruningMode {
85	/// Returns `true` if this mode enables historical block sync.
86	pub fn is_archive(&self) -> bool {
87		matches!(self, Self::Archive)
88	}
89
90	/// Returns the number of blocks to keep, if in `KeepLatest` mode.
91	pub fn keep_latest(&self) -> Option<usize> {
92		match self {
93			Self::KeepLatest(n) => Some(*n),
94			_ => None,
95		}
96	}
97}
98
99impl std::str::FromStr for EthPruningMode {
100	type Err = String;
101
102	fn from_str(input: &str) -> Result<Self, Self::Err> {
103		match input {
104			"archive" => Ok(Self::Archive),
105			n => {
106				n.parse::<usize>()
107					.ok()
108					.filter(|&v| v >= 1)
109					.map(Self::KeepLatest)
110					.ok_or_else(|| {
111						format!(
112							"Invalid pruning mode '{n}': expected 'archive' or a positive integer"
113						)
114					})
115			},
116		}
117	}
118}
119
/// Default port if --prometheus-port is not specified
const DEFAULT_PROMETHEUS_PORT: u16 = 9616;

/// Default port if --rpc-port is not specified
const DEFAULT_RPC_PORT: u16 = 8545;

/// File name of the on-disk SQLite database; it is joined onto the resolved database
/// directory when a persistent database is used.
const DEFAULT_DATABASE_NAME: &str = "eth-rpc.db";
127
128// Parsed command instructions from the command line
129#[derive(Parser, Debug)]
130#[clap(author, about, version)]
131pub struct CliCommand {
132	/// The node url to connect to
133	#[clap(long, default_value = "ws://127.0.0.1:9944")]
134	pub node_rpc_url: String,
135
136	/// Pruning mode for the eth-rpc receipt database.
137	///
138	/// - archive (default): Sync all historical blocks (requires an archive node).
139	/// - N (>= 1): In-memory database keeping only the latest N blocks.
140	#[clap(long, default_value = "archive")]
141	pub eth_pruning: EthPruningMode,
142
143	#[allow(missing_docs)]
144	#[clap(flatten)]
145	pub shared_params: SharedParams,
146
147	#[allow(missing_docs)]
148	#[clap(flatten)]
149	pub rpc_params: RpcParams,
150
151	#[allow(missing_docs)]
152	#[clap(flatten)]
153	pub prometheus_params: PrometheusParams,
154
155	/// By default, the node rejects any transaction that's unprotected (i.e., that doesn't have a
156	/// chain-id). If the user wishes the submit such a transaction then they can use this flag to
157	/// instruct the RPC to ignore this check.
158	#[arg(long)]
159	pub allow_unprotected_txs: bool,
160}
161
162impl CliCommand {
163	/// Parse CLI args, rejecting any removed flags with a helpful message.
164	pub fn parse_cli() -> anyhow::Result<Self> {
165		let removed_flags =
166			["database-url", "cache-size", "index-last-n-blocks", "earliest-receipt-block"];
167
168		let cmd = removed_flags.iter().fold(Self::command(), |cmd, name| {
169			cmd.arg(
170				clap::Arg::new(*name)
171					.long(*name)
172					.num_args(0..=1)
173					.hide(true)
174					.action(clap::ArgAction::Set),
175			)
176		});
177		let matches = cmd.get_matches();
178
179		let used: Vec<_> = removed_flags
180			.iter()
181			.filter(|f| matches.contains_id(f))
182			.map(|f| format!("--{f}"))
183			.collect();
184		if !used.is_empty() {
185			anyhow::bail!(
186				"[{}] have been removed. \
187				 Check polkadot-sdk PR #11153 for the CLI migration guide.",
188				used.join(", "),
189			);
190		}
191
192		Ok(Self::from_arg_matches(&matches).expect("already validated by clap"))
193	}
194}
195
196/// Initialize the logger
197#[cfg(not(test))]
198fn init_logger(params: &SharedParams) -> anyhow::Result<()> {
199	let mut logger = sc_cli::LoggerBuilder::new(params.log_filters().join(","));
200	logger
201		.with_log_reloading(params.enable_log_reloading)
202		.with_detailed_output(params.detailed_log_output);
203
204	if let Some(tracing_targets) = &params.tracing_targets {
205		let tracing_receiver = params.tracing_receiver.into();
206		logger.with_profiling(tracing_receiver, tracing_targets);
207	}
208
209	if params.disable_log_color {
210		logger.with_colors(false);
211	}
212
213	logger.init()?;
214	Ok(())
215}
216
217/// Resolve the base directory for persistent database storage.
218///
219/// - If `base_path` is `Some` (explicit `--base-path` or `--dev` temp dir), use it directly.
220/// - If `base_path` is `None`, use the platform default:
221///   - macOS: `~/Library/Application Support/eth-rpc/`
222///   - Linux: `~/.local/share/eth-rpc/`
223///   - Windows: `%APPDATA%\eth-rpc\`
224fn resolve_db_dir(base_path: Option<BasePath>) -> PathBuf {
225	match base_path {
226		Some(path) => path.path().to_path_buf(),
227		None => BasePath::from_project("", "", "eth-rpc").path().to_path_buf(),
228	}
229}
230
231/// Resolve SQLite connection options from CLI arguments.
232fn resolve_db_options(
233	eth_pruning: EthPruningMode,
234	base_path: Option<BasePath>,
235) -> anyhow::Result<SqliteConnectOptions> {
236	if eth_pruning.is_archive() {
237		let db_dir = resolve_db_dir(base_path);
238		std::fs::create_dir_all(&db_dir).map_err(|e| {
239			anyhow::anyhow!("Failed to create database directory {}: {e}", db_dir.display())
240		})?;
241		let db_path = db_dir.join(DEFAULT_DATABASE_NAME);
242		log::info!(target: LOG_TARGET, "๐Ÿ’พ Database path: {}", db_path.display());
243		// WAL mode allows concurrent writes from the live subscription
244		// and the backward sync without SQLITE_BUSY errors.
245		Ok(SqliteConnectOptions::new()
246			.filename(&db_path)
247			.create_if_missing(true)
248			.journal_mode(SqliteJournalMode::Wal))
249	} else {
250		Ok(SqliteConnectOptions::new().in_memory(true))
251	}
252}
253
254fn build_client(
255	tokio_handle: &tokio::runtime::Handle,
256	eth_pruning: EthPruningMode,
257	node_rpc_url: &str,
258	db_options: SqliteConnectOptions,
259	max_request_size: u32,
260	max_response_size: u32,
261	abort_signal: Signals,
262	subscription_gap_queue: SubscriptionGapQueue,
263) -> anyhow::Result<Client> {
264	let fut = async {
265		let (api, rpc_client, rpc) =
266			connect(node_rpc_url, max_request_size, max_response_size).await?;
267		let block_provider = SubxtBlockInfoProvider::new(api.clone(), rpc.clone()).await?;
268
269		let (pool, keep_latest_n_blocks) = match eth_pruning {
270			EthPruningMode::Archive => {
271				(SqlitePoolOptions::new().connect_with(db_options).await?, None)
272			},
273			EthPruningMode::KeepLatest(max_blocks) => {
274				log::info!(target: LOG_TARGET,
275					"๐Ÿ’พ Using in-memory database, keeping only {max_blocks} blocks");
276				// see sqlite in-memory issue: https://github.com/launchbadge/sqlx/issues/2510
277				let pool = SqlitePoolOptions::new()
278					.max_connections(1)
279					.idle_timeout(None)
280					.max_lifetime(None)
281					.connect_with(db_options)
282					.await?;
283				(pool, Some(max_blocks))
284			},
285		};
286
287		let receipt_extractor = ReceiptExtractor::new(api.clone()).await?;
288		let max_variable_number = sqlite_db_query_max_variable_number(&pool).await;
289		let db_ctx = DbContext::new(pool, max_variable_number);
290
291		let receipt_provider = ReceiptProvider::new(
292			db_ctx,
293			block_provider.clone(),
294			receipt_extractor.clone(),
295			keep_latest_n_blocks,
296		)
297		.await?;
298
299		let client = Client::new(
300			api,
301			rpc_client,
302			rpc,
303			block_provider,
304			receipt_provider,
305			eth_pruning.is_archive(),
306			subscription_gap_queue,
307		)
308		.await?;
309
310		Ok(client)
311	}
312	.fuse();
313	pin_mut!(fut);
314
315	match tokio_handle.block_on(abort_signal.try_until_signal(fut)) {
316		Ok(Ok(client)) => Ok(client),
317		Ok(Err(err)) => Err(err),
318		Err(_) => anyhow::bail!("Process interrupted"),
319	}
320}
321
322/// Start the JSON-RPC server using the given command line arguments.
323pub fn run(cmd: CliCommand) -> anyhow::Result<()> {
324	let CliCommand {
325		rpc_params,
326		prometheus_params,
327		node_rpc_url,
328		eth_pruning,
329		shared_params,
330		allow_unprotected_txs,
331		..
332	} = cmd;
333
334	#[cfg(not(test))]
335	init_logger(&shared_params)?;
336	let is_dev = shared_params.dev;
337	let explicit_base_path = shared_params.base_path.is_some();
338	let base_path = shared_params.base_path()?;
339
340	if is_dev && eth_pruning.is_archive() && !explicit_base_path {
341		log::warn!(
342			target: LOG_TARGET,
343			"โš ๏ธ  Running in --dev mode with --eth-pruning=archive but no --base-path. \
344			 The database will be stored in a temporary directory and lost on exit. \
345			 Use --base-path to persist the database."
346		);
347	}
348
349	let db_options = resolve_db_options(eth_pruning, base_path)?;
350
351	let rpc_addrs: Option<Vec<sc_service::config::RpcEndpoint>> = rpc_params
352		.rpc_addr(is_dev, false, DEFAULT_RPC_PORT)?
353		.map(|addrs| addrs.into_iter().map(Into::into).collect());
354
355	let rpc_config = RpcConfiguration {
356		addr: rpc_addrs,
357		methods: rpc_params.rpc_methods.into(),
358		max_connections: rpc_params.rpc_max_connections,
359		cors: rpc_params.rpc_cors(is_dev)?,
360		max_request_size: rpc_params.rpc_max_request_size,
361		max_response_size: rpc_params.rpc_max_response_size,
362		id_provider: None,
363		max_subs_per_conn: rpc_params.rpc_max_subscriptions_per_connection,
364		port: rpc_params.rpc_port.unwrap_or(DEFAULT_RPC_PORT),
365		message_buffer_capacity: rpc_params.rpc_message_buffer_capacity_per_connection,
366		batch_config: rpc_params.rpc_batch_config()?,
367		rate_limit: rpc_params.rpc_rate_limit,
368		rate_limit_whitelisted_ips: rpc_params.rpc_rate_limit_whitelisted_ips,
369		rate_limit_trust_proxy_headers: rpc_params.rpc_rate_limit_trust_proxy_headers,
370		request_logger_limit: if is_dev { 1024 * 1024 } else { 1024 },
371	};
372
373	let prometheus_config =
374		prometheus_params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "eth-rpc".into());
375	let prometheus_registry = prometheus_config.as_ref().map(|config| &config.registry);
376
377	let tokio_runtime = sc_cli::build_runtime()?;
378	let tokio_handle = tokio_runtime.handle();
379	let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;
380
381	let (subscription_gap_queue, gap_fill_rx) = SubscriptionGapQueue::new();
382	let client = build_client(
383		tokio_handle,
384		eth_pruning,
385		&node_rpc_url,
386		db_options,
387		rpc_config.max_request_size * 1024 * 1024,
388		rpc_config.max_response_size * 1024 * 1024,
389		tokio_runtime.block_on(async { Signals::capture() })?,
390		subscription_gap_queue,
391	)?;
392
393	// Prometheus metrics.
394	if let Some(PrometheusConfig { port, registry }) = prometheus_config.clone() {
395		task_manager.spawn_handle().spawn(
396			"prometheus-endpoint",
397			None,
398			prometheus_endpoint::init_prometheus(port, registry).map(drop),
399		);
400	}
401
402	let rpc_runtime = create_rpc_runtime(rpc_config.max_connections)
403		.map_err(|e| anyhow::anyhow!("Failed to create RPC runtime: {}", e))?;
404
405	let rpc_api = rpc_module(is_dev, client.clone(), allow_unprotected_txs)?;
406	let rpc_server_handle = start_rpc_servers(
407		&rpc_config,
408		prometheus_registry,
409		tokio_handle,
410		rpc_api,
411		rpc_runtime,
412		None,
413	)?;
414
415	task_manager
416		.spawn_essential_handle()
417		.spawn("block-subscription", None, async move {
418			let mut futures: Vec<BoxFuture<'_, Result<(), _>>> = vec![
419				Box::pin(client.subscribe_and_cache_new_blocks(SubscriptionType::BestBlocks)),
420				Box::pin(client.subscribe_and_cache_new_blocks(SubscriptionType::FinalizedBlocks)),
421			];
422
423			if eth_pruning.is_archive() {
424				futures.push(Box::pin(client.sync_backward()));
425			}
426
427			// Backfill gaps caused by subscription reconnects.
428			futures.push(Box::pin(async {
429				client.run_subscription_gap_filler(gap_fill_rx).await;
430				Ok::<_, ClientError>(())
431			}));
432
433			if let Err(err) = futures::future::try_join_all(futures).await {
434				panic!("Block subscription task failed: {err:?}",)
435			}
436		});
437
438	task_manager.keep_alive(rpc_server_handle);
439	let signals = tokio_runtime.block_on(async { Signals::capture() })?;
440	tokio_runtime.block_on(signals.run_until_signal(task_manager.future().fuse()))?;
441	Ok(())
442}
443
444/// Create the JSON-RPC module.
445fn rpc_module(
446	is_dev: bool,
447	client: Client,
448	allow_unprotected_txs: bool,
449) -> Result<RpcModule<()>, sc_service::Error> {
450	let eth_api = EthRpcServerImpl::new(client.clone())
451		.with_accounts(if is_dev {
452			vec![
453				crate::Account::from(subxt_signer::eth::dev::alith()),
454				crate::Account::from(subxt_signer::eth::dev::baltathar()),
455				crate::Account::from(subxt_signer::eth::dev::charleth()),
456				crate::Account::from(subxt_signer::eth::dev::dorothy()),
457				crate::Account::from(subxt_signer::eth::dev::ethan()),
458			]
459		} else {
460			vec![]
461		})
462		.with_allow_unprotected_txs(allow_unprotected_txs)
463		.with_use_pending_for_estimate_gas(is_dev)
464		.into_rpc();
465
466	let health_api = SystemHealthRpcServerImpl::new(client.clone()).into_rpc();
467	let debug_api = DebugRpcServerImpl::new(client.clone()).into_rpc();
468	let polkadot_api = PolkadotRpcServerImpl::new(client).into_rpc();
469
470	let mut module = RpcModule::new(());
471	module.merge(eth_api).map_err(|e| sc_service::Error::Application(e.into()))?;
472	module.merge(health_api).map_err(|e| sc_service::Error::Application(e.into()))?;
473	module.merge(debug_api).map_err(|e| sc_service::Error::Application(e.into()))?;
474	module
475		.merge(polkadot_api)
476		.map_err(|e| sc_service::Error::Application(e.into()))?;
477	Ok(module)
478}
479
#[cfg(test)]
mod tests {
	use super::*;
	use tempfile::TempDir;

	#[test]
	fn in_memory_returns_memory_options() {
		let opts = resolve_db_options(EthPruningMode::KeepLatest(256), None).unwrap();
		// In-memory options produce `:memory:` filename.
		let filename = opts.get_filename();
		assert_eq!(filename, std::path::Path::new(":memory:"));
	}

	#[test]
	fn persistent_with_explicit_base_path() {
		let tmp = TempDir::new().unwrap();
		let base = BasePath::new(tmp.path());
		let opts = resolve_db_options(EthPruningMode::Archive, Some(base)).unwrap();
		assert_eq!(opts.get_filename(), tmp.path().join(DEFAULT_DATABASE_NAME));
		assert!(tmp.path().exists());
	}

	// NOTE(review): this resolves (and creates) the real platform default directory rather
	// than a temporary one.
	#[test]
	fn persistent_default_path() {
		let opts = resolve_db_options(EthPruningMode::Archive, None).unwrap();
		let filename = opts.get_filename().to_string_lossy().to_string();
		assert!(filename.contains("eth-rpc"));
		assert!(filename.contains(DEFAULT_DATABASE_NAME));
	}

	#[test]
	fn persistent_creates_nested_directories() {
		let tmp = TempDir::new().unwrap();
		let nested = tmp.path().join("a").join("b");
		let base = BasePath::new(&nested);
		resolve_db_options(EthPruningMode::Archive, Some(base)).unwrap();
		assert!(nested.exists());
	}

	#[test]
	fn eth_pruning_mode() {
		// CLI parsing
		let cmd = CliCommand::try_parse_from(["eth-rpc", "--eth-pruning", "archive"]).unwrap();
		assert_eq!(cmd.eth_pruning, EthPruningMode::Archive);

		let cmd = CliCommand::try_parse_from(["eth-rpc", "--eth-pruning", "256"]).unwrap();
		assert_eq!(cmd.eth_pruning, EthPruningMode::KeepLatest(256));

		// Default is archive
		let cmd = CliCommand::try_parse_from(["eth-rpc"]).unwrap();
		assert_eq!(cmd.eth_pruning, EthPruningMode::Archive);
	}

	#[test]
	fn eth_pruning_mode_rejects_invalid_values() {
		// Zero blocks, non-numeric input and the wrong case must all be refused.
		for bad in ["0", "abc", "Archive", "1.5"] {
			assert!(
				CliCommand::try_parse_from(["eth-rpc", "--eth-pruning", bad]).is_err(),
				"`--eth-pruning {bad}` should be rejected",
			);
			assert!(bad.parse::<EthPruningMode>().is_err());
		}
	}

	#[test]
	fn eth_pruning_mode_accessors_and_display() {
		let archive = EthPruningMode::Archive;
		assert!(archive.is_archive());
		assert_eq!(archive.keep_latest(), None);
		assert_eq!(archive.to_string(), "archive");

		let keep = EthPruningMode::KeepLatest(256);
		assert!(!keep.is_archive());
		assert_eq!(keep.keep_latest(), Some(256));
		assert_eq!(keep.to_string(), "256");

		// The textual form round-trips through `FromStr`.
		assert_eq!(archive.to_string().parse::<EthPruningMode>(), Ok(archive));
		assert_eq!(keep.to_string().parse::<EthPruningMode>(), Ok(keep));
	}
}