1use crate::{
19 client::{connect, Client, SubscriptionType, SubstrateBlockNumber},
20 DebugRpcServer, DebugRpcServerImpl, EthRpcServer, EthRpcServerImpl, ReceiptExtractor,
21 ReceiptProvider, SubxtBlockInfoProvider, SystemHealthRpcServer, SystemHealthRpcServerImpl,
22 LOG_TARGET,
23};
24use clap::Parser;
25use futures::{pin_mut, FutureExt};
26use jsonrpsee::server::RpcModule;
27use sc_cli::{PrometheusParams, RpcParams, SharedParams, Signals};
28use sc_service::{
29 config::{PrometheusConfig, RpcConfiguration},
30 start_rpc_servers, TaskManager,
31};
32use sqlx::sqlite::SqlitePoolOptions;
33
/// Default port handed to `PrometheusParams::prometheus_config` in `run`.
const DEFAULT_PROMETHEUS_PORT: u16 = 9616;

/// Default RPC port, used in `run` when `--rpc-port` is not supplied.
const DEFAULT_RPC_PORT: u16 = 8545;

/// Database URL that selects the in-memory SQLite database. It is the default value of
/// `CliCommand::database_url`, and `build_client` compares against it to decide how the
/// connection pool is configured.
const IN_MEMORY_DB: &str = "sqlite::memory:";
41
// Command-line arguments accepted by the binary; consumed by `run`.
#[derive(Parser, Debug)]
#[clap(author, about, version)]
pub struct CliCommand {
    /// URL of the node RPC endpoint to connect to.
    #[clap(long, default_value = "ws://127.0.0.1:9944")]
    pub node_rpc_url: String,

    /// Number of blocks to keep when the in-memory database is used.
    #[clap(long, default_value = "256")]
    pub cache_size: usize,

    /// Earliest block number passed to the receipt extractor.
    #[clap(long)]
    pub earliest_receipt_block: Option<SubstrateBlockNumber>,

    /// URL of the SQLite database backing the receipt provider.
    /// Defaults to an in-memory database; may also be set through `DATABASE_URL`.
    #[clap(long, env = "DATABASE_URL", default_value = IN_MEMORY_DB)]
    pub database_url: String,

    /// If provided, index the last n blocks.
    #[clap(long)]
    pub index_last_n_blocks: Option<SubstrateBlockNumber>,

    // Flattened `sc_cli` shared parameters (logging options, `--dev`, ...).
    // `missing_docs` is allowed because the documentation lives on the flattened type.
    #[allow(missing_docs)]
    #[clap(flatten)]
    pub shared_params: SharedParams,

    // Flattened `sc_cli` RPC server parameters; documented on the flattened type.
    #[allow(missing_docs)]
    #[clap(flatten)]
    pub rpc_params: RpcParams,

    // Flattened `sc_cli` Prometheus parameters; documented on the flattened type.
    #[allow(missing_docs)]
    #[clap(flatten)]
    pub prometheus_params: PrometheusParams,
}
80
81#[cfg(not(test))]
83fn init_logger(params: &SharedParams) -> anyhow::Result<()> {
84 let mut logger = sc_cli::LoggerBuilder::new(params.log_filters().join(","));
85 logger
86 .with_log_reloading(params.enable_log_reloading)
87 .with_detailed_output(params.detailed_log_output);
88
89 if let Some(tracing_targets) = ¶ms.tracing_targets {
90 let tracing_receiver = params.tracing_receiver.into();
91 logger.with_profiling(tracing_receiver, tracing_targets);
92 }
93
94 if params.disable_log_color {
95 logger.with_colors(false);
96 }
97
98 logger.init()?;
99 Ok(())
100}
101
102fn build_client(
103 tokio_handle: &tokio::runtime::Handle,
104 cache_size: usize,
105 earliest_receipt_block: Option<SubstrateBlockNumber>,
106 node_rpc_url: &str,
107 database_url: &str,
108 abort_signal: Signals,
109) -> anyhow::Result<Client> {
110 let fut = async {
111 let (api, rpc_client, rpc) = connect(node_rpc_url).await?;
112 let block_provider = SubxtBlockInfoProvider::new( api.clone(), rpc.clone()).await?;
113
114 let (pool, keep_latest_n_blocks) = if database_url == IN_MEMORY_DB {
115 log::warn!( target: LOG_TARGET, "💾 Using in-memory database, keeping only {cache_size} blocks in memory");
116 let pool = SqlitePoolOptions::new()
118 .max_connections(1)
119 .idle_timeout(None)
120 .max_lifetime(None)
121 .connect(database_url).await?;
122
123 (pool, Some(cache_size))
124 } else {
125 (SqlitePoolOptions::new().connect(database_url).await?, None)
126 };
127
128 let receipt_extractor = ReceiptExtractor::new(
129 api.clone(),
130 earliest_receipt_block).await?;
131
132 let receipt_provider = ReceiptProvider::new(
133 pool,
134 block_provider.clone(),
135 receipt_extractor.clone(),
136 keep_latest_n_blocks,
137 )
138 .await?;
139
140 let client =
141 Client::new(api, rpc_client, rpc, block_provider, receipt_provider).await?;
142
143 Ok(client)
144 }
145 .fuse();
146 pin_mut!(fut);
147
148 match tokio_handle.block_on(abort_signal.try_until_signal(fut)) {
149 Ok(Ok(client)) => Ok(client),
150 Ok(Err(err)) => Err(err),
151 Err(_) => anyhow::bail!("Process interrupted"),
152 }
153}
154
155pub fn run(cmd: CliCommand) -> anyhow::Result<()> {
157 let CliCommand {
158 rpc_params,
159 prometheus_params,
160 node_rpc_url,
161 cache_size,
162 database_url,
163 earliest_receipt_block,
164 index_last_n_blocks,
165 shared_params,
166 ..
167 } = cmd;
168
169 #[cfg(not(test))]
170 init_logger(&shared_params)?;
171 let is_dev = shared_params.dev;
172 let rpc_addrs: Option<Vec<sc_service::config::RpcEndpoint>> = rpc_params
173 .rpc_addr(is_dev, false, 8545)?
174 .map(|addrs| addrs.into_iter().map(Into::into).collect());
175
176 let rpc_config = RpcConfiguration {
177 addr: rpc_addrs,
178 methods: rpc_params.rpc_methods.into(),
179 max_connections: rpc_params.rpc_max_connections,
180 cors: rpc_params.rpc_cors(is_dev)?,
181 max_request_size: rpc_params.rpc_max_request_size,
182 max_response_size: rpc_params.rpc_max_response_size,
183 id_provider: None,
184 max_subs_per_conn: rpc_params.rpc_max_subscriptions_per_connection,
185 port: rpc_params.rpc_port.unwrap_or(DEFAULT_RPC_PORT),
186 message_buffer_capacity: rpc_params.rpc_message_buffer_capacity_per_connection,
187 batch_config: rpc_params.rpc_batch_config()?,
188 rate_limit: rpc_params.rpc_rate_limit,
189 rate_limit_whitelisted_ips: rpc_params.rpc_rate_limit_whitelisted_ips,
190 rate_limit_trust_proxy_headers: rpc_params.rpc_rate_limit_trust_proxy_headers,
191 };
192
193 let prometheus_config =
194 prometheus_params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "eth-rpc".into());
195 let prometheus_registry = prometheus_config.as_ref().map(|config| &config.registry);
196
197 let tokio_runtime = sc_cli::build_runtime()?;
198 let tokio_handle = tokio_runtime.handle();
199 let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;
200
201 let client = build_client(
202 tokio_handle,
203 cache_size,
204 earliest_receipt_block,
205 &node_rpc_url,
206 &database_url,
207 tokio_runtime.block_on(async { Signals::capture() })?,
208 )?;
209
210 if let Some(PrometheusConfig { port, registry }) = prometheus_config.clone() {
212 task_manager.spawn_handle().spawn(
213 "prometheus-endpoint",
214 None,
215 prometheus_endpoint::init_prometheus(port, registry).map(drop),
216 );
217 }
218
219 let rpc_server_handle = start_rpc_servers(
220 &rpc_config,
221 prometheus_registry,
222 tokio_handle,
223 || rpc_module(is_dev, client.clone()),
224 None,
225 )?;
226
227 task_manager
228 .spawn_essential_handle()
229 .spawn("block-subscription", None, async move {
230 let fut1 = client.subscribe_and_cache_new_blocks(SubscriptionType::BestBlocks);
231 let fut2 = client.subscribe_and_cache_new_blocks(SubscriptionType::FinalizedBlocks);
232
233 let res = if let Some(index_last_n_blocks) = index_last_n_blocks {
234 let fut3 = client.subscribe_and_cache_blocks(index_last_n_blocks);
235 tokio::try_join!(fut1, fut2, fut3).map(|_| ())
236 } else {
237 tokio::try_join!(fut1, fut2).map(|_| ())
238 };
239
240 if let Err(err) = res {
241 panic!("Block subscription task failed: {err:?}",)
242 }
243 });
244
245 task_manager.keep_alive(rpc_server_handle);
246 let signals = tokio_runtime.block_on(async { Signals::capture() })?;
247 tokio_runtime.block_on(signals.run_until_signal(task_manager.future().fuse()))?;
248 Ok(())
249}
250
251fn rpc_module(is_dev: bool, client: Client) -> Result<RpcModule<()>, sc_service::Error> {
253 let eth_api = EthRpcServerImpl::new(client.clone())
254 .with_accounts(if is_dev { vec![crate::Account::default()] } else { vec![] })
255 .into_rpc();
256
257 let health_api = SystemHealthRpcServerImpl::new(client.clone()).into_rpc();
258 let debug_api = DebugRpcServerImpl::new(client).into_rpc();
259
260 let mut module = RpcModule::new(());
261 module.merge(eth_api).map_err(|e| sc_service::Error::Application(e.into()))?;
262 module.merge(health_api).map_err(|e| sc_service::Error::Application(e.into()))?;
263 module.merge(debug_api).map_err(|e| sc_service::Error::Application(e.into()))?;
264 Ok(module)
265}