1use crate::{
19 DbContext, DebugRpcServer, DebugRpcServerImpl, EthRpcServer, EthRpcServerImpl, LOG_TARGET,
20 PolkadotRpcServer, PolkadotRpcServerImpl, ReceiptExtractor, ReceiptProvider,
21 SubxtBlockInfoProvider, SystemHealthRpcServer, SystemHealthRpcServerImpl,
22 client::{Client, ClientError, SubscriptionGapQueue, SubscriptionType, connect},
23};
24use clap::{CommandFactory, FromArgMatches, Parser};
25use futures::{FutureExt, future::BoxFuture, pin_mut};
26use jsonrpsee::server::RpcModule;
27use sc_cli::{PrometheusParams, RpcParams, SharedParams, Signals};
28use sc_service::{
29 TaskManager,
30 config::{BasePath, PrometheusConfig, RpcConfiguration},
31 create_rpc_runtime, start_rpc_servers,
32};
33use sqlx::{
34 SqlitePool,
35 sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
36};
37use std::path::PathBuf;
38
39async fn sqlite_db_query_max_variable_number(pool: &SqlitePool) -> usize {
41 let limit = async {
42 let mut conn = pool
43 .acquire()
44 .await
45 .inspect_err(|e| log::warn!(target: LOG_TARGET, "๐พ Failed to acquire connection: {e}"))
46 .ok()?;
47 let mut handle = conn
48 .lock_handle()
49 .await
50 .inspect_err(|e| log::warn!(target: LOG_TARGET, "๐พ Failed to lock handle: {e}"))
51 .ok()?;
52 let raw = unsafe {
55 libsqlite3_sys::sqlite3_limit(
56 handle.as_raw_handle().as_ptr(),
57 libsqlite3_sys::SQLITE_LIMIT_VARIABLE_NUMBER,
58 -1,
59 )
60 };
61 raw.try_into().ok()
62 }
63 .await;
64
65 let default = DbContext::DEFAULT_MAX_VARIABLE_NUMBER;
66 limit.inspect(|n| log::info!(target: LOG_TARGET, "๐พ SQLite db_query_max_variable_number: {n}"))
67 .unwrap_or_else(|| {
68 log::warn!(target: LOG_TARGET, "๐พ Failed to query SQLite variable limit, falling back to {default}");
69 default
70 })
71}
72
/// Pruning strategy selected through the `--eth-pruning` CLI option.
///
/// The `Display` output mirrors the accepted CLI spelling: `archive` for
/// [`EthPruningMode::Archive`] and the plain number for [`EthPruningMode::KeepLatest`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display)]
pub enum EthPruningMode {
    /// Archive mode: the database options point at an on-disk SQLite file.
    #[display(fmt = "archive")]
    Archive,
    /// Keep only the given number of latest blocks; an in-memory database is used.
    /// Parsing from a string only accepts values of at least 1.
    #[display(fmt = "{_0}")]
    KeepLatest(usize),
}
83
84impl EthPruningMode {
85 pub fn is_archive(&self) -> bool {
87 matches!(self, Self::Archive)
88 }
89
90 pub fn keep_latest(&self) -> Option<usize> {
92 match self {
93 Self::KeepLatest(n) => Some(*n),
94 _ => None,
95 }
96 }
97}
98
99impl std::str::FromStr for EthPruningMode {
100 type Err = String;
101
102 fn from_str(input: &str) -> Result<Self, Self::Err> {
103 match input {
104 "archive" => Ok(Self::Archive),
105 n => {
106 n.parse::<usize>()
107 .ok()
108 .filter(|&v| v >= 1)
109 .map(Self::KeepLatest)
110 .ok_or_else(|| {
111 format!(
112 "Invalid pruning mode '{n}': expected 'archive' or a positive integer"
113 )
114 })
115 },
116 }
117 }
118}
119
/// Default port passed to `PrometheusParams::prometheus_config` for the Prometheus endpoint.
const DEFAULT_PROMETHEUS_PORT: u16 = 9616;

/// Default RPC port, used when resolving the listen addresses and when `--rpc-port` is unset.
const DEFAULT_RPC_PORT: u16 = 8545;

/// File name of the SQLite database created inside the resolved database directory.
const DEFAULT_DATABASE_NAME: &str = "eth-rpc.db";
127
// Command line arguments of the RPC server; consumed by `run`.
#[derive(Parser, Debug)]
#[clap(author, about, version)]
pub struct CliCommand {
    /// The node url to connect to
    #[clap(long, default_value = "ws://127.0.0.1:9944")]
    pub node_rpc_url: String,

    /// Pruning mode: `archive` uses an on-disk database, a positive integer N uses an
    /// in-memory database that keeps only the latest N blocks.
    #[clap(long, default_value = "archive")]
    pub eth_pruning: EthPruningMode,

    #[allow(missing_docs)]
    #[clap(flatten)]
    pub shared_params: SharedParams,

    #[allow(missing_docs)]
    #[clap(flatten)]
    pub rpc_params: RpcParams,

    #[allow(missing_docs)]
    #[clap(flatten)]
    pub prometheus_params: PrometheusParams,

    // NOTE(review): the exact meaning of "unprotected" is defined by the Ethereum RPC server
    // implementation this flag is forwarded to (`with_allow_unprotected_txs`) — confirm there.
    /// Allow unprotected transactions to be submitted through the Ethereum RPC server
    #[arg(long)]
    pub allow_unprotected_txs: bool,
}
161
162impl CliCommand {
163 pub fn parse_cli() -> anyhow::Result<Self> {
165 let removed_flags =
166 ["database-url", "cache-size", "index-last-n-blocks", "earliest-receipt-block"];
167
168 let cmd = removed_flags.iter().fold(Self::command(), |cmd, name| {
169 cmd.arg(
170 clap::Arg::new(*name)
171 .long(*name)
172 .num_args(0..=1)
173 .hide(true)
174 .action(clap::ArgAction::Set),
175 )
176 });
177 let matches = cmd.get_matches();
178
179 let used: Vec<_> = removed_flags
180 .iter()
181 .filter(|f| matches.contains_id(f))
182 .map(|f| format!("--{f}"))
183 .collect();
184 if !used.is_empty() {
185 anyhow::bail!(
186 "[{}] have been removed. \
187 Check polkadot-sdk PR #11153 for the CLI migration guide.",
188 used.join(", "),
189 );
190 }
191
192 Ok(Self::from_arg_matches(&matches).expect("already validated by clap"))
193 }
194}
195
196#[cfg(not(test))]
198fn init_logger(params: &SharedParams) -> anyhow::Result<()> {
199 let mut logger = sc_cli::LoggerBuilder::new(params.log_filters().join(","));
200 logger
201 .with_log_reloading(params.enable_log_reloading)
202 .with_detailed_output(params.detailed_log_output);
203
204 if let Some(tracing_targets) = ¶ms.tracing_targets {
205 let tracing_receiver = params.tracing_receiver.into();
206 logger.with_profiling(tracing_receiver, tracing_targets);
207 }
208
209 if params.disable_log_color {
210 logger.with_colors(false);
211 }
212
213 logger.init()?;
214 Ok(())
215}
216
217fn resolve_db_dir(base_path: Option<BasePath>) -> PathBuf {
225 match base_path {
226 Some(path) => path.path().to_path_buf(),
227 None => BasePath::from_project("", "", "eth-rpc").path().to_path_buf(),
228 }
229}
230
231fn resolve_db_options(
233 eth_pruning: EthPruningMode,
234 base_path: Option<BasePath>,
235) -> anyhow::Result<SqliteConnectOptions> {
236 if eth_pruning.is_archive() {
237 let db_dir = resolve_db_dir(base_path);
238 std::fs::create_dir_all(&db_dir).map_err(|e| {
239 anyhow::anyhow!("Failed to create database directory {}: {e}", db_dir.display())
240 })?;
241 let db_path = db_dir.join(DEFAULT_DATABASE_NAME);
242 log::info!(target: LOG_TARGET, "๐พ Database path: {}", db_path.display());
243 Ok(SqliteConnectOptions::new()
246 .filename(&db_path)
247 .create_if_missing(true)
248 .journal_mode(SqliteJournalMode::Wal))
249 } else {
250 Ok(SqliteConnectOptions::new().in_memory(true))
251 }
252}
253
254fn build_client(
255 tokio_handle: &tokio::runtime::Handle,
256 eth_pruning: EthPruningMode,
257 node_rpc_url: &str,
258 db_options: SqliteConnectOptions,
259 max_request_size: u32,
260 max_response_size: u32,
261 abort_signal: Signals,
262 subscription_gap_queue: SubscriptionGapQueue,
263) -> anyhow::Result<Client> {
264 let fut = async {
265 let (api, rpc_client, rpc) =
266 connect(node_rpc_url, max_request_size, max_response_size).await?;
267 let block_provider = SubxtBlockInfoProvider::new(api.clone(), rpc.clone()).await?;
268
269 let (pool, keep_latest_n_blocks) = match eth_pruning {
270 EthPruningMode::Archive => {
271 (SqlitePoolOptions::new().connect_with(db_options).await?, None)
272 },
273 EthPruningMode::KeepLatest(max_blocks) => {
274 log::info!(target: LOG_TARGET,
275 "๐พ Using in-memory database, keeping only {max_blocks} blocks");
276 let pool = SqlitePoolOptions::new()
278 .max_connections(1)
279 .idle_timeout(None)
280 .max_lifetime(None)
281 .connect_with(db_options)
282 .await?;
283 (pool, Some(max_blocks))
284 },
285 };
286
287 let receipt_extractor = ReceiptExtractor::new(api.clone()).await?;
288 let max_variable_number = sqlite_db_query_max_variable_number(&pool).await;
289 let db_ctx = DbContext::new(pool, max_variable_number);
290
291 let receipt_provider = ReceiptProvider::new(
292 db_ctx,
293 block_provider.clone(),
294 receipt_extractor.clone(),
295 keep_latest_n_blocks,
296 )
297 .await?;
298
299 let client = Client::new(
300 api,
301 rpc_client,
302 rpc,
303 block_provider,
304 receipt_provider,
305 eth_pruning.is_archive(),
306 subscription_gap_queue,
307 )
308 .await?;
309
310 Ok(client)
311 }
312 .fuse();
313 pin_mut!(fut);
314
315 match tokio_handle.block_on(abort_signal.try_until_signal(fut)) {
316 Ok(Ok(client)) => Ok(client),
317 Ok(Err(err)) => Err(err),
318 Err(_) => anyhow::bail!("Process interrupted"),
319 }
320}
321
322pub fn run(cmd: CliCommand) -> anyhow::Result<()> {
324 let CliCommand {
325 rpc_params,
326 prometheus_params,
327 node_rpc_url,
328 eth_pruning,
329 shared_params,
330 allow_unprotected_txs,
331 ..
332 } = cmd;
333
334 #[cfg(not(test))]
335 init_logger(&shared_params)?;
336 let is_dev = shared_params.dev;
337 let explicit_base_path = shared_params.base_path.is_some();
338 let base_path = shared_params.base_path()?;
339
340 if is_dev && eth_pruning.is_archive() && !explicit_base_path {
341 log::warn!(
342 target: LOG_TARGET,
343 "โ ๏ธ Running in --dev mode with --eth-pruning=archive but no --base-path. \
344 The database will be stored in a temporary directory and lost on exit. \
345 Use --base-path to persist the database."
346 );
347 }
348
349 let db_options = resolve_db_options(eth_pruning, base_path)?;
350
351 let rpc_addrs: Option<Vec<sc_service::config::RpcEndpoint>> = rpc_params
352 .rpc_addr(is_dev, false, DEFAULT_RPC_PORT)?
353 .map(|addrs| addrs.into_iter().map(Into::into).collect());
354
355 let rpc_config = RpcConfiguration {
356 addr: rpc_addrs,
357 methods: rpc_params.rpc_methods.into(),
358 max_connections: rpc_params.rpc_max_connections,
359 cors: rpc_params.rpc_cors(is_dev)?,
360 max_request_size: rpc_params.rpc_max_request_size,
361 max_response_size: rpc_params.rpc_max_response_size,
362 id_provider: None,
363 max_subs_per_conn: rpc_params.rpc_max_subscriptions_per_connection,
364 port: rpc_params.rpc_port.unwrap_or(DEFAULT_RPC_PORT),
365 message_buffer_capacity: rpc_params.rpc_message_buffer_capacity_per_connection,
366 batch_config: rpc_params.rpc_batch_config()?,
367 rate_limit: rpc_params.rpc_rate_limit,
368 rate_limit_whitelisted_ips: rpc_params.rpc_rate_limit_whitelisted_ips,
369 rate_limit_trust_proxy_headers: rpc_params.rpc_rate_limit_trust_proxy_headers,
370 request_logger_limit: if is_dev { 1024 * 1024 } else { 1024 },
371 };
372
373 let prometheus_config =
374 prometheus_params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "eth-rpc".into());
375 let prometheus_registry = prometheus_config.as_ref().map(|config| &config.registry);
376
377 let tokio_runtime = sc_cli::build_runtime()?;
378 let tokio_handle = tokio_runtime.handle();
379 let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;
380
381 let (subscription_gap_queue, gap_fill_rx) = SubscriptionGapQueue::new();
382 let client = build_client(
383 tokio_handle,
384 eth_pruning,
385 &node_rpc_url,
386 db_options,
387 rpc_config.max_request_size * 1024 * 1024,
388 rpc_config.max_response_size * 1024 * 1024,
389 tokio_runtime.block_on(async { Signals::capture() })?,
390 subscription_gap_queue,
391 )?;
392
393 if let Some(PrometheusConfig { port, registry }) = prometheus_config.clone() {
395 task_manager.spawn_handle().spawn(
396 "prometheus-endpoint",
397 None,
398 prometheus_endpoint::init_prometheus(port, registry).map(drop),
399 );
400 }
401
402 let rpc_runtime = create_rpc_runtime(rpc_config.max_connections)
403 .map_err(|e| anyhow::anyhow!("Failed to create RPC runtime: {}", e))?;
404
405 let rpc_api = rpc_module(is_dev, client.clone(), allow_unprotected_txs)?;
406 let rpc_server_handle = start_rpc_servers(
407 &rpc_config,
408 prometheus_registry,
409 tokio_handle,
410 rpc_api,
411 rpc_runtime,
412 None,
413 )?;
414
415 task_manager
416 .spawn_essential_handle()
417 .spawn("block-subscription", None, async move {
418 let mut futures: Vec<BoxFuture<'_, Result<(), _>>> = vec![
419 Box::pin(client.subscribe_and_cache_new_blocks(SubscriptionType::BestBlocks)),
420 Box::pin(client.subscribe_and_cache_new_blocks(SubscriptionType::FinalizedBlocks)),
421 ];
422
423 if eth_pruning.is_archive() {
424 futures.push(Box::pin(client.sync_backward()));
425 }
426
427 futures.push(Box::pin(async {
429 client.run_subscription_gap_filler(gap_fill_rx).await;
430 Ok::<_, ClientError>(())
431 }));
432
433 if let Err(err) = futures::future::try_join_all(futures).await {
434 panic!("Block subscription task failed: {err:?}",)
435 }
436 });
437
438 task_manager.keep_alive(rpc_server_handle);
439 let signals = tokio_runtime.block_on(async { Signals::capture() })?;
440 tokio_runtime.block_on(signals.run_until_signal(task_manager.future().fuse()))?;
441 Ok(())
442}
443
444fn rpc_module(
446 is_dev: bool,
447 client: Client,
448 allow_unprotected_txs: bool,
449) -> Result<RpcModule<()>, sc_service::Error> {
450 let eth_api = EthRpcServerImpl::new(client.clone())
451 .with_accounts(if is_dev {
452 vec![
453 crate::Account::from(subxt_signer::eth::dev::alith()),
454 crate::Account::from(subxt_signer::eth::dev::baltathar()),
455 crate::Account::from(subxt_signer::eth::dev::charleth()),
456 crate::Account::from(subxt_signer::eth::dev::dorothy()),
457 crate::Account::from(subxt_signer::eth::dev::ethan()),
458 ]
459 } else {
460 vec![]
461 })
462 .with_allow_unprotected_txs(allow_unprotected_txs)
463 .with_use_pending_for_estimate_gas(is_dev)
464 .into_rpc();
465
466 let health_api = SystemHealthRpcServerImpl::new(client.clone()).into_rpc();
467 let debug_api = DebugRpcServerImpl::new(client.clone()).into_rpc();
468 let polkadot_api = PolkadotRpcServerImpl::new(client).into_rpc();
469
470 let mut module = RpcModule::new(());
471 module.merge(eth_api).map_err(|e| sc_service::Error::Application(e.into()))?;
472 module.merge(health_api).map_err(|e| sc_service::Error::Application(e.into()))?;
473 module.merge(debug_api).map_err(|e| sc_service::Error::Application(e.into()))?;
474 module
475 .merge(polkadot_api)
476 .map_err(|e| sc_service::Error::Application(e.into()))?;
477 Ok(module)
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-archive mode must yield SQLite's in-memory pseudo file name.
    #[test]
    fn in_memory_returns_memory_options() {
        let options = resolve_db_options(EthPruningMode::KeepLatest(256), None).unwrap();
        assert_eq!(options.get_filename(), std::path::Path::new(":memory:"));
    }

    /// Archive mode with an explicit base path places the database file directly inside it.
    #[test]
    fn persistent_with_explicit_base_path() {
        let temp_dir = TempDir::new().unwrap();
        let options =
            resolve_db_options(EthPruningMode::Archive, Some(BasePath::new(temp_dir.path())))
                .unwrap();
        assert_eq!(options.get_filename(), temp_dir.path().join(DEFAULT_DATABASE_NAME));
        assert!(temp_dir.path().exists());
    }

    /// Archive mode without a base path falls back to the project directory.
    #[test]
    fn persistent_default_path() {
        let options = resolve_db_options(EthPruningMode::Archive, None).unwrap();
        let rendered = options.get_filename().to_string_lossy().to_string();
        assert!(rendered.contains("eth-rpc"));
        assert!(rendered.contains(DEFAULT_DATABASE_NAME));
    }

    /// Missing parent directories of the base path are created on demand.
    #[test]
    fn persistent_creates_nested_directories() {
        let temp_dir = TempDir::new().unwrap();
        let nested_dir = temp_dir.path().join("a").join("b");
        resolve_db_options(EthPruningMode::Archive, Some(BasePath::new(&nested_dir))).unwrap();
        assert!(nested_dir.exists());
    }

    /// `--eth-pruning` accepts `archive` and a block count, and defaults to archive.
    #[test]
    fn eth_pruning_mode() {
        let cases: [(&[&str], EthPruningMode); 3] = [
            (&["eth-rpc", "--eth-pruning", "archive"], EthPruningMode::Archive),
            (&["eth-rpc", "--eth-pruning", "256"], EthPruningMode::KeepLatest(256)),
            (&["eth-rpc"], EthPruningMode::Archive),
        ];

        for (args, expected) in cases {
            let cmd = CliCommand::try_parse_from(args.iter().copied()).unwrap();
            assert_eq!(cmd.eth_pruning, expected);
        }
    }
}