1use crate::{
19 client::{connect, Client, SubscriptionType, SubstrateBlockNumber},
20 DebugRpcServer, DebugRpcServerImpl, EthRpcServer, EthRpcServerImpl, PolkadotRpcServer,
21 PolkadotRpcServerImpl, ReceiptExtractor, ReceiptProvider, SubxtBlockInfoProvider,
22 SystemHealthRpcServer, SystemHealthRpcServerImpl, LOG_TARGET,
23};
24use clap::Parser;
25use futures::{future::BoxFuture, pin_mut, FutureExt};
26use jsonrpsee::server::RpcModule;
27use sc_cli::{PrometheusParams, RpcParams, SharedParams, Signals};
28use sc_service::{
29 config::{PrometheusConfig, RpcConfiguration},
30 start_rpc_servers, TaskManager,
31};
32use sqlx::sqlite::SqlitePoolOptions;
33
34const DEFAULT_PROMETHEUS_PORT: u16 = 9616;
36
37const DEFAULT_RPC_PORT: u16 = 8545;
39
40const IN_MEMORY_DB: &str = "sqlite::memory:";
41
42#[derive(Parser, Debug)]
44#[clap(author, about, version)]
45pub struct CliCommand {
46 #[clap(long, default_value = "ws://127.0.0.1:9944")]
48 pub node_rpc_url: String,
49
50 #[clap(long, default_value = "256")]
52 pub cache_size: usize,
53
54 #[clap(long)]
56 pub earliest_receipt_block: Option<SubstrateBlockNumber>,
57
58 #[clap(long, env = "DATABASE_URL", default_value = IN_MEMORY_DB)]
62 pub database_url: String,
63
64 #[clap(long)]
66 pub index_last_n_blocks: Option<SubstrateBlockNumber>,
67
68 #[allow(missing_docs)]
69 #[clap(flatten)]
70 pub shared_params: SharedParams,
71
72 #[allow(missing_docs)]
73 #[clap(flatten)]
74 pub rpc_params: RpcParams,
75
76 #[allow(missing_docs)]
77 #[clap(flatten)]
78 pub prometheus_params: PrometheusParams,
79}
80
81#[cfg(not(test))]
83fn init_logger(params: &SharedParams) -> anyhow::Result<()> {
84 let mut logger = sc_cli::LoggerBuilder::new(params.log_filters().join(","));
85 logger
86 .with_log_reloading(params.enable_log_reloading)
87 .with_detailed_output(params.detailed_log_output);
88
89 if let Some(tracing_targets) = ¶ms.tracing_targets {
90 let tracing_receiver = params.tracing_receiver.into();
91 logger.with_profiling(tracing_receiver, tracing_targets);
92 }
93
94 if params.disable_log_color {
95 logger.with_colors(false);
96 }
97
98 logger.init()?;
99 Ok(())
100}
101
102fn build_client(
103 tokio_handle: &tokio::runtime::Handle,
104 cache_size: usize,
105 earliest_receipt_block: Option<SubstrateBlockNumber>,
106 node_rpc_url: &str,
107 database_url: &str,
108 abort_signal: Signals,
109) -> anyhow::Result<Client> {
110 let fut = async {
111 let (api, rpc_client, rpc) = connect(node_rpc_url).await?;
112 let block_provider = SubxtBlockInfoProvider::new( api.clone(), rpc.clone()).await?;
113
114 let (pool, keep_latest_n_blocks) = if database_url == IN_MEMORY_DB {
115 log::warn!( target: LOG_TARGET, "💾 Using in-memory database, keeping only {cache_size} blocks in memory");
116 let pool = SqlitePoolOptions::new()
118 .max_connections(1)
119 .idle_timeout(None)
120 .max_lifetime(None)
121 .connect(database_url).await?;
122
123 (pool, Some(cache_size))
124 } else {
125 (SqlitePoolOptions::new().connect(database_url).await?, None)
126 };
127
128 let receipt_extractor = ReceiptExtractor::new(
129 api.clone(),
130 earliest_receipt_block,
131 ).await?;
132
133 let receipt_provider = ReceiptProvider::new(
134 pool,
135 block_provider.clone(),
136 receipt_extractor.clone(),
137 keep_latest_n_blocks,
138 )
139 .await?;
140
141 let client =
142 Client::new(api, rpc_client, rpc, block_provider, receipt_provider).await?;
143
144 Ok(client)
145 }
146 .fuse();
147 pin_mut!(fut);
148
149 match tokio_handle.block_on(abort_signal.try_until_signal(fut)) {
150 Ok(Ok(client)) => Ok(client),
151 Ok(Err(err)) => Err(err),
152 Err(_) => anyhow::bail!("Process interrupted"),
153 }
154}
155
156pub fn run(cmd: CliCommand) -> anyhow::Result<()> {
158 let CliCommand {
159 rpc_params,
160 prometheus_params,
161 node_rpc_url,
162 cache_size,
163 database_url,
164 earliest_receipt_block,
165 index_last_n_blocks,
166 shared_params,
167 ..
168 } = cmd;
169
170 #[cfg(not(test))]
171 init_logger(&shared_params)?;
172 let is_dev = shared_params.dev;
173 let rpc_addrs: Option<Vec<sc_service::config::RpcEndpoint>> = rpc_params
174 .rpc_addr(is_dev, false, 8545)?
175 .map(|addrs| addrs.into_iter().map(Into::into).collect());
176
177 let rpc_config = RpcConfiguration {
178 addr: rpc_addrs,
179 methods: rpc_params.rpc_methods.into(),
180 max_connections: rpc_params.rpc_max_connections,
181 cors: rpc_params.rpc_cors(is_dev)?,
182 max_request_size: rpc_params.rpc_max_request_size,
183 max_response_size: rpc_params.rpc_max_response_size,
184 id_provider: None,
185 max_subs_per_conn: rpc_params.rpc_max_subscriptions_per_connection,
186 port: rpc_params.rpc_port.unwrap_or(DEFAULT_RPC_PORT),
187 message_buffer_capacity: rpc_params.rpc_message_buffer_capacity_per_connection,
188 batch_config: rpc_params.rpc_batch_config()?,
189 rate_limit: rpc_params.rpc_rate_limit,
190 rate_limit_whitelisted_ips: rpc_params.rpc_rate_limit_whitelisted_ips,
191 rate_limit_trust_proxy_headers: rpc_params.rpc_rate_limit_trust_proxy_headers,
192 request_logger_limit: if is_dev { 1024 * 1024 } else { 1024 },
193 };
194
195 let prometheus_config =
196 prometheus_params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "eth-rpc".into());
197 let prometheus_registry = prometheus_config.as_ref().map(|config| &config.registry);
198
199 let tokio_runtime = sc_cli::build_runtime()?;
200 let tokio_handle = tokio_runtime.handle();
201 let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;
202
203 let client = build_client(
204 tokio_handle,
205 cache_size,
206 earliest_receipt_block,
207 &node_rpc_url,
208 &database_url,
209 tokio_runtime.block_on(async { Signals::capture() })?,
210 )?;
211
212 if let Some(PrometheusConfig { port, registry }) = prometheus_config.clone() {
214 task_manager.spawn_handle().spawn(
215 "prometheus-endpoint",
216 None,
217 prometheus_endpoint::init_prometheus(port, registry).map(drop),
218 );
219 }
220
221 let rpc_server_handle = start_rpc_servers(
222 &rpc_config,
223 prometheus_registry,
224 tokio_handle,
225 || rpc_module(is_dev, client.clone()),
226 None,
227 )?;
228
229 task_manager
230 .spawn_essential_handle()
231 .spawn("block-subscription", None, async move {
232 let mut futures: Vec<BoxFuture<'_, Result<(), _>>> = vec![
233 Box::pin(client.subscribe_and_cache_new_blocks(SubscriptionType::BestBlocks)),
234 Box::pin(client.subscribe_and_cache_new_blocks(SubscriptionType::FinalizedBlocks)),
235 ];
236
237 if let Some(index_last_n_blocks) = index_last_n_blocks {
238 futures.push(Box::pin(client.subscribe_and_cache_blocks(index_last_n_blocks)));
239 }
240
241 if let Err(err) = futures::future::try_join_all(futures).await {
242 panic!("Block subscription task failed: {err:?}",)
243 }
244 });
245
246 task_manager.keep_alive(rpc_server_handle);
247 let signals = tokio_runtime.block_on(async { Signals::capture() })?;
248 tokio_runtime.block_on(signals.run_until_signal(task_manager.future().fuse()))?;
249 Ok(())
250}
251
252fn rpc_module(is_dev: bool, client: Client) -> Result<RpcModule<()>, sc_service::Error> {
254 let eth_api = EthRpcServerImpl::new(client.clone())
255 .with_accounts(if is_dev {
256 vec![
257 crate::Account::from(subxt_signer::eth::dev::alith()),
258 crate::Account::from(subxt_signer::eth::dev::baltathar()),
259 crate::Account::from(subxt_signer::eth::dev::charleth()),
260 crate::Account::from(subxt_signer::eth::dev::dorothy()),
261 crate::Account::from(subxt_signer::eth::dev::ethan()),
262 ]
263 } else {
264 vec![]
265 })
266 .into_rpc();
267
268 let health_api = SystemHealthRpcServerImpl::new(client.clone()).into_rpc();
269 let debug_api = DebugRpcServerImpl::new(client.clone()).into_rpc();
270 let polkadot_api = PolkadotRpcServerImpl::new(client).into_rpc();
271
272 let mut module = RpcModule::new(());
273 module.merge(eth_api).map_err(|e| sc_service::Error::Application(e.into()))?;
274 module.merge(health_api).map_err(|e| sc_service::Error::Application(e.into()))?;
275 module.merge(debug_api).map_err(|e| sc_service::Error::Application(e.into()))?;
276 module
277 .merge(polkadot_api)
278 .map_err(|e| sc_service::Error::Application(e.into()))?;
279 Ok(module)
280}