referrerpolicy=no-referrer-when-downgrade

sc_service/task_manager/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate service tasks management module.
20
21use crate::{config::TaskType, Error};
22use exit_future::Signal;
23use futures::{
24	future::{pending, select, try_join_all, BoxFuture, Either},
25	Future, FutureExt, StreamExt,
26};
27use parking_lot::Mutex;
28use prometheus_endpoint::{
29	exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
30	Registry, U64,
31};
32use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
33use std::{
34	collections::{hash_map::Entry, HashMap},
35	panic,
36	pin::Pin,
37	result::Result,
38	sync::Arc,
39};
40use tokio::runtime::Handle;
41use tracing_futures::Instrument;
42
43mod prometheus_future;
44#[cfg(test)]
45mod tests;
46
/// Group name assigned to tasks that are spawned without an explicit group.
pub const DEFAULT_GROUP_NAME: &str = "default";

/// The name of a group a task belongs to.
///
/// This name is passed alongside the task name to the Prometheus metrics and can be used
/// to group tasks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupName {
	/// Use [`DEFAULT_GROUP_NAME`] as the group name.
	Default,
	/// Use the given name as the group name.
	Specific(&'static str),
}

impl From<Option<&'static str>> for GroupName {
	/// Maps `Some(name)` to [`GroupName::Specific`] and `None` to [`GroupName::Default`].
	fn from(name: Option<&'static str>) -> Self {
		name.map_or(Self::Default, Self::Specific)
	}
}

impl From<&'static str> for GroupName {
	/// Wraps `name` in [`GroupName::Specific`].
	fn from(name: &'static str) -> Self {
		Self::Specific(name)
	}
}
75
76/// An handle for spawning tasks in the service.
77#[derive(Clone)]
78pub struct SpawnTaskHandle {
79	on_exit: exit_future::Exit,
80	tokio_handle: Handle,
81	metrics: Option<Metrics>,
82	task_registry: TaskRegistry,
83}
84
85impl SpawnTaskHandle {
86	/// Spawns the given task with the given name and a group name.
87	/// If group is not specified `DEFAULT_GROUP_NAME` will be used.
88	///
89	/// Note that the `name` is a `&'static str`. The reason for this choice is that
90	/// statistics about this task are getting reported to the Prometheus endpoint (if enabled), and
91	/// that therefore the set of possible task names must be bounded.
92	///
93	/// In other words, it would be a bad idea for someone to do for example
94	/// `spawn(format!("{:?}", some_public_key))`.
95	pub fn spawn(
96		&self,
97		name: &'static str,
98		group: impl Into<GroupName>,
99		task: impl Future<Output = ()> + Send + 'static,
100	) {
101		self.spawn_inner(name, group, task, TaskType::Async)
102	}
103
104	/// Spawns the blocking task with the given name. See also `spawn`.
105	pub fn spawn_blocking(
106		&self,
107		name: &'static str,
108		group: impl Into<GroupName>,
109		task: impl Future<Output = ()> + Send + 'static,
110	) {
111		self.spawn_inner(name, group, task, TaskType::Blocking)
112	}
113
114	/// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`.
115	fn spawn_inner(
116		&self,
117		name: &'static str,
118		group: impl Into<GroupName>,
119		task: impl Future<Output = ()> + Send + 'static,
120		task_type: TaskType,
121	) {
122		let on_exit = self.on_exit.clone();
123		let metrics = self.metrics.clone();
124		let registry = self.task_registry.clone();
125
126		let group = match group.into() {
127			GroupName::Specific(var) => var,
128			// If no group is specified use default.
129			GroupName::Default => DEFAULT_GROUP_NAME,
130		};
131
132		let task_type_label = match task_type {
133			TaskType::Blocking => "blocking",
134			TaskType::Async => "async",
135		};
136
137		// Note that we increase the started counter here and not within the future. This way,
138		// we could properly visualize on Prometheus situations where the spawning doesn't work.
139		if let Some(metrics) = &self.metrics {
140			metrics.tasks_spawned.with_label_values(&[name, group, task_type_label]).inc();
141			// We do a dummy increase in order for the task to show up in metrics.
142			metrics
143				.tasks_ended
144				.with_label_values(&[name, "finished", group, task_type_label])
145				.inc_by(0);
146		}
147
148		let future = async move {
149			// Register the task and keep the "token" alive until the task is ended. Then this
150			// "token" will unregister this task.
151			let _registry_token = registry.register_task(name, group);
152
153			if let Some(metrics) = metrics {
154				// Add some wrappers around `task`.
155				let task = {
156					let poll_duration =
157						metrics.poll_duration.with_label_values(&[name, group, task_type_label]);
158					let poll_start =
159						metrics.poll_start.with_label_values(&[name, group, task_type_label]);
160					let inner =
161						prometheus_future::with_poll_durations(poll_duration, poll_start, task);
162					// The logic of `AssertUnwindSafe` here is ok considering that we throw
163					// away the `Future` after it has panicked.
164					panic::AssertUnwindSafe(inner).catch_unwind()
165				};
166				futures::pin_mut!(task);
167
168				match select(on_exit, task).await {
169					Either::Right((Err(payload), _)) => {
170						metrics
171							.tasks_ended
172							.with_label_values(&[name, "panic", group, task_type_label])
173							.inc();
174						panic::resume_unwind(payload)
175					},
176					Either::Right((Ok(()), _)) => {
177						metrics
178							.tasks_ended
179							.with_label_values(&[name, "finished", group, task_type_label])
180							.inc();
181					},
182					Either::Left(((), _)) => {
183						// The `on_exit` has triggered.
184						metrics
185							.tasks_ended
186							.with_label_values(&[name, "interrupted", group, task_type_label])
187							.inc();
188					},
189				}
190			} else {
191				futures::pin_mut!(task);
192				let _ = select(on_exit, task).await;
193			}
194		}
195		.in_current_span();
196
197		match task_type {
198			TaskType::Async => {
199				self.tokio_handle.spawn(future);
200			},
201			TaskType::Blocking => {
202				let handle = self.tokio_handle.clone();
203				self.tokio_handle.spawn_blocking(move || {
204					handle.block_on(future);
205				});
206			},
207		}
208	}
209}
210
211impl sp_core::traits::SpawnNamed for SpawnTaskHandle {
212	fn spawn_blocking(
213		&self,
214		name: &'static str,
215		group: Option<&'static str>,
216		future: BoxFuture<'static, ()>,
217	) {
218		self.spawn_inner(name, group, future, TaskType::Blocking)
219	}
220
221	fn spawn(
222		&self,
223		name: &'static str,
224		group: Option<&'static str>,
225		future: BoxFuture<'static, ()>,
226	) {
227		self.spawn_inner(name, group, future, TaskType::Async)
228	}
229}
230
231/// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any
232/// task spawned through it fails. The service should be on the receiver side
233/// and will shut itself down whenever it receives any message, i.e. an
234/// essential task has failed.
235#[derive(Clone)]
236pub struct SpawnEssentialTaskHandle {
237	essential_failed_tx: TracingUnboundedSender<()>,
238	inner: SpawnTaskHandle,
239}
240
241impl SpawnEssentialTaskHandle {
242	/// Creates a new `SpawnEssentialTaskHandle`.
243	pub fn new(
244		essential_failed_tx: TracingUnboundedSender<()>,
245		spawn_task_handle: SpawnTaskHandle,
246	) -> SpawnEssentialTaskHandle {
247		SpawnEssentialTaskHandle { essential_failed_tx, inner: spawn_task_handle }
248	}
249
250	/// Spawns the given task with the given name.
251	///
252	/// See also [`SpawnTaskHandle::spawn`].
253	pub fn spawn(
254		&self,
255		name: &'static str,
256		group: impl Into<GroupName>,
257		task: impl Future<Output = ()> + Send + 'static,
258	) {
259		self.spawn_inner(name, group, task, TaskType::Async)
260	}
261
262	/// Spawns the blocking task with the given name.
263	///
264	/// See also [`SpawnTaskHandle::spawn_blocking`].
265	pub fn spawn_blocking(
266		&self,
267		name: &'static str,
268		group: impl Into<GroupName>,
269		task: impl Future<Output = ()> + Send + 'static,
270	) {
271		self.spawn_inner(name, group, task, TaskType::Blocking)
272	}
273
274	fn spawn_inner(
275		&self,
276		name: &'static str,
277		group: impl Into<GroupName>,
278		task: impl Future<Output = ()> + Send + 'static,
279		task_type: TaskType,
280	) {
281		let essential_failed = self.essential_failed_tx.clone();
282		let essential_task = std::panic::AssertUnwindSafe(task).catch_unwind().map(move |_| {
283			log::error!("Essential task `{}` failed. Shutting down service.", name);
284			let _ = essential_failed.close();
285		});
286
287		let _ = self.inner.spawn_inner(name, group, essential_task, task_type);
288	}
289}
290
291impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle {
292	fn spawn_essential_blocking(
293		&self,
294		name: &'static str,
295		group: Option<&'static str>,
296		future: BoxFuture<'static, ()>,
297	) {
298		self.spawn_blocking(name, group, future);
299	}
300
301	fn spawn_essential(
302		&self,
303		name: &'static str,
304		group: Option<&'static str>,
305		future: BoxFuture<'static, ()>,
306	) {
307		self.spawn(name, group, future);
308	}
309}
310
311/// Helper struct to manage background/async tasks in Service.
312pub struct TaskManager {
313	/// A future that resolves when the service has exited, this is useful to
314	/// make sure any internally spawned futures stop when the service does.
315	on_exit: exit_future::Exit,
316	/// A signal that makes the exit future above resolve, fired on drop.
317	_signal: Signal,
318	/// Tokio runtime handle that is used to spawn futures.
319	tokio_handle: Handle,
320	/// Prometheus metric where to report the polling times.
321	metrics: Option<Metrics>,
322	/// Send a signal when a spawned essential task has concluded. The next time
323	/// the service future is polled it should complete with an error.
324	essential_failed_tx: TracingUnboundedSender<()>,
325	/// A receiver for spawned essential-tasks concluding.
326	essential_failed_rx: TracingUnboundedReceiver<()>,
327	/// Things to keep alive until the task manager is dropped.
328	keep_alive: Box<dyn std::any::Any + Send>,
329	/// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent
330	/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
331	/// task fails.
332	children: Vec<TaskManager>,
333	/// The registry of all running tasks.
334	task_registry: TaskRegistry,
335}
336
337impl TaskManager {
338	/// If a Prometheus registry is passed, it will be used to report statistics about the
339	/// service tasks.
340	pub fn new(
341		tokio_handle: Handle,
342		prometheus_registry: Option<&Registry>,
343	) -> Result<Self, PrometheusError> {
344		let (signal, on_exit) = exit_future::signal();
345
346		// A side-channel for essential tasks to communicate shutdown.
347		let (essential_failed_tx, essential_failed_rx) =
348			tracing_unbounded("mpsc_essential_tasks", 100);
349
350		let metrics = prometheus_registry.map(Metrics::register).transpose()?;
351
352		Ok(Self {
353			on_exit,
354			_signal: signal,
355			tokio_handle,
356			metrics,
357			essential_failed_tx,
358			essential_failed_rx,
359			keep_alive: Box::new(()),
360			children: Vec::new(),
361			task_registry: Default::default(),
362		})
363	}
364
365	/// Get a handle for spawning tasks.
366	pub fn spawn_handle(&self) -> SpawnTaskHandle {
367		SpawnTaskHandle {
368			on_exit: self.on_exit.clone(),
369			tokio_handle: self.tokio_handle.clone(),
370			metrics: self.metrics.clone(),
371			task_registry: self.task_registry.clone(),
372		}
373	}
374
375	/// Get a handle for spawning essential tasks.
376	pub fn spawn_essential_handle(&self) -> SpawnEssentialTaskHandle {
377		SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle())
378	}
379
380	/// Return a future that will end with success if the signal to terminate was sent
381	/// (`self.terminate()`) or with an error if an essential task fails.
382	///
383	/// # Warning
384	///
385	/// This function will not wait until the end of the remaining task.
386	pub fn future<'a>(
387		&'a mut self,
388	) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
389		Box::pin(async move {
390			let mut t1 = self.essential_failed_rx.next().fuse();
391			let mut t2 = self.on_exit.clone().fuse();
392			let mut t3 = try_join_all(
393				self.children
394					.iter_mut()
395					.map(|x| x.future())
396					// Never end this future if there is no error because if there is no children,
397					// it must not stop
398					.chain(std::iter::once(pending().boxed())),
399			)
400			.fuse();
401
402			futures::select! {
403				_ = t1 => Err(Error::Other("Essential task failed.".into())),
404				_ = t2 => Ok(()),
405				res = t3 => Err(res.map(|_| ()).expect_err("this future never ends; qed")),
406			}
407		})
408	}
409
410	/// Set what the task manager should keep alive, can be called multiple times.
411	pub fn keep_alive<T: 'static + Send>(&mut self, to_keep_alive: T) {
412		// allows this fn to safely called multiple times.
413		use std::mem;
414		let old = mem::replace(&mut self.keep_alive, Box::new(()));
415		self.keep_alive = Box::new((to_keep_alive, old));
416	}
417
418	/// Register another TaskManager to terminate and gracefully shutdown when the parent
419	/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
420	/// task fails. (But don't end the parent if a child's normal task fails.)
421	pub fn add_child(&mut self, child: TaskManager) {
422		self.children.push(child);
423	}
424
425	/// Consume `self` and return the [`TaskRegistry`].
426	///
427	/// This [`TaskRegistry`] can be used to check for still running tasks after this task manager
428	/// was dropped.
429	pub fn into_task_registry(self) -> TaskRegistry {
430		self.task_registry
431	}
432}
433
434#[derive(Clone)]
435struct Metrics {
436	// This list is ordered alphabetically
437	poll_duration: HistogramVec,
438	poll_start: CounterVec<U64>,
439	tasks_spawned: CounterVec<U64>,
440	tasks_ended: CounterVec<U64>,
441}
442
443impl Metrics {
444	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
445		Ok(Self {
446			poll_duration: register(HistogramVec::new(
447				HistogramOpts {
448					common_opts: Opts::new(
449						"substrate_tasks_polling_duration",
450						"Duration in seconds of each invocation of Future::poll"
451					),
452					buckets: exponential_buckets(0.001, 4.0, 9)
453						.expect("function parameters are constant and always valid; qed"),
454				},
455				&["task_name", "task_group", "kind"]
456			)?, registry)?,
457			poll_start: register(CounterVec::new(
458				Opts::new(
459					"substrate_tasks_polling_started_total",
460					"Total number of times we started invoking Future::poll"
461				),
462				&["task_name", "task_group", "kind"]
463			)?, registry)?,
464			tasks_spawned: register(CounterVec::new(
465				Opts::new(
466					"substrate_tasks_spawned_total",
467					"Total number of tasks that have been spawned on the Service"
468				),
469				&["task_name", "task_group", "kind"]
470			)?, registry)?,
471			tasks_ended: register(CounterVec::new(
472				Opts::new(
473					"substrate_tasks_ended_total",
474					"Total number of tasks for which Future::poll has returned Ready(()) or panicked"
475				),
476				&["task_name", "reason", "task_group", "kind"]
477			)?, registry)?,
478		})
479	}
480}
481
482/// Ensures that a [`Task`] is unregistered when this object is dropped.
483struct UnregisterOnDrop {
484	task: Task,
485	registry: TaskRegistry,
486}
487
488impl Drop for UnregisterOnDrop {
489	fn drop(&mut self) {
490		let mut tasks = self.registry.tasks.lock();
491
492		if let Entry::Occupied(mut entry) = (*tasks).entry(self.task.clone()) {
493			*entry.get_mut() -= 1;
494
495			if *entry.get() == 0 {
496				entry.remove();
497			}
498		}
499	}
500}
501
502/// Represents a running async task in the [`TaskManager`].
503///
504/// As a task is identified by a name and a group, it is totally valid that there exists multiple
505/// tasks with the same name and group.
506#[derive(Clone, Hash, Eq, PartialEq)]
507pub struct Task {
508	/// The name of the task.
509	pub name: &'static str,
510	/// The group this task is associated to.
511	pub group: &'static str,
512}
513
514impl Task {
515	/// Returns if the `group` is the [`DEFAULT_GROUP_NAME`].
516	pub fn is_default_group(&self) -> bool {
517		self.group == DEFAULT_GROUP_NAME
518	}
519}
520
521/// Keeps track of all running [`Task`]s in [`TaskManager`].
522#[derive(Clone, Default)]
523pub struct TaskRegistry {
524	tasks: Arc<Mutex<HashMap<Task, usize>>>,
525}
526
527impl TaskRegistry {
528	/// Register a task with the given `name` and `group`.
529	///
530	/// Returns [`UnregisterOnDrop`] that ensures that the task is unregistered when this value is
531	/// dropped.
532	fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop {
533		let task = Task { name, group };
534
535		{
536			let mut tasks = self.tasks.lock();
537
538			*(*tasks).entry(task.clone()).or_default() += 1;
539		}
540
541		UnregisterOnDrop { task, registry: self.clone() }
542	}
543
544	/// Returns the running tasks.
545	///
546	/// As a task is only identified by its `name` and `group`, there can be duplicate tasks. The
547	/// number per task represents the concurrently running tasks with the same identifier.
548	pub fn running_tasks(&self) -> HashMap<Task, usize> {
549		(*self.tasks.lock()).clone()
550	}
551}