1use crate::{config::TaskType, Error};
22use exit_future::Signal;
23use futures::{
24 future::{pending, select, try_join_all, BoxFuture, Either},
25 Future, FutureExt, StreamExt,
26};
27use parking_lot::Mutex;
28use prometheus_endpoint::{
29 exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
30 Registry, U64,
31};
32use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
33use std::{
34 collections::{hash_map::Entry, HashMap},
35 panic,
36 pin::Pin,
37 result::Result,
38 sync::Arc,
39};
40use tokio::runtime::Handle;
41use tracing_futures::Instrument;
42
43mod prometheus_future;
44#[cfg(test)]
45mod tests;
46
/// Group label used for tasks that are spawned without an explicit group.
pub const DEFAULT_GROUP_NAME: &str = "default";

/// The group a spawned task is accounted under.
///
/// Values are normally created through the [`From`] conversions below, so that spawn
/// functions can accept `"group"`, `Some("group")` or `None` interchangeably.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupName {
    /// No group was requested; spawn handles report such tasks under
    /// [`DEFAULT_GROUP_NAME`].
    Default,
    /// The task belongs to the group with the given name.
    Specific(&'static str),
}

impl From<Option<&'static str>> for GroupName {
    /// Maps `Some(name)` to [`GroupName::Specific`] and `None` to [`GroupName::Default`].
    fn from(name: Option<&'static str>) -> Self {
        name.map_or(Self::Default, Self::Specific)
    }
}

impl From<&'static str> for GroupName {
    /// Wraps `name` in [`GroupName::Specific`].
    fn from(name: &'static str) -> Self {
        Self::Specific(name)
    }
}
75
76#[derive(Clone)]
78pub struct SpawnTaskHandle {
79 on_exit: exit_future::Exit,
80 tokio_handle: Handle,
81 metrics: Option<Metrics>,
82 task_registry: TaskRegistry,
83}
84
85impl SpawnTaskHandle {
86 pub fn spawn(
96 &self,
97 name: &'static str,
98 group: impl Into<GroupName>,
99 task: impl Future<Output = ()> + Send + 'static,
100 ) {
101 self.spawn_inner(name, group, task, TaskType::Async)
102 }
103
104 pub fn spawn_blocking(
106 &self,
107 name: &'static str,
108 group: impl Into<GroupName>,
109 task: impl Future<Output = ()> + Send + 'static,
110 ) {
111 self.spawn_inner(name, group, task, TaskType::Blocking)
112 }
113
114 fn spawn_inner(
116 &self,
117 name: &'static str,
118 group: impl Into<GroupName>,
119 task: impl Future<Output = ()> + Send + 'static,
120 task_type: TaskType,
121 ) {
122 let on_exit = self.on_exit.clone();
123 let metrics = self.metrics.clone();
124 let registry = self.task_registry.clone();
125
126 let group = match group.into() {
127 GroupName::Specific(var) => var,
128 GroupName::Default => DEFAULT_GROUP_NAME,
130 };
131
132 let task_type_label = match task_type {
133 TaskType::Blocking => "blocking",
134 TaskType::Async => "async",
135 };
136
137 if let Some(metrics) = &self.metrics {
140 metrics.tasks_spawned.with_label_values(&[name, group, task_type_label]).inc();
141 metrics
143 .tasks_ended
144 .with_label_values(&[name, "finished", group, task_type_label])
145 .inc_by(0);
146 }
147
148 let future = async move {
149 let _registry_token = registry.register_task(name, group);
152
153 if let Some(metrics) = metrics {
154 let task = {
156 let poll_duration =
157 metrics.poll_duration.with_label_values(&[name, group, task_type_label]);
158 let poll_start =
159 metrics.poll_start.with_label_values(&[name, group, task_type_label]);
160 let inner =
161 prometheus_future::with_poll_durations(poll_duration, poll_start, task);
162 panic::AssertUnwindSafe(inner).catch_unwind()
165 };
166 futures::pin_mut!(task);
167
168 match select(on_exit, task).await {
169 Either::Right((Err(payload), _)) => {
170 metrics
171 .tasks_ended
172 .with_label_values(&[name, "panic", group, task_type_label])
173 .inc();
174 panic::resume_unwind(payload)
175 },
176 Either::Right((Ok(()), _)) => {
177 metrics
178 .tasks_ended
179 .with_label_values(&[name, "finished", group, task_type_label])
180 .inc();
181 },
182 Either::Left(((), _)) => {
183 metrics
185 .tasks_ended
186 .with_label_values(&[name, "interrupted", group, task_type_label])
187 .inc();
188 },
189 }
190 } else {
191 futures::pin_mut!(task);
192 let _ = select(on_exit, task).await;
193 }
194 }
195 .in_current_span();
196
197 match task_type {
198 TaskType::Async => {
199 self.tokio_handle.spawn(future);
200 },
201 TaskType::Blocking => {
202 let handle = self.tokio_handle.clone();
203 self.tokio_handle.spawn_blocking(move || {
204 handle.block_on(future);
205 });
206 },
207 }
208 }
209}
210
211impl sp_core::traits::SpawnNamed for SpawnTaskHandle {
212 fn spawn_blocking(
213 &self,
214 name: &'static str,
215 group: Option<&'static str>,
216 future: BoxFuture<'static, ()>,
217 ) {
218 self.spawn_inner(name, group, future, TaskType::Blocking)
219 }
220
221 fn spawn(
222 &self,
223 name: &'static str,
224 group: Option<&'static str>,
225 future: BoxFuture<'static, ()>,
226 ) {
227 self.spawn_inner(name, group, future, TaskType::Async)
228 }
229}
230
231#[derive(Clone)]
236pub struct SpawnEssentialTaskHandle {
237 essential_failed_tx: TracingUnboundedSender<()>,
238 inner: SpawnTaskHandle,
239}
240
241impl SpawnEssentialTaskHandle {
242 pub fn new(
244 essential_failed_tx: TracingUnboundedSender<()>,
245 spawn_task_handle: SpawnTaskHandle,
246 ) -> SpawnEssentialTaskHandle {
247 SpawnEssentialTaskHandle { essential_failed_tx, inner: spawn_task_handle }
248 }
249
250 pub fn spawn(
254 &self,
255 name: &'static str,
256 group: impl Into<GroupName>,
257 task: impl Future<Output = ()> + Send + 'static,
258 ) {
259 self.spawn_inner(name, group, task, TaskType::Async)
260 }
261
262 pub fn spawn_blocking(
266 &self,
267 name: &'static str,
268 group: impl Into<GroupName>,
269 task: impl Future<Output = ()> + Send + 'static,
270 ) {
271 self.spawn_inner(name, group, task, TaskType::Blocking)
272 }
273
274 fn spawn_inner(
275 &self,
276 name: &'static str,
277 group: impl Into<GroupName>,
278 task: impl Future<Output = ()> + Send + 'static,
279 task_type: TaskType,
280 ) {
281 let essential_failed = self.essential_failed_tx.clone();
282 let essential_task = std::panic::AssertUnwindSafe(task).catch_unwind().map(move |_| {
283 log::error!("Essential task `{}` failed. Shutting down service.", name);
284 let _ = essential_failed.close();
285 });
286
287 let _ = self.inner.spawn_inner(name, group, essential_task, task_type);
288 }
289}
290
291impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle {
292 fn spawn_essential_blocking(
293 &self,
294 name: &'static str,
295 group: Option<&'static str>,
296 future: BoxFuture<'static, ()>,
297 ) {
298 self.spawn_blocking(name, group, future);
299 }
300
301 fn spawn_essential(
302 &self,
303 name: &'static str,
304 group: Option<&'static str>,
305 future: BoxFuture<'static, ()>,
306 ) {
307 self.spawn(name, group, future);
308 }
309}
310
311pub struct TaskManager {
313 on_exit: exit_future::Exit,
316 _signal: Signal,
318 tokio_handle: Handle,
320 metrics: Option<Metrics>,
322 essential_failed_tx: TracingUnboundedSender<()>,
325 essential_failed_rx: TracingUnboundedReceiver<()>,
327 keep_alive: Box<dyn std::any::Any + Send>,
329 children: Vec<TaskManager>,
333 task_registry: TaskRegistry,
335}
336
337impl TaskManager {
338 pub fn new(
341 tokio_handle: Handle,
342 prometheus_registry: Option<&Registry>,
343 ) -> Result<Self, PrometheusError> {
344 let (signal, on_exit) = exit_future::signal();
345
346 let (essential_failed_tx, essential_failed_rx) =
348 tracing_unbounded("mpsc_essential_tasks", 100);
349
350 let metrics = prometheus_registry.map(Metrics::register).transpose()?;
351
352 Ok(Self {
353 on_exit,
354 _signal: signal,
355 tokio_handle,
356 metrics,
357 essential_failed_tx,
358 essential_failed_rx,
359 keep_alive: Box::new(()),
360 children: Vec::new(),
361 task_registry: Default::default(),
362 })
363 }
364
365 pub fn spawn_handle(&self) -> SpawnTaskHandle {
367 SpawnTaskHandle {
368 on_exit: self.on_exit.clone(),
369 tokio_handle: self.tokio_handle.clone(),
370 metrics: self.metrics.clone(),
371 task_registry: self.task_registry.clone(),
372 }
373 }
374
375 pub fn spawn_essential_handle(&self) -> SpawnEssentialTaskHandle {
377 SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle())
378 }
379
380 pub fn future<'a>(
387 &'a mut self,
388 ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
389 Box::pin(async move {
390 let mut t1 = self.essential_failed_rx.next().fuse();
391 let mut t2 = self.on_exit.clone().fuse();
392 let mut t3 = try_join_all(
393 self.children
394 .iter_mut()
395 .map(|x| x.future())
396 .chain(std::iter::once(pending().boxed())),
399 )
400 .fuse();
401
402 futures::select! {
403 _ = t1 => Err(Error::Other("Essential task failed.".into())),
404 _ = t2 => Ok(()),
405 res = t3 => Err(res.map(|_| ()).expect_err("this future never ends; qed")),
406 }
407 })
408 }
409
410 pub fn keep_alive<T: 'static + Send>(&mut self, to_keep_alive: T) {
412 use std::mem;
414 let old = mem::replace(&mut self.keep_alive, Box::new(()));
415 self.keep_alive = Box::new((to_keep_alive, old));
416 }
417
418 pub fn add_child(&mut self, child: TaskManager) {
422 self.children.push(child);
423 }
424
425 pub fn into_task_registry(self) -> TaskRegistry {
430 self.task_registry
431 }
432}
433
434#[derive(Clone)]
435struct Metrics {
436 poll_duration: HistogramVec,
438 poll_start: CounterVec<U64>,
439 tasks_spawned: CounterVec<U64>,
440 tasks_ended: CounterVec<U64>,
441}
442
443impl Metrics {
444 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
445 Ok(Self {
446 poll_duration: register(HistogramVec::new(
447 HistogramOpts {
448 common_opts: Opts::new(
449 "substrate_tasks_polling_duration",
450 "Duration in seconds of each invocation of Future::poll"
451 ),
452 buckets: exponential_buckets(0.001, 4.0, 9)
453 .expect("function parameters are constant and always valid; qed"),
454 },
455 &["task_name", "task_group", "kind"]
456 )?, registry)?,
457 poll_start: register(CounterVec::new(
458 Opts::new(
459 "substrate_tasks_polling_started_total",
460 "Total number of times we started invoking Future::poll"
461 ),
462 &["task_name", "task_group", "kind"]
463 )?, registry)?,
464 tasks_spawned: register(CounterVec::new(
465 Opts::new(
466 "substrate_tasks_spawned_total",
467 "Total number of tasks that have been spawned on the Service"
468 ),
469 &["task_name", "task_group", "kind"]
470 )?, registry)?,
471 tasks_ended: register(CounterVec::new(
472 Opts::new(
473 "substrate_tasks_ended_total",
474 "Total number of tasks for which Future::poll has returned Ready(()) or panicked"
475 ),
476 &["task_name", "reason", "task_group", "kind"]
477 )?, registry)?,
478 })
479 }
480}
481
482struct UnregisterOnDrop {
484 task: Task,
485 registry: TaskRegistry,
486}
487
488impl Drop for UnregisterOnDrop {
489 fn drop(&mut self) {
490 let mut tasks = self.registry.tasks.lock();
491
492 if let Entry::Occupied(mut entry) = (*tasks).entry(self.task.clone()) {
493 *entry.get_mut() -= 1;
494
495 if *entry.get() == 0 {
496 entry.remove();
497 }
498 }
499 }
500}
501
502#[derive(Clone, Hash, Eq, PartialEq)]
507pub struct Task {
508 pub name: &'static str,
510 pub group: &'static str,
512}
513
514impl Task {
515 pub fn is_default_group(&self) -> bool {
517 self.group == DEFAULT_GROUP_NAME
518 }
519}
520
521#[derive(Clone, Default)]
523pub struct TaskRegistry {
524 tasks: Arc<Mutex<HashMap<Task, usize>>>,
525}
526
527impl TaskRegistry {
528 fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop {
533 let task = Task { name, group };
534
535 {
536 let mut tasks = self.tasks.lock();
537
538 *(*tasks).entry(task.clone()).or_default() += 1;
539 }
540
541 UnregisterOnDrop { task, registry: self.clone() }
542 }
543
544 pub fn running_tasks(&self) -> HashMap<Task, usize> {
549 (*self.tasks.lock()).clone()
550 }
551}