// sc_cli/runner.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{error::Error as CliError, Result, Signals, SubstrateCli};
use chrono::prelude::*;
use futures::{future::FutureExt, Future};
use log::info;
use sc_service::{Configuration, Error as ServiceError, TaskManager};
use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::{marker::PhantomData, time::Duration};

/// Build a tokio runtime with all features.
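///
/// # Example
///
/// A minimal usage sketch, not from the original source; it assumes this function is
/// re-exported at the crate root:
/// ```no_run
/// let runtime = sc_cli::build_runtime().expect("failed to build the tokio runtime");
/// runtime.block_on(async {
/// 	// Drive the node (or any other future) to completion here.
/// });
/// ```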
pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
	tokio::runtime::Builder::new_multi_thread()
		.on_thread_start(|| {
			TOKIO_THREADS_ALIVE.inc();
			TOKIO_THREADS_TOTAL.inc();
		})
		.on_thread_stop(|| {
			TOKIO_THREADS_ALIVE.dec();
		})
		.enable_all()
		.build()
}

/// A Substrate CLI runner that can be used to run a node or a command.
pub struct Runner<C: SubstrateCli> {
	config: Configuration,
	tokio_runtime: tokio::runtime::Runtime,
	signals: Signals,
	phantom: PhantomData<C>,
}

impl<C: SubstrateCli> Runner<C> {
	/// Create a new [`Runner`] with the given configuration, tokio runtime and signal handler.
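	///
	/// # Example
	///
	/// A minimal construction sketch, not from the original source: `MyCli` stands in for
	/// any `SubstrateCli` implementation, `config` for a prepared `Configuration`, and
	/// `Signals::capture()` is assumed to be available for obtaining the signal handler.
	/// ```ignore
	/// let tokio_runtime = sc_cli::build_runtime()?;
	/// let signals = sc_cli::Signals::capture()?;
	/// let runner = sc_cli::Runner::<MyCli>::new(config, tokio_runtime, signals)?;
	/// ```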
	pub fn new(
		config: Configuration,
		tokio_runtime: tokio::runtime::Runtime,
		signals: Signals,
	) -> Result<Runner<C>> {
		Ok(Runner { config, tokio_runtime, signals, phantom: PhantomData })
	}

	/// Log information about the node itself.
	///
	/// # Example:
	///
	/// ```text
	/// 2020-06-03 16:14:21 Substrate Node
	/// 2020-06-03 16:14:21 ✌️  version 2.0.0-rc3-f4940588c-x86_64-linux-gnu
	/// 2020-06-03 16:14:21 ❤️  by Parity Technologies <admin@parity.io>, 2017-2020
	/// 2020-06-03 16:14:21 📋 Chain specification: Flaming Fir
	/// 2020-06-03 16:14:21 🏷  Node name: jolly-rod-7462
	/// 2020-06-03 16:14:21 👤 Role: FULL
	/// 2020-06-03 16:14:21 💾 Database: RocksDb at /tmp/c/chains/flamingfir7/db
	/// 2020-06-03 16:14:21 ⛓  Native runtime: node-251 (substrate-node-1.tx1.au10)
	/// ```
	fn print_node_infos(&self) {
		print_node_infos::<C>(self.config())
	}

	/// A helper function that runs a node with tokio and stops if the process receives the signal
	/// `SIGTERM` or `SIGINT`.
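	///
	/// # Example
	///
	/// A minimal sketch, not from the original source: the closure builds the node's
	/// [`TaskManager`]. Here it is created directly (as in the tests below); a real node
	/// would spawn its tasks on it before returning.
	/// ```ignore
	/// runner.run_node_until_exit(|config| async move {
	/// 	let task_manager = TaskManager::new(config.tokio_handle.clone(), None)
	/// 		.expect("failed to create the task manager");
	/// 	// Spawn the node's tasks via `task_manager.spawn_handle()` here.
	/// 	Ok::<_, sc_cli::Error>(task_manager)
	/// })
	/// ```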
	pub fn run_node_until_exit<F, E>(
		self,
		initialize: impl FnOnce(Configuration) -> F,
	) -> std::result::Result<(), E>
	where
		F: Future<Output = std::result::Result<TaskManager, E>>,
		E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
	{
		self.print_node_infos();

		let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;

		let res = self
			.tokio_runtime
			.block_on(self.signals.run_until_signal(task_manager.future().fuse()));
		// We need to drop the task manager here to inform all tasks that they should shut down.
		//
		// This needs to happen before we instruct the tokio runtime to shut down. Otherwise
		// the tokio runtime will wait the full 60 seconds for all tasks to stop.
		let task_registry = task_manager.into_task_registry();

		// Give all futures 60 seconds to shut down, before tokio "leaks" them.
		let shutdown_timeout = Duration::from_secs(60);
		self.tokio_runtime.shutdown_timeout(shutdown_timeout);

		let running_tasks = task_registry.running_tasks();

		if !running_tasks.is_empty() {
			log::error!("Detected running (potentially stalled) tasks on shutdown:");
			running_tasks.iter().for_each(|(task, count)| {
				let instances_desc =
					if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };

				if task.is_default_group() {
					log::error!(
						"Task \"{}\" was still running {}after waiting {} seconds to finish.",
						task.name,
						instances_desc,
						shutdown_timeout.as_secs(),
					);
				} else {
					log::error!(
						"Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
						task.name,
						task.group,
						instances_desc,
						shutdown_timeout.as_secs(),
					);
				}
			});
		}

		res.map_err(Into::into)
	}

	/// A helper function that runs a command with the configuration of this node.
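	///
	/// # Example
	///
	/// A minimal sketch, not from the original source, running a synchronous closure
	/// against the node's configuration:
	/// ```ignore
	/// runner.sync_run(|config| {
	/// 	println!("chain: {}", config.chain_spec.name());
	/// 	Ok::<_, sc_cli::Error>(())
	/// })
	/// ```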
	pub fn sync_run<E>(
		self,
		runner: impl FnOnce(Configuration) -> std::result::Result<(), E>,
	) -> std::result::Result<(), E>
	where
		E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
	{
		runner(self.config)
	}

	/// A helper function that runs a future with tokio and stops if the process receives
	/// the signal `SIGTERM` or `SIGINT`.
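	///
	/// # Example
	///
	/// A minimal sketch, not from the original source; `command_future` stands in for
	/// whatever future drives the command to completion:
	/// ```ignore
	/// runner.async_run(|config| {
	/// 	let task_manager = TaskManager::new(config.tokio_handle.clone(), None)
	/// 		.expect("failed to create the task manager");
	/// 	Ok::<_, sc_cli::Error>((command_future, task_manager))
	/// })
	/// ```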
	pub fn async_run<F, E>(
		self,
		runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
	) -> std::result::Result<(), E>
	where
		F: Future<Output = std::result::Result<(), E>>,
		E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
	{
		let (future, task_manager) = runner(self.config)?;
		self.tokio_runtime.block_on(self.signals.run_until_signal(future.fuse()))?;
		// Drop the task manager before dropping the rest, to ensure that all futures were
		// informed about the shutdown.
		drop(task_manager);
		Ok(())
	}

	/// Get an immutable reference to the node Configuration
	pub fn config(&self) -> &Configuration {
		&self.config
	}

	/// Get a mutable reference to the node Configuration
	pub fn config_mut(&mut self) -> &mut Configuration {
		&mut self.config
	}
}

/// Log information about the node itself.
pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
	info!("{}", C::impl_name());
	info!("✌️  version {}", C::impl_version());
	info!("❤️  by {}, {}-{}", C::author(), C::copyright_start_year(), Local::now().year());
	info!("📋 Chain specification: {}", config.chain_spec.name());
	info!("🏷  Node name: {}", config.network.node_name);
	info!("👤 Role: {}", config.display_role());
	info!(
		"💾 Database: {} at {}",
		config.database,
		config
			.database
			.path()
			.map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
	);
}

#[cfg(test)]
mod tests {
	use super::*;
	use sc_network::config::NetworkConfiguration;
	use sc_service::{
		config::{ExecutorConfiguration, RpcConfiguration},
		Arc, ChainType, GenericChainSpec, NoExtension,
	};
	use std::{
		path::PathBuf,
		sync::atomic::{AtomicU64, Ordering},
	};

	struct Cli;

	impl SubstrateCli for Cli {
		fn author() -> String {
			"test".into()
		}

		fn impl_name() -> String {
			"yep".into()
		}

		fn impl_version() -> String {
			"version".into()
		}

		fn description() -> String {
			"desc".into()
		}

		fn support_url() -> String {
			"no.pe".into()
		}

		fn copyright_start_year() -> i32 {
			2042
		}

		fn load_spec(
			&self,
			_: &str,
		) -> std::result::Result<Box<dyn sc_service::ChainSpec>, String> {
			Err("nope".into())
		}
	}

	fn create_runner() -> Runner<Cli> {
		let runtime = build_runtime().unwrap();

		let root = PathBuf::from("db");
		let runner = Runner::new(
			Configuration {
				impl_name: "spec".into(),
				impl_version: "3".into(),
				role: sc_service::Role::Authority,
				tokio_handle: runtime.handle().clone(),
				transaction_pool: Default::default(),
				network: NetworkConfiguration::new_memory(),
				keystore: sc_service::config::KeystoreConfig::InMemory,
				database: sc_client_db::DatabaseSource::ParityDb { path: root.clone() },
				trie_cache_maximum_size: None,
				warm_up_trie_cache: None,
				state_pruning: None,
				blocks_pruning: sc_client_db::BlocksPruning::KeepAll,
				chain_spec: Box::new(
					GenericChainSpec::<NoExtension, ()>::builder(
						Default::default(),
						NoExtension::None,
					)
					.with_name("test")
					.with_id("test_id")
					.with_chain_type(ChainType::Development)
					.with_genesis_config_patch(Default::default())
					.build(),
				),
				executor: ExecutorConfiguration::default(),
				wasm_runtime_overrides: None,
				rpc: RpcConfiguration {
					addr: None,
					max_connections: Default::default(),
					cors: None,
					methods: Default::default(),
					max_request_size: Default::default(),
					max_response_size: Default::default(),
					id_provider: Default::default(),
					max_subs_per_conn: Default::default(),
					message_buffer_capacity: Default::default(),
					port: 9944,
					batch_config: sc_service::config::RpcBatchRequestConfig::Unlimited,
					rate_limit: None,
					rate_limit_whitelisted_ips: Default::default(),
					rate_limit_trust_proxy_headers: Default::default(),
				},
				prometheus_config: None,
				telemetry_endpoints: None,
				offchain_worker: Default::default(),
				force_authoring: false,
				disable_grandpa: false,
				dev_key_seed: None,
				tracing_targets: None,
				tracing_receiver: Default::default(),
				announce_block: true,
				base_path: sc_service::BasePath::new(root.clone()),
				data_path: root,
			},
			runtime,
			Signals::dummy(),
		)
		.unwrap();

		runner
	}

	#[test]
	fn ensure_run_until_exit_informs_tasks_to_end() {
		let runner = create_runner();

		let counter = Arc::new(AtomicU64::new(0));
		let counter2 = counter.clone();

		runner
			.run_node_until_exit(move |cfg| async move {
				let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
				let (sender, receiver) = futures::channel::oneshot::channel();

				// We need to use `spawn_blocking` here so that we get a dedicated thread for our
				// future. This is important for this test, as otherwise tokio can just "drop" the
				// future.
				task_manager.spawn_handle().spawn_blocking("test", None, async move {
					let _ = sender.send(());
					loop {
						counter2.fetch_add(1, Ordering::Relaxed);
						futures_timer::Delay::new(Duration::from_millis(50)).await;
					}
				});

				task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
					// Stop this essential task as soon as our other task has started.
					// Its end signals that the task manager should shut down.
					let _ = receiver.await;
				});

				Ok::<_, sc_service::Error>(task_manager)
			})
			.unwrap_err();

		let count = counter.load(Ordering::Relaxed);

		// Ensure that our counting task was running for less than 30 seconds.
		// It should be killed right away, but to accommodate slow CI machines we are a
		// little more "relaxed" here.
		assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
	}

	fn run_test_in_another_process(
		test_name: &str,
		test_body: impl FnOnce(),
	) -> Option<std::process::Output> {
		if std::env::var("RUN_FORKED_TEST").is_ok() {
			test_body();
			None
		} else {
			let output = std::process::Command::new(std::env::current_exe().unwrap())
				.arg(test_name)
				.env("RUN_FORKED_TEST", "1")
				.output()
				.unwrap();

			assert!(output.status.success());
			Some(output)
		}
	}

	/// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60
	/// seconds, i.e. it doesn't wait until they are finished (which may never happen).
	#[test]
	fn ensure_run_until_exit_is_not_blocking_indefinitely() {
		let output = run_test_in_another_process(
			"ensure_run_until_exit_is_not_blocking_indefinitely",
			|| {
				sp_tracing::try_init_simple();

				let runner = create_runner();

				runner
					.run_node_until_exit(move |cfg| async move {
						let task_manager =
							TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
						let (sender, receiver) = futures::channel::oneshot::channel();

						// We need to use `spawn_blocking` here so that we get a dedicated thread
						// for our future. This future runs blocking code that will never end.
						task_manager.spawn_handle().spawn_blocking("test", None, async move {
							let _ = sender.send(());
							loop {
								std::thread::sleep(Duration::from_secs(30));
							}
						});

						task_manager.spawn_essential_handle().spawn_blocking(
							"test2",
							None,
							async {
								// Stop this essential task as soon as our other task has
								// started. Its end signals that the task manager should
								// shut down.
								let _ = receiver.await;
							},
						);

						Ok::<_, sc_service::Error>(task_manager)
					})
					.unwrap_err();
			},
		);

		let Some(output) = output else { return };

		let stderr = dbg!(String::from_utf8(output.stderr).unwrap());

		assert!(
			stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
		);
		assert!(!stderr
			.contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
	}
}