use crate::{error::Error as CliError, Result, Signals, SubstrateCli};
use chrono::prelude::*;
use futures::{future::FutureExt, Future};
use log::info;
use sc_service::{Configuration, Error as ServiceError, TaskManager};
use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::{marker::PhantomData, time::Duration};

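/// Build the Tokio runtime used to drive the node.
///
/// The runtime is multi-threaded and keeps the `TOKIO_THREADS_ALIVE` and
/// `TOKIO_THREADS_TOTAL` Prometheus gauges up to date as worker threads start and stop.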
pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
    tokio::runtime::Builder::new_multi_thread()
        .on_thread_start(|| {
            TOKIO_THREADS_ALIVE.inc();
            TOKIO_THREADS_TOTAL.inc();
        })
        .on_thread_stop(|| {
            TOKIO_THREADS_ALIVE.dec();
        })
        .enable_all()
        .build()
}

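/// A Substrate CLI runtime that drives a node or a CLI command to completion, using the
/// given [`Configuration`], a Tokio runtime and the termination [`Signals`] handler.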
pub struct Runner<C: SubstrateCli> {
    config: Configuration,
    tokio_runtime: tokio::runtime::Runtime,
    signals: Signals,
    phantom: PhantomData<C>,
}

impl<C: SubstrateCli> Runner<C> {
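    /// Create a new [`Runner`] from an already-built configuration, Tokio runtime and
    /// signal handler.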
    pub fn new(
        config: Configuration,
        tokio_runtime: tokio::runtime::Runtime,
        signals: Signals,
    ) -> Result<Runner<C>> {
        Ok(Runner { config, tokio_runtime, signals, phantom: PhantomData })
    }

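    /// Log basic information about the node (see [`print_node_infos`]).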
    fn print_node_infos(&self) {
        print_node_infos::<C>(self.config())
    }

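    /// Run the node until it exits.
    ///
    /// Blocks the current thread until a termination signal is received or the
    /// [`TaskManager`] future returned by `initialize` completes (for example because an
    /// essential task failed). Afterwards the Tokio runtime is shut down and any tasks
    /// that are still running after the shutdown timeout are logged.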
    pub fn run_node_until_exit<F, E>(
        self,
        initialize: impl FnOnce(Configuration) -> F,
    ) -> std::result::Result<(), E>
    where
        F: Future<Output = std::result::Result<TaskManager, E>>,
        E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
    {
        self.print_node_infos();

        let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;

        let res = self
            .tokio_runtime
            .block_on(self.signals.run_until_signal(task_manager.future().fuse()));
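        // Converting the task manager into its registry drops the manager itself, which
        // informs all remaining tasks that they should shut down.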
        let task_registry = task_manager.into_task_registry();

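        // Give the remaining tasks up to 60 seconds to finish before the Tokio runtime is
        // torn down; anything still running afterwards is reported below.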
        let shutdown_timeout = Duration::from_secs(60);
        self.tokio_runtime.shutdown_timeout(shutdown_timeout);

        let running_tasks = task_registry.running_tasks();

        if !running_tasks.is_empty() {
            log::error!("Detected running (potentially stalled) tasks on shutdown:");
            running_tasks.iter().for_each(|(task, count)| {
                let instances_desc =
                    if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };

                if task.is_default_group() {
                    log::error!(
                        "Task \"{}\" was still running {}after waiting {} seconds to finish.",
                        task.name,
                        instances_desc,
                        shutdown_timeout.as_secs(),
                    );
                } else {
                    log::error!(
                        "Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
                        task.name,
                        task.group,
                        instances_desc,
                        shutdown_timeout.as_secs(),
                    );
                }
            });
        }

        res.map_err(Into::into)
    }

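    /// Run a synchronous command that only needs access to the node [`Configuration`].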
    pub fn sync_run<E>(
        self,
        runner: impl FnOnce(Configuration) -> std::result::Result<(), E>,
    ) -> std::result::Result<(), E>
    where
        E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
    {
        runner(self.config)
    }

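    /// Run an asynchronous command.
    ///
    /// The closure receives the node [`Configuration`] and returns a future together with
    /// the [`TaskManager`] keeping its background tasks alive. The future is driven until
    /// it completes or a termination signal is received.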
    pub fn async_run<F, E>(
        self,
        runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
    ) -> std::result::Result<(), E>
    where
        F: Future<Output = std::result::Result<(), E>>,
        E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
    {
        let (future, task_manager) = runner(self.config)?;
        self.tokio_runtime.block_on(self.signals.run_until_signal(future.fuse()))?;
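        // Dropping the task manager informs all of its remaining tasks that they should
        // shut down.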
        drop(task_manager);
        Ok(())
    }

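    /// Get a reference to the node [`Configuration`].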
    pub fn config(&self) -> &Configuration {
        &self.config
    }

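    /// Get a mutable reference to the node [`Configuration`].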
    pub fn config_mut(&mut self) -> &mut Configuration {
        &mut self.config
    }
}

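/// Log basic information about the node: implementation name and version, author and
/// copyright years, chain specification, node name, role and database backend.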
pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
    info!("{}", C::impl_name());
    info!("✌️ version {}", C::impl_version());
    info!("❤️ by {}, {}-{}", C::author(), C::copyright_start_year(), Local::now().year());
    info!("📋 Chain specification: {}", config.chain_spec.name());
    info!("🏷 Node name: {}", config.network.node_name);
    info!("👤 Role: {}", config.display_role());
    info!(
        "💾 Database: {} at {}",
        config.database,
        config
            .database
            .path()
            .map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
    );
}

#[cfg(test)]
mod tests {
    use super::*;
    use sc_network::config::NetworkConfiguration;
    use sc_service::{
        config::{ExecutorConfiguration, RpcConfiguration},
        Arc, ChainType, GenericChainSpec, NoExtension,
    };
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU64, Ordering},
    };

    struct Cli;

    impl SubstrateCli for Cli {
        fn author() -> String {
            "test".into()
        }

        fn impl_name() -> String {
            "yep".into()
        }

        fn impl_version() -> String {
            "version".into()
        }

        fn description() -> String {
            "desc".into()
        }

        fn support_url() -> String {
            "no.pe".into()
        }

        fn copyright_start_year() -> i32 {
            2042
        }

        fn load_spec(
            &self,
            _: &str,
        ) -> std::result::Result<Box<dyn sc_service::ChainSpec>, String> {
            Err("nope".into())
        }
    }

    fn create_runner() -> Runner<Cli> {
        let runtime = build_runtime().unwrap();

        let root = PathBuf::from("db");
        let runner = Runner::new(
            Configuration {
                impl_name: "spec".into(),
                impl_version: "3".into(),
                role: sc_service::Role::Authority,
                tokio_handle: runtime.handle().clone(),
                transaction_pool: Default::default(),
                network: NetworkConfiguration::new_memory(),
                keystore: sc_service::config::KeystoreConfig::InMemory,
                database: sc_client_db::DatabaseSource::ParityDb { path: root.clone() },
                trie_cache_maximum_size: None,
                warm_up_trie_cache: None,
                state_pruning: None,
                blocks_pruning: sc_client_db::BlocksPruning::KeepAll,
                chain_spec: Box::new(
                    GenericChainSpec::<NoExtension, ()>::builder(
                        Default::default(),
                        NoExtension::None,
                    )
                    .with_name("test")
                    .with_id("test_id")
                    .with_chain_type(ChainType::Development)
                    .with_genesis_config_patch(Default::default())
                    .build(),
                ),
                executor: ExecutorConfiguration::default(),
                wasm_runtime_overrides: None,
                rpc: RpcConfiguration {
                    addr: None,
                    max_connections: Default::default(),
                    cors: None,
                    methods: Default::default(),
                    max_request_size: Default::default(),
                    max_response_size: Default::default(),
                    id_provider: Default::default(),
                    max_subs_per_conn: Default::default(),
                    message_buffer_capacity: Default::default(),
                    port: 9944,
                    batch_config: sc_service::config::RpcBatchRequestConfig::Unlimited,
                    rate_limit: None,
                    rate_limit_whitelisted_ips: Default::default(),
                    rate_limit_trust_proxy_headers: Default::default(),
                },
                prometheus_config: None,
                telemetry_endpoints: None,
                offchain_worker: Default::default(),
                force_authoring: false,
                disable_grandpa: false,
                dev_key_seed: None,
                tracing_targets: None,
                tracing_receiver: Default::default(),
                announce_block: true,
                base_path: sc_service::BasePath::new(root.clone()),
                data_path: root,
            },
            runtime,
            Signals::dummy(),
        )
        .unwrap();

        runner
    }

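    // Spawned tasks must be informed to end as soon as the essential task finishes and
    // `run_node_until_exit` returns.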
    #[test]
    fn ensure_run_until_exit_informs_tasks_to_end() {
        let runner = create_runner();

        let counter = Arc::new(AtomicU64::new(0));
        let counter2 = counter.clone();

        runner
            .run_node_until_exit(move |cfg| async move {
                let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
                let (sender, receiver) = futures::channel::oneshot::channel();

                task_manager.spawn_handle().spawn_blocking("test", None, async move {
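                    // Tell the essential task that we are running, then keep incrementing
                    // the counter until we are shut down.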
                    let _ = sender.send(());
                    loop {
                        counter2.fetch_add(1, Ordering::Relaxed);
                        futures_timer::Delay::new(Duration::from_millis(50)).await;
                    }
                });

                task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
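                    // Once the message arrives this essential task ends, which makes the
                    // node start shutting down.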
                    let _ = receiver.await;
                });

                Ok::<_, sc_service::Error>(task_manager)
            })
            .unwrap_err();

        let count = counter.load(Ordering::Relaxed);

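        // The counter is incremented every 50ms; a value well below this bound shows that
        // the task was stopped promptly instead of running on after the node exited.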
        assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
    }

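    // Runs `test_body` in a forked child process by re-invoking the test binary, so that
    // the parent can inspect the child's output. Returns `None` when running as the child.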
    fn run_test_in_another_process(
        test_name: &str,
        test_body: impl FnOnce(),
    ) -> Option<std::process::Output> {
        if std::env::var("RUN_FORKED_TEST").is_ok() {
            test_body();
            None
        } else {
            let output = std::process::Command::new(std::env::current_exe().unwrap())
                .arg(test_name)
                .env("RUN_FORKED_TEST", "1")
                .output()
                .unwrap();

            assert!(output.status.success());
            Some(output)
        }
    }

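    // A task that blocks its thread can never observe the shutdown request;
    // `run_node_until_exit` must still return after the shutdown timeout and report the
    // task as stalled.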
    #[test]
    fn ensure_run_until_exit_is_not_blocking_indefinitely() {
        let output = run_test_in_another_process(
            "ensure_run_until_exit_is_not_blocking_indefinitely",
            || {
                sp_tracing::try_init_simple();

                let runner = create_runner();

                runner
                    .run_node_until_exit(move |cfg| async move {
                        let task_manager =
                            TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
                        let (sender, receiver) = futures::channel::oneshot::channel();

                        task_manager.spawn_handle().spawn_blocking("test", None, async move {
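                            // Block the thread forever; this task cannot react to the
                            // shutdown request.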
                            let _ = sender.send(());
                            loop {
                                std::thread::sleep(Duration::from_secs(30));
                            }
                        });

                        task_manager.spawn_essential_handle().spawn_blocking(
                            "test2",
                            None,
                            async {
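                                // Finishing this essential task triggers the node shutdown.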
                                let _ = receiver.await;
                            },
                        );

                        Ok::<_, sc_service::Error>(task_manager)
                    })
                    .unwrap_err();
            },
        );

        let Some(output) = output else { return };

        let stderr = dbg!(String::from_utf8(output.stderr).unwrap());

        assert!(
            stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
        );
        assert!(!stderr
            .contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
    }
}