referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
cli.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17//! The Ethereum JSON-RPC server.
18use crate::{
19	client::{connect, Client, SubscriptionType, SubstrateBlockNumber},
20	DebugRpcServer, DebugRpcServerImpl, EthRpcServer, EthRpcServerImpl, ReceiptExtractor,
21	ReceiptProvider, SubxtBlockInfoProvider, SystemHealthRpcServer, SystemHealthRpcServerImpl,
22	LOG_TARGET,
23};
24use clap::Parser;
25use futures::{pin_mut, FutureExt};
26use jsonrpsee::server::RpcModule;
27use sc_cli::{PrometheusParams, RpcParams, SharedParams, Signals};
28use sc_service::{
29	config::{PrometheusConfig, RpcConfiguration},
30	start_rpc_servers, TaskManager,
31};
32use sqlx::sqlite::SqlitePoolOptions;
33
/// Default port if --prometheus-port is not specified.
const DEFAULT_PROMETHEUS_PORT: u16 = 9616;

/// Default port if --rpc-port is not specified.
const DEFAULT_RPC_PORT: u16 = 8545;

/// Connection URL selecting an in-memory SQLite database.
///
/// Used as the default value of `--database-url`, and compared against the configured URL in
/// `build_client` to decide how the connection pool is set up.
const IN_MEMORY_DB: &str = "sqlite::memory:";
41
// Parsed command instructions from the command line.
//
// NOTE: this deliberately stays a plain comment, and the `///` comments on the fields below are
// picked up by the clap derive as the help text of the corresponding flags, so editing them
// changes the `--help` output.
#[derive(Parser, Debug)]
#[clap(author, about, version)]
pub struct CliCommand {
	/// The node url to connect to
	#[clap(long, default_value = "ws://127.0.0.1:9944")]
	pub node_rpc_url: String,

	/// The maximum number of blocks to cache in memory.
	#[clap(long, default_value = "256")]
	pub cache_size: usize,

	/// Earliest block number to consider when searching for transaction receipts.
	#[clap(long)]
	pub earliest_receipt_block: Option<SubstrateBlockNumber>,

	/// The database used to store Ethereum transaction hashes.
	/// This is only useful if the node needs to act as an archive node and respond to Ethereum RPC
	/// queries for transactions that are not in the in memory cache.
	// Falls back to the `DATABASE_URL` environment variable, then to the in-memory database.
	#[clap(long, env = "DATABASE_URL", default_value = IN_MEMORY_DB)]
	pub database_url: String,

	/// If provided, index the last n blocks
	#[clap(long)]
	pub index_last_n_blocks: Option<SubstrateBlockNumber>,

	// Parameter group imported from `sc_cli`, flattened into this command's arguments.
	#[allow(missing_docs)]
	#[clap(flatten)]
	pub shared_params: SharedParams,

	// Parameter group imported from `sc_cli`, flattened into this command's arguments.
	#[allow(missing_docs)]
	#[clap(flatten)]
	pub rpc_params: RpcParams,

	// Parameter group imported from `sc_cli`, flattened into this command's arguments.
	#[allow(missing_docs)]
	#[clap(flatten)]
	pub prometheus_params: PrometheusParams,
}
80
81/// Initialize the logger
82#[cfg(not(test))]
83fn init_logger(params: &SharedParams) -> anyhow::Result<()> {
84	let mut logger = sc_cli::LoggerBuilder::new(params.log_filters().join(","));
85	logger
86		.with_log_reloading(params.enable_log_reloading)
87		.with_detailed_output(params.detailed_log_output);
88
89	if let Some(tracing_targets) = &params.tracing_targets {
90		let tracing_receiver = params.tracing_receiver.into();
91		logger.with_profiling(tracing_receiver, tracing_targets);
92	}
93
94	if params.disable_log_color {
95		logger.with_colors(false);
96	}
97
98	logger.init()?;
99	Ok(())
100}
101
102fn build_client(
103	tokio_handle: &tokio::runtime::Handle,
104	cache_size: usize,
105	earliest_receipt_block: Option<SubstrateBlockNumber>,
106	node_rpc_url: &str,
107	database_url: &str,
108	abort_signal: Signals,
109) -> anyhow::Result<Client> {
110	let fut = async {
111		let (api, rpc_client, rpc) = connect(node_rpc_url).await?;
112		let block_provider = SubxtBlockInfoProvider::new( api.clone(), rpc.clone()).await?;
113
114		let (pool, keep_latest_n_blocks) = if database_url == IN_MEMORY_DB {
115			log::warn!( target: LOG_TARGET, "💾 Using in-memory database, keeping only {cache_size} blocks in memory");
116			// see sqlite in-memory issue: https://github.com/launchbadge/sqlx/issues/2510
117			let pool = SqlitePoolOptions::new()
118					.max_connections(1)
119					.idle_timeout(None)
120					.max_lifetime(None)
121					.connect(database_url).await?;
122
123			(pool, Some(cache_size))
124		} else {
125			(SqlitePoolOptions::new().connect(database_url).await?, None)
126		};
127
128		let receipt_extractor = ReceiptExtractor::new(
129			api.clone(),
130			earliest_receipt_block).await?;
131
132		let receipt_provider = ReceiptProvider::new(
133				pool,
134				block_provider.clone(),
135				receipt_extractor.clone(),
136				keep_latest_n_blocks,
137			)
138			.await?;
139
140		let client =
141			Client::new(api, rpc_client, rpc, block_provider, receipt_provider).await?;
142
143		Ok(client)
144	}
145	.fuse();
146	pin_mut!(fut);
147
148	match tokio_handle.block_on(abort_signal.try_until_signal(fut)) {
149		Ok(Ok(client)) => Ok(client),
150		Ok(Err(err)) => Err(err),
151		Err(_) => anyhow::bail!("Process interrupted"),
152	}
153}
154
155/// Start the JSON-RPC server using the given command line arguments.
156pub fn run(cmd: CliCommand) -> anyhow::Result<()> {
157	let CliCommand {
158		rpc_params,
159		prometheus_params,
160		node_rpc_url,
161		cache_size,
162		database_url,
163		earliest_receipt_block,
164		index_last_n_blocks,
165		shared_params,
166		..
167	} = cmd;
168
169	#[cfg(not(test))]
170	init_logger(&shared_params)?;
171	let is_dev = shared_params.dev;
172	let rpc_addrs: Option<Vec<sc_service::config::RpcEndpoint>> = rpc_params
173		.rpc_addr(is_dev, false, 8545)?
174		.map(|addrs| addrs.into_iter().map(Into::into).collect());
175
176	let rpc_config = RpcConfiguration {
177		addr: rpc_addrs,
178		methods: rpc_params.rpc_methods.into(),
179		max_connections: rpc_params.rpc_max_connections,
180		cors: rpc_params.rpc_cors(is_dev)?,
181		max_request_size: rpc_params.rpc_max_request_size,
182		max_response_size: rpc_params.rpc_max_response_size,
183		id_provider: None,
184		max_subs_per_conn: rpc_params.rpc_max_subscriptions_per_connection,
185		port: rpc_params.rpc_port.unwrap_or(DEFAULT_RPC_PORT),
186		message_buffer_capacity: rpc_params.rpc_message_buffer_capacity_per_connection,
187		batch_config: rpc_params.rpc_batch_config()?,
188		rate_limit: rpc_params.rpc_rate_limit,
189		rate_limit_whitelisted_ips: rpc_params.rpc_rate_limit_whitelisted_ips,
190		rate_limit_trust_proxy_headers: rpc_params.rpc_rate_limit_trust_proxy_headers,
191	};
192
193	let prometheus_config =
194		prometheus_params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "eth-rpc".into());
195	let prometheus_registry = prometheus_config.as_ref().map(|config| &config.registry);
196
197	let tokio_runtime = sc_cli::build_runtime()?;
198	let tokio_handle = tokio_runtime.handle();
199	let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;
200
201	let client = build_client(
202		tokio_handle,
203		cache_size,
204		earliest_receipt_block,
205		&node_rpc_url,
206		&database_url,
207		tokio_runtime.block_on(async { Signals::capture() })?,
208	)?;
209
210	// Prometheus metrics.
211	if let Some(PrometheusConfig { port, registry }) = prometheus_config.clone() {
212		task_manager.spawn_handle().spawn(
213			"prometheus-endpoint",
214			None,
215			prometheus_endpoint::init_prometheus(port, registry).map(drop),
216		);
217	}
218
219	let rpc_server_handle = start_rpc_servers(
220		&rpc_config,
221		prometheus_registry,
222		tokio_handle,
223		|| rpc_module(is_dev, client.clone()),
224		None,
225	)?;
226
227	task_manager
228		.spawn_essential_handle()
229		.spawn("block-subscription", None, async move {
230			let fut1 = client.subscribe_and_cache_new_blocks(SubscriptionType::BestBlocks);
231			let fut2 = client.subscribe_and_cache_new_blocks(SubscriptionType::FinalizedBlocks);
232
233			let res = if let Some(index_last_n_blocks) = index_last_n_blocks {
234				let fut3 = client.subscribe_and_cache_blocks(index_last_n_blocks);
235				tokio::try_join!(fut1, fut2, fut3).map(|_| ())
236			} else {
237				tokio::try_join!(fut1, fut2).map(|_| ())
238			};
239
240			if let Err(err) = res {
241				panic!("Block subscription task failed: {err:?}",)
242			}
243		});
244
245	task_manager.keep_alive(rpc_server_handle);
246	let signals = tokio_runtime.block_on(async { Signals::capture() })?;
247	tokio_runtime.block_on(signals.run_until_signal(task_manager.future().fuse()))?;
248	Ok(())
249}
250
251/// Create the JSON-RPC module.
252fn rpc_module(is_dev: bool, client: Client) -> Result<RpcModule<()>, sc_service::Error> {
253	let eth_api = EthRpcServerImpl::new(client.clone())
254		.with_accounts(if is_dev { vec![crate::Account::default()] } else { vec![] })
255		.into_rpc();
256
257	let health_api = SystemHealthRpcServerImpl::new(client.clone()).into_rpc();
258	let debug_api = DebugRpcServerImpl::new(client).into_rpc();
259
260	let mut module = RpcModule::new(());
261	module.merge(eth_api).map_err(|e| sc_service::Error::Application(e.into()))?;
262	module.merge(health_api).map_err(|e| sc_service::Error::Application(e.into()))?;
263	module.merge(debug_api).map_err(|e| sc_service::Error::Application(e.into()))?;
264	Ok(module)
265}