polkadot_overseer/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
//! # Overseer
//!
//! `overseer` implements the Overseer architecture described in the
//! [implementers' guide][overseer-page].
//! For the motivations behind implementing the overseer itself you should
//! check out that guide; the documentation in this crate mostly discusses
//! technical details.
//!
//! An `Overseer` is something that allows spawning/stopping and overseeing
//! asynchronous tasks as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. It is desired
//! that this protocol is the only way tasks communicate with each other, however
//! at this moment there are no foolproof guards against other ways of communication.
//!
//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
//! share the same behavior from `Overseer`'s point of view.
//!
//! ```text
//!                              +-----------------------------+
//!                              |         Overseer            |
//!                              +-----------------------------+
//!
//!             ................|  Overseer "holds" these and uses |..............
//!             .                  them to (re)start things                      .
//!             .                                                                .
//!             .  +-------------------+                +---------------------+  .
//!             .  |   Subsystem1      |                |   Subsystem2        |  .
//!             .  +-------------------+                +---------------------+  .
//!             .           |                                       |            .
//!             ..................................................................
//!                         |                                       |
//!                       start()                                 start()
//!                         V                                       V
//!             ..................| Overseer "runs" these |.......................
//!             .  +--------------------+               +---------------------+  .
//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
//!             .  +--------------------+               +---------------------+  .
//!             ..................................................................
//! ```
//!
//! [overseer-page]: https://paritytech.github.io/polkadot-sdk/book/node/overseer.html
58
59// #![deny(unused_results)]
60// unused dependencies can not work for test and examples at the same time
61// yielding false positives
62#![warn(missing_docs)]
63#![allow(dead_code)] // TODO https://github.com/paritytech/polkadot-sdk/issues/5793
64
65use std::{
66	collections::{hash_map, HashMap},
67	fmt::{self, Debug},
68	pin::Pin,
69	sync::Arc,
70	time::Duration,
71};
72
73use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
74
75use polkadot_primitives::{Block, BlockNumber, Hash};
76use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
77
78use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
79use polkadot_node_subsystem_types::messages::{
80	ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
81	AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
82	BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
83	ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
84	DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
85	NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
86	ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
87};
88
89pub use polkadot_node_subsystem_types::{
90	errors::{SubsystemError, SubsystemResult},
91	jaeger, ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal,
92	RuntimeApiSubsystemClient, UnpinHandle,
93};
94
95pub mod metrics;
96pub use self::metrics::Metrics as OverseerMetrics;
97
98/// A dummy subsystem, mostly useful for placeholders and tests.
99pub mod dummy;
100pub use self::dummy::DummySubsystem;
101
102pub use polkadot_node_metrics::{
103	metrics::{prometheus, Metrics as MetricsTrait},
104	Metronome,
105};
106
107pub use orchestra as gen;
108pub use orchestra::{
109	contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
110	NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
111	Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
112	SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
113	TrySendError,
114};
115
116#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
117mod memory_stats;
118#[cfg(test)]
119mod tests;
120
121use sp_core::traits::SpawnNamed;
122
123/// Glue to connect `trait orchestra::Spawner` and `SpawnNamed` from `substrate`.
124pub struct SpawnGlue<S>(pub S);
125
126impl<S> AsRef<S> for SpawnGlue<S> {
127	fn as_ref(&self) -> &S {
128		&self.0
129	}
130}
131
132impl<S: Clone> Clone for SpawnGlue<S> {
133	fn clone(&self) -> Self {
134		Self(self.0.clone())
135	}
136}
137
138impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
139	fn spawn_blocking(
140		&self,
141		name: &'static str,
142		group: Option<&'static str>,
143		future: futures::future::BoxFuture<'static, ()>,
144	) {
145		SpawnNamed::spawn_blocking(&self.0, name, group, future)
146	}
147	fn spawn(
148		&self,
149		name: &'static str,
150		group: Option<&'static str>,
151		future: futures::future::BoxFuture<'static, ()>,
152	) {
153		SpawnNamed::spawn(&self.0, name, group, future)
154	}
155}
156
157/// Whether a header supports parachain consensus or not.
158#[async_trait::async_trait]
159pub trait HeadSupportsParachains {
160	/// Return true if the given header supports parachain consensus. Otherwise, false.
161	async fn head_supports_parachains(&self, head: &Hash) -> bool;
162}
163
164#[async_trait::async_trait]
165impl<Client> HeadSupportsParachains for Arc<Client>
166where
167	Client: RuntimeApiSubsystemClient + Sync + Send,
168{
169	async fn head_supports_parachains(&self, head: &Hash) -> bool {
170		// Check that the `ParachainHost` runtime api is at least with version 1 present on chain.
171		self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
172	}
173}
174
175/// A handle used to communicate with the [`Overseer`].
176///
177/// [`Overseer`]: struct.Overseer.html
178#[derive(Clone)]
179pub struct Handle(OverseerHandle);
180
181impl Handle {
182	/// Create a new [`Handle`].
183	pub fn new(raw: OverseerHandle) -> Self {
184		Self(raw)
185	}
186
187	/// Inform the `Overseer` that that some block was imported.
188	pub async fn block_imported(&mut self, block: BlockInfo) {
189		self.send_and_log_error(Event::BlockImported(block)).await
190	}
191
192	/// Send some message to one of the `Subsystem`s.
193	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
194		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
195	}
196
197	/// Send a message not providing an origin.
198	#[inline(always)]
199	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
200		self.send_msg(msg, "").await
201	}
202
203	/// Inform the `Overseer` that some block was finalized.
204	pub async fn block_finalized(&mut self, block: BlockInfo) {
205		self.send_and_log_error(Event::BlockFinalized(block)).await
206	}
207
208	/// Wait for a block with the given hash to be in the active-leaves set.
209	///
210	/// The response channel responds if the hash was activated and is closed if the hash was
211	/// deactivated. Note that due the fact the overseer doesn't store the whole active-leaves set,
212	/// only deltas, the response channel may never return if the hash was deactivated before this
213	/// call. In this case, it's the caller's responsibility to ensure a timeout is set.
214	pub async fn wait_for_activation(
215		&mut self,
216		hash: Hash,
217		response_channel: oneshot::Sender<SubsystemResult<()>>,
218	) {
219		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
220			hash,
221			response_channel,
222		}))
223		.await;
224	}
225
226	/// Tell `Overseer` to shutdown.
227	pub async fn stop(&mut self) {
228		self.send_and_log_error(Event::Stop).await;
229	}
230
231	/// Most basic operation, to stop a server.
232	async fn send_and_log_error(&mut self, event: Event) {
233		if self.0.send(event).await.is_err() {
234			gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
235		}
236	}
237}
238
239/// An event telling the `Overseer` on the particular block
240/// that has been imported or finalized.
241///
242/// This structure exists solely for the purposes of decoupling
243/// `Overseer` code from the client code and the necessity to call
244/// `HeaderBackend::block_number_from_id()`.
245#[derive(Debug, Clone)]
246pub struct BlockInfo {
247	/// Hash of the block.
248	pub hash: Hash,
249	/// Hash of the parent block.
250	pub parent_hash: Hash,
251	/// Block's number.
252	pub number: BlockNumber,
253	/// A handle to unpin the block on drop.
254	pub unpin_handle: UnpinHandle,
255}
256
257impl From<BlockImportNotification<Block>> for BlockInfo {
258	fn from(n: BlockImportNotification<Block>) -> Self {
259		let hash = n.hash;
260		let parent_hash = n.header.parent_hash;
261		let number = n.header.number;
262		let unpin_handle = n.into_unpin_handle();
263
264		BlockInfo { hash, parent_hash, number, unpin_handle }
265	}
266}
267
268impl From<FinalityNotification<Block>> for BlockInfo {
269	fn from(n: FinalityNotification<Block>) -> Self {
270		let hash = n.hash;
271		let parent_hash = n.header.parent_hash;
272		let number = n.header.number;
273		let unpin_handle = n.into_unpin_handle();
274
275		BlockInfo { hash, parent_hash, number, unpin_handle }
276	}
277}
278
279/// An event from outside the overseer scope, such
280/// as the substrate framework or user interaction.
281#[derive(Debug)]
282pub enum Event {
283	/// A new block was imported.
284	///
285	/// This event is not sent if the block was already known
286	/// and we reorged to it e.g. due to a reversion.
287	///
288	/// Also, these events are not sent during a major sync.
289	BlockImported(BlockInfo),
290	/// A block was finalized with i.e. babe or another consensus algorithm.
291	BlockFinalized(BlockInfo),
292	/// Message as sent to a subsystem.
293	MsgToSubsystem {
294		/// The actual message.
295		msg: AllMessages,
296		/// The originating subsystem name.
297		origin: &'static str,
298	},
299	/// A request from the outer world.
300	ExternalRequest(ExternalRequest),
301	/// Stop the overseer on i.e. a UNIX signal.
302	Stop,
303}
304
305/// Some request from outer world.
306#[derive(Debug)]
307pub enum ExternalRequest {
308	/// Wait for the activation of a particular hash
309	/// and be notified by means of the return channel.
310	WaitForActivation {
311		/// The relay parent for which activation to wait for.
312		hash: Hash,
313		/// Response channel to await on.
314		response_channel: oneshot::Sender<SubsystemResult<()>>,
315	},
316}
317
318/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
319/// import and finality notifications into the [`OverseerHandle`].
320pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
321	let mut finality = client.finality_notification_stream();
322	let mut imports = client.import_notification_stream();
323
324	loop {
325		select! {
326			f = finality.next() => {
327				match f {
328					Some(block) => {
329						handle.block_finalized(block.into()).await;
330					}
331					None => break,
332				}
333			},
334			i = imports.next() => {
335				match i {
336					Some(block) => {
337						handle.block_imported(block.into()).await;
338					}
339					None => break,
340				}
341			},
342			complete => break,
343		}
344	}
345}
346
/// The overseer, holding a fixed set of [`Subsystem`]s.
///
/// Instances are created through the builder that the `#[orchestra]` attribute generates for
/// this struct (the example below uses `dummy_overseer_builder`). Building returns the
/// overseer together with an [`OverseerHandle`] connected to it. The handle can be used to
/// send messages from external parts of the codebase.
///
/// ```text
///                  +------------------------------------+
///                  |            Overseer                |
///                  +------------------------------------+
///                    /            |             |      \
///      ................. subsystems...................................
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      . |           |    |           |   |          |   |         | .
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      ...............................................................
///                              |
///                        probably `spawn`
///                            a `job`
///                              |
///                              V
///                         +-----------+
///                         |           |
///                         +-----------+
/// ```
///
/// [`Subsystem`]: crate::Subsystem
///
/// # Example
///
/// The subsystems may be of any type as long as they implement the expected interface.
/// Here, we create a mock validation subsystem, use it in place of the dummy candidate-validation
/// subsystem (all other subsystems stay dummies) and start the `Overseer`. For the sake of
/// simplicity the termination of the example is done with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_primitives::Hash;
/// # use polkadot_overseer::{
/// # 	self as overseer,
/// #   OverseerSignal,
/// # 	SubsystemSender as _,
/// # 	AllMessages,
/// # 	HeadSupportsParachains,
/// # 	Overseer,
/// # 	SubsystemError,
/// # 	gen::{
/// # 		SubsystemContext,
/// # 		FromOrchestra,
/// # 		SpawnedSubsystem,
/// # 	},
/// # };
/// # use polkadot_node_subsystem_types::messages::{
/// # 	CandidateValidationMessage, CandidateBackingMessage,
/// # 	NetworkBridgeTxMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
///     Ctx: overseer::SubsystemContext<
/// 				Message=CandidateValidationMessage,
/// 				AllMessages=AllMessages,
/// 				Signal=OverseerSignal,
/// 				Error=SubsystemError,
/// 			>,
/// {
///     fn start(
///         self,
///         mut ctx: Ctx,
///     ) -> SpawnedSubsystem<SubsystemError> {
///         SpawnedSubsystem {
///             name: "validation-subsystem",
///             future: Box::pin(async move {
///                 loop {
///                     Delay::new(Duration::from_secs(1)).await;
///                 }
///             }),
///         }
///     }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
///      async fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// 		.unwrap()
/// 		.replace_candidate_validation(|_| ValidationSubsystem)
/// 		.build()
/// 		.unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
///     _ = overseer_fut => (),
///     _ = timer => (),
/// }
/// #
/// # 	});
/// # }
/// ```
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	// Each `#[subsystem(...)]` field below declares one subsystem. The attribute lists the
	// message type the subsystem consumes (when one is given) and the message types it may
	// send (`sends: [...]`). It can also carry optional settings such as `blocking`,
	// `message_capacity` and `can_receive_priority_messages`. All of these are interpreted
	// by the `orchestra` macro.
	#[subsystem(CandidateValidationMessage, sends: [
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,
	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
	])]
	approval_voting_parallel: ApprovalVotingParallel,
	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626>
		RuntimeApiMessage,
		ChainSelectionMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		ApprovalVotingMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	])]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,

	/// External listeners waiting for a hash to be in the active-leaves set.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// Stores the [`jaeger::Span`] per active leaf.
	pub span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,

	/// The set of the "active leaves", mapped to their block numbers.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// An implementation for checking whether a header supports parachain consensus.
	pub supports_parachains: SupportsParachains,

	/// Various Prometheus metrics.
	pub metrics: OverseerMetrics,
}
665
666/// Spawn the metrics metronome task.
667pub fn spawn_metronome_metrics<S, SupportsParachains>(
668	overseer: &mut Overseer<S, SupportsParachains>,
669	metronome_metrics: OverseerMetrics,
670) -> Result<(), SubsystemError>
671where
672	S: Spawner,
673	SupportsParachains: HeadSupportsParachains,
674{
675	struct ExtractNameAndMeters;
676
677	impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
678		type Output = Option<(&'static str, SubsystemMeters)>;
679
680		fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
681			subsystem
682				.instance
683				.as_ref()
684				.map(|instance| (instance.name, instance.meters.clone()))
685		}
686	}
687	let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
688
689	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
690	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
691		match memory_stats::MemoryAllocationTracker::new() {
692			Ok(memory_stats) =>
693				Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
694					Ok(memory_stats_snapshot) => {
695						gum::trace!(
696							target: LOG_TARGET,
697							"memory_stats: {:?}",
698							&memory_stats_snapshot
699						);
700						metrics.memory_stats_snapshot(memory_stats_snapshot);
701					},
702					Err(e) =>
703						gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
704				}),
705			Err(_) => {
706				gum::debug!(
707					target: LOG_TARGET,
708					"Memory allocation tracking is not supported by the allocator.",
709				);
710
711				Box::new(|_| {})
712			},
713		};
714
715	#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
716	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
717
718	let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
719		collect_memory_stats(&metronome_metrics);
720
721		// We combine the amount of messages from subsystems to the overseer
722		// as well as the amount of messages from external sources to the overseer
723		// into one `to_overseer` value.
724		metronome_metrics.channel_metrics_snapshot(
725			subsystem_meters
726				.iter()
727				.cloned()
728				.flatten()
729				.map(|(name, ref meters)| (name, meters.read())),
730		);
731
732		futures::future::ready(())
733	});
734	overseer
735		.spawner()
736		.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
737
738	Ok(())
739}
740
741impl<S, SupportsParachains> Overseer<S, SupportsParachains>
742where
743	SupportsParachains: HeadSupportsParachains,
744	S: Spawner,
745{
746	/// Stop the `Overseer`.
747	async fn stop(mut self) {
748		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
749	}
750
751	/// Run the `Overseer`.
752	///
753	/// Logging any errors.
754	pub async fn run(self) {
755		if let Err(err) = self.run_inner().await {
756			gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
757		}
758	}
759
760	async fn run_inner(mut self) -> SubsystemResult<()> {
761		let metrics = self.metrics.clone();
762		spawn_metronome_metrics(&mut self, metrics)?;
763
764		loop {
765			select! {
766				msg = self.events_rx.select_next_some() => {
767					match msg {
768						Event::MsgToSubsystem { msg, origin } => {
769							self.route_message(msg.into(), origin).await?;
770							self.metrics.on_message_relayed();
771						}
772						Event::Stop => {
773							self.stop().await;
774							return Ok(());
775						}
776						Event::BlockImported(block) => {
777							self.block_imported(block).await?;
778						}
779						Event::BlockFinalized(block) => {
780							self.block_finalized(block).await?;
781						}
782						Event::ExternalRequest(request) => {
783							self.handle_external_request(request);
784						}
785					}
786				},
787				msg = self.to_orchestra_rx.select_next_some() => {
788					match msg {
789						ToOrchestra::SpawnJob { name, subsystem, s } => {
790							self.spawn_job(name, subsystem, s);
791						}
792						ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
793							self.spawn_blocking_job(name, subsystem, s);
794						}
795					}
796				},
797				res = self.running_subsystems.select_next_some() => {
798					gum::error!(
799						target: LOG_TARGET,
800						subsystem = ?res,
801						"subsystem finished unexpectedly",
802					);
803					self.stop().await;
804					return res;
805				},
806			}
807		}
808	}
809
810	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
811		match self.active_leaves.entry(block.hash) {
812			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
813			hash_map::Entry::Occupied(entry) => {
814				debug_assert_eq!(*entry.get(), block.number);
815				return Ok(())
816			},
817		};
818
819		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
820			Some(span) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
821				hash: block.hash,
822				number: block.number,
823				unpin_handle: block.unpin_handle,
824				span,
825			}),
826			None => ActiveLeavesUpdate::default(),
827		};
828
829		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
830			debug_assert_eq!(block.number.saturating_sub(1), number);
831			update.deactivated.push(block.parent_hash);
832			self.on_head_deactivated(&block.parent_hash);
833		}
834
835		self.clean_up_external_listeners();
836
837		if !update.is_empty() {
838			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
839		}
840		Ok(())
841	}
842
843	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
844		let mut update = ActiveLeavesUpdate::default();
845
846		self.active_leaves.retain(|h, n| {
847			// prune all orphaned leaves, but don't prune
848			// the finalized block if it is itself a leaf.
849			if *n <= block.number && *h != block.hash {
850				update.deactivated.push(*h);
851				false
852			} else {
853				true
854			}
855		});
856
857		for deactivated in &update.deactivated {
858			self.on_head_deactivated(deactivated)
859		}
860
861		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
862			.await?;
863
864		// If there are no leaves being deactivated, we don't need to send an update.
865		//
866		// Our peers will be informed about our finalized block the next time we
867		// activating/deactivating some leaf.
868		if !update.is_empty() {
869			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
870		}
871
872		Ok(())
873	}
874
875	/// Handles a header activation. If the header's state doesn't support the parachains API,
876	/// this returns `None`.
877	async fn on_head_activated(
878		&mut self,
879		hash: &Hash,
880		parent_hash: Option<Hash>,
881	) -> Option<Arc<jaeger::Span>> {
882		if !self.supports_parachains.head_supports_parachains(hash).await {
883			return None
884		}
885
886		self.metrics.on_head_activated();
887		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
888			gum::trace!(
889				target: LOG_TARGET,
890				relay_parent = ?hash,
891				"Leaf got activated, notifying external listeners"
892			);
893			for listener in listeners {
894				// it's fine if the listener is no longer interested
895				let _ = listener.send(Ok(()));
896			}
897		}
898
899		let mut span = jaeger::Span::new(*hash, "leaf-activated");
900
901		if let Some(parent_span) = parent_hash.and_then(|h| self.span_per_active_leaf.get(&h)) {
902			span.add_follows_from(parent_span);
903		}
904
905		let span = Arc::new(span);
906		self.span_per_active_leaf.insert(*hash, span.clone());
907
908		Some(span)
909	}
910
911	fn on_head_deactivated(&mut self, hash: &Hash) {
912		self.metrics.on_head_deactivated();
913		self.activation_external_listeners.remove(hash);
914		self.span_per_active_leaf.remove(hash);
915	}
916
917	fn clean_up_external_listeners(&mut self) {
918		self.activation_external_listeners.retain(|_, v| {
919			// remove dead listeners
920			v.retain(|c| !c.is_canceled());
921			!v.is_empty()
922		})
923	}
924
925	fn handle_external_request(&mut self, request: ExternalRequest) {
926		match request {
927			ExternalRequest::WaitForActivation { hash, response_channel } => {
928				if self.active_leaves.get(&hash).is_some() {
929					gum::trace!(
930						target: LOG_TARGET,
931						relay_parent = ?hash,
932						"Leaf was already ready - answering `WaitForActivation`"
933					);
934					// it's fine if the listener is no longer interested
935					let _ = response_channel.send(Ok(()));
936				} else {
937					gum::trace!(
938						target: LOG_TARGET,
939						relay_parent = ?hash,
940						"Leaf not yet ready - queuing `WaitForActivation` sender"
941					);
942					self.activation_external_listeners
943						.entry(hash)
944						.or_default()
945						.push(response_channel);
946				}
947			},
948		}
949	}
950
951	fn spawn_job(
952		&mut self,
953		task_name: &'static str,
954		subsystem_name: Option<&'static str>,
955		j: BoxFuture<'static, ()>,
956	) {
957		self.spawner.spawn(task_name, subsystem_name, j);
958	}
959
960	fn spawn_blocking_job(
961		&mut self,
962		task_name: &'static str,
963		subsystem_name: Option<&'static str>,
964		j: BoxFuture<'static, ()>,
965	) {
966		self.spawner.spawn_blocking(task_name, subsystem_name, j);
967	}
968}