referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
cli.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17//! The Ethereum JSON-RPC server.
18use crate::{
19	client::{connect, Client, SubscriptionType, SubstrateBlockNumber},
20	DebugRpcServer, DebugRpcServerImpl, EthRpcServer, EthRpcServerImpl, PolkadotRpcServer,
21	PolkadotRpcServerImpl, ReceiptExtractor, ReceiptProvider, SubxtBlockInfoProvider,
22	SystemHealthRpcServer, SystemHealthRpcServerImpl, LOG_TARGET,
23};
24use clap::Parser;
25use futures::{future::BoxFuture, pin_mut, FutureExt};
26use jsonrpsee::server::RpcModule;
27use sc_cli::{PrometheusParams, RpcParams, SharedParams, Signals};
28use sc_service::{
29	config::{PrometheusConfig, RpcConfiguration},
30	start_rpc_servers, TaskManager,
31};
32use sqlx::sqlite::SqlitePoolOptions;
33
34// Default port if --prometheus-port is not specified
// Passed to `prometheus_params.prometheus_config(..)` in `run`.
35const DEFAULT_PROMETHEUS_PORT: u16 = 9616;
36
37// Default port if --rpc-port is not specified
// Used in `run` as the fallback for `RpcConfiguration::port`.
38const DEFAULT_RPC_PORT: u16 = 8545;
39
// Database URL used as the default value of `--database-url`.
// `build_client` compares the configured URL against this constant; on a match it opens a
// single-connection pool with no idle/lifetime timeout and passes `Some(cache_size)` as
// `keep_latest_n_blocks` to `ReceiptProvider::new`.
40const IN_MEMORY_DB: &str = "sqlite::memory:";
41
42// Parsed command instructions from the command line
//
// All fields are consumed by `run`, which destructures this struct.
// NOTE(review): the `///` comments on the fields are doc attributes seen by the `Parser`
// derive (presumably surfaced as `--help` text — confirm against the clap docs), so treat
// them as user-facing strings rather than free-form comments.
43#[derive(Parser, Debug)]
44#[clap(author, about, version)]
45pub struct CliCommand {
46	/// The node url to connect to
	// Handed to `connect(..)` in `build_client`.
47	#[clap(long, default_value = "ws://127.0.0.1:9944")]
48	pub node_rpc_url: String,
49
50	/// The maximum number of blocks to cache in memory.
	// `build_client` only uses this value when `database_url` equals `IN_MEMORY_DB`; it is
	// then passed to `ReceiptProvider::new` as `keep_latest_n_blocks`.
51	#[clap(long, default_value = "256")]
52	pub cache_size: usize,
53
54	/// Earliest block number to consider when searching for transaction receipts.
	// Forwarded unchanged to `ReceiptExtractor::new` in `build_client`.
55	#[clap(long)]
56	pub earliest_receipt_block: Option<SubstrateBlockNumber>,
57
58	/// The database used to store Ethereum transaction hashes.
59	/// This is only useful if the node needs to act as an archive node and respond to Ethereum RPC
60	/// queries for transactions that are not in the in memory cache.
	// Can also be supplied through the `DATABASE_URL` environment variable; defaults to
	// `IN_MEMORY_DB`.
61	#[clap(long, env = "DATABASE_URL", default_value = IN_MEMORY_DB)]
62	pub database_url: String,
63
64	/// If provided, index the last n blocks
	// When `Some(n)`, `run` adds a `client.subscribe_and_cache_blocks(n)` future to the
	// "block-subscription" task.
65	#[clap(long)]
66	pub index_last_n_blocks: Option<SubstrateBlockNumber>,
67
	// Used by `run` for logger initialisation and for the `dev` flag.
68	#[allow(missing_docs)]
69	#[clap(flatten)]
70	pub shared_params: SharedParams,
71
	// Source of most `RpcConfiguration` fields built in `run`.
72	#[allow(missing_docs)]
73	#[clap(flatten)]
74	pub rpc_params: RpcParams,
75
	// Turned into the optional Prometheus configuration in `run`.
76	#[allow(missing_docs)]
77	#[clap(flatten)]
78	pub prometheus_params: PrometheusParams,
79}
80
81/// Initialize the logger
82#[cfg(not(test))]
83fn init_logger(params: &SharedParams) -> anyhow::Result<()> {
84	let mut logger = sc_cli::LoggerBuilder::new(params.log_filters().join(","));
85	logger
86		.with_log_reloading(params.enable_log_reloading)
87		.with_detailed_output(params.detailed_log_output);
88
89	if let Some(tracing_targets) = &params.tracing_targets {
90		let tracing_receiver = params.tracing_receiver.into();
91		logger.with_profiling(tracing_receiver, tracing_targets);
92	}
93
94	if params.disable_log_color {
95		logger.with_colors(false);
96	}
97
98	logger.init()?;
99	Ok(())
100}
101
102fn build_client(
103	tokio_handle: &tokio::runtime::Handle,
104	cache_size: usize,
105	earliest_receipt_block: Option<SubstrateBlockNumber>,
106	node_rpc_url: &str,
107	database_url: &str,
108	abort_signal: Signals,
109) -> anyhow::Result<Client> {
110	let fut = async {
111		let (api, rpc_client, rpc) = connect(node_rpc_url).await?;
112		let block_provider = SubxtBlockInfoProvider::new( api.clone(), rpc.clone()).await?;
113
114		let (pool, keep_latest_n_blocks) = if database_url == IN_MEMORY_DB {
115			log::warn!( target: LOG_TARGET, "💾 Using in-memory database, keeping only {cache_size} blocks in memory");
116			// see sqlite in-memory issue: https://github.com/launchbadge/sqlx/issues/2510
117			let pool = SqlitePoolOptions::new()
118					.max_connections(1)
119					.idle_timeout(None)
120					.max_lifetime(None)
121					.connect(database_url).await?;
122
123			(pool, Some(cache_size))
124		} else {
125			(SqlitePoolOptions::new().connect(database_url).await?, None)
126		};
127
128		let receipt_extractor = ReceiptExtractor::new(
129			api.clone(),
130			earliest_receipt_block,
131		).await?;
132
133		let receipt_provider = ReceiptProvider::new(
134				pool,
135				block_provider.clone(),
136				receipt_extractor.clone(),
137				keep_latest_n_blocks,
138			)
139			.await?;
140
141		let client =
142			Client::new(api, rpc_client, rpc, block_provider, receipt_provider).await?;
143
144		Ok(client)
145	}
146	.fuse();
147	pin_mut!(fut);
148
149	match tokio_handle.block_on(abort_signal.try_until_signal(fut)) {
150		Ok(Ok(client)) => Ok(client),
151		Ok(Err(err)) => Err(err),
152		Err(_) => anyhow::bail!("Process interrupted"),
153	}
154}
155
156/// Start the JSON-RPC server using the given command line arguments.
157pub fn run(cmd: CliCommand) -> anyhow::Result<()> {
158	let CliCommand {
159		rpc_params,
160		prometheus_params,
161		node_rpc_url,
162		cache_size,
163		database_url,
164		earliest_receipt_block,
165		index_last_n_blocks,
166		shared_params,
167		..
168	} = cmd;
169
170	#[cfg(not(test))]
171	init_logger(&shared_params)?;
172	let is_dev = shared_params.dev;
173	let rpc_addrs: Option<Vec<sc_service::config::RpcEndpoint>> = rpc_params
174		.rpc_addr(is_dev, false, 8545)?
175		.map(|addrs| addrs.into_iter().map(Into::into).collect());
176
177	let rpc_config = RpcConfiguration {
178		addr: rpc_addrs,
179		methods: rpc_params.rpc_methods.into(),
180		max_connections: rpc_params.rpc_max_connections,
181		cors: rpc_params.rpc_cors(is_dev)?,
182		max_request_size: rpc_params.rpc_max_request_size,
183		max_response_size: rpc_params.rpc_max_response_size,
184		id_provider: None,
185		max_subs_per_conn: rpc_params.rpc_max_subscriptions_per_connection,
186		port: rpc_params.rpc_port.unwrap_or(DEFAULT_RPC_PORT),
187		message_buffer_capacity: rpc_params.rpc_message_buffer_capacity_per_connection,
188		batch_config: rpc_params.rpc_batch_config()?,
189		rate_limit: rpc_params.rpc_rate_limit,
190		rate_limit_whitelisted_ips: rpc_params.rpc_rate_limit_whitelisted_ips,
191		rate_limit_trust_proxy_headers: rpc_params.rpc_rate_limit_trust_proxy_headers,
192		request_logger_limit: if is_dev { 1024 * 1024 } else { 1024 },
193	};
194
195	let prometheus_config =
196		prometheus_params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "eth-rpc".into());
197	let prometheus_registry = prometheus_config.as_ref().map(|config| &config.registry);
198
199	let tokio_runtime = sc_cli::build_runtime()?;
200	let tokio_handle = tokio_runtime.handle();
201	let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;
202
203	let client = build_client(
204		tokio_handle,
205		cache_size,
206		earliest_receipt_block,
207		&node_rpc_url,
208		&database_url,
209		tokio_runtime.block_on(async { Signals::capture() })?,
210	)?;
211
212	// Prometheus metrics.
213	if let Some(PrometheusConfig { port, registry }) = prometheus_config.clone() {
214		task_manager.spawn_handle().spawn(
215			"prometheus-endpoint",
216			None,
217			prometheus_endpoint::init_prometheus(port, registry).map(drop),
218		);
219	}
220
221	let rpc_server_handle = start_rpc_servers(
222		&rpc_config,
223		prometheus_registry,
224		tokio_handle,
225		|| rpc_module(is_dev, client.clone()),
226		None,
227	)?;
228
229	task_manager
230		.spawn_essential_handle()
231		.spawn("block-subscription", None, async move {
232			let mut futures: Vec<BoxFuture<'_, Result<(), _>>> = vec![
233				Box::pin(client.subscribe_and_cache_new_blocks(SubscriptionType::BestBlocks)),
234				Box::pin(client.subscribe_and_cache_new_blocks(SubscriptionType::FinalizedBlocks)),
235			];
236
237			if let Some(index_last_n_blocks) = index_last_n_blocks {
238				futures.push(Box::pin(client.subscribe_and_cache_blocks(index_last_n_blocks)));
239			}
240
241			if let Err(err) = futures::future::try_join_all(futures).await {
242				panic!("Block subscription task failed: {err:?}",)
243			}
244		});
245
246	task_manager.keep_alive(rpc_server_handle);
247	let signals = tokio_runtime.block_on(async { Signals::capture() })?;
248	tokio_runtime.block_on(signals.run_until_signal(task_manager.future().fuse()))?;
249	Ok(())
250}
251
252/// Create the JSON-RPC module.
253fn rpc_module(is_dev: bool, client: Client) -> Result<RpcModule<()>, sc_service::Error> {
254	let eth_api = EthRpcServerImpl::new(client.clone())
255		.with_accounts(if is_dev {
256			vec![
257				crate::Account::from(subxt_signer::eth::dev::alith()),
258				crate::Account::from(subxt_signer::eth::dev::baltathar()),
259				crate::Account::from(subxt_signer::eth::dev::charleth()),
260				crate::Account::from(subxt_signer::eth::dev::dorothy()),
261				crate::Account::from(subxt_signer::eth::dev::ethan()),
262			]
263		} else {
264			vec![]
265		})
266		.into_rpc();
267
268	let health_api = SystemHealthRpcServerImpl::new(client.clone()).into_rpc();
269	let debug_api = DebugRpcServerImpl::new(client.clone()).into_rpc();
270	let polkadot_api = PolkadotRpcServerImpl::new(client).into_rpc();
271
272	let mut module = RpcModule::new(());
273	module.merge(eth_api).map_err(|e| sc_service::Error::Application(e.into()))?;
274	module.merge(health_api).map_err(|e| sc_service::Error::Application(e.into()))?;
275	module.merge(debug_api).map_err(|e| sc_service::Error::Application(e.into()))?;
276	module
277		.merge(polkadot_api)
278		.map_err(|e| sc_service::Error::Application(e.into()))?;
279	Ok(module)
280}