referrerpolicy=no-referrer-when-downgrade

polkadot_overseer/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! # Overseer
18//!
19//! `overseer` implements the Overseer architecture described in the
20//! [implementers' guide][overseer-page].
21//! For the motivations behind implementing the overseer itself you should
22//! check out that guide, documentation in this crate will be mostly discussing
23//! technical stuff.
24//!
25//! An `Overseer` is something that allows spawning/stopping and overseeing
26//! asynchronous tasks as well as establishing a well-defined and easy to use
27//! protocol that the tasks can use to communicate with each other. It is desired
28//! that this protocol is the only way tasks communicate with each other, however
29//! at this moment there are no foolproof guards against other ways of communication.
30//!
31//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
32//! share the same behavior from `Overseer`'s point of view.
33//!
34//! ```text
35//!                              +-----------------------------+
36//!                              |         Overseer            |
37//!                              +-----------------------------+
38//!
39//!             ................|  Overseer "holds" these and uses |..............
40//!             .                  them to (re)start things                      .
41//!             .                                                                .
42//!             .  +-------------------+                +---------------------+  .
43//!             .  |   Subsystem1      |                |   Subsystem2        |  .
44//!             .  +-------------------+                +---------------------+  .
45//!             .           |                                       |            .
46//!             ..................................................................
47//!                         |                                       |
48//!                       start()                                 start()
49//!                         V                                       V
50//!             ..................| Overseer "runs" these |.......................
51//!             .  +--------------------+               +---------------------+  .
52//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
53//!             .  +--------------------+               +---------------------+  .
54//!             ..................................................................
55//! ```
56//!
57//! [overseer-page]: https://paritytech.github.io/polkadot-sdk/book/node/overseer.html
58
59// #![deny(unused_results)]
60// unused dependencies can not work for test and examples at the same time
61// yielding false positives
62#![warn(missing_docs)]
63// TODO https://github.com/paritytech/polkadot-sdk/issues/5793
64#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67	collections::{hash_map, HashMap},
68	fmt::{self, Debug},
69	pin::Pin,
70	sync::Arc,
71	time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use polkadot_primitives::{Block, BlockNumber, Hash};
77use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use polkadot_node_subsystem_types::messages::{
81	ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82	AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83	BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84	ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85	DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86	NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
87	ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use polkadot_node_subsystem_types::{
91	errors::{SubsystemError, SubsystemResult},
92	ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93	UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99/// A dummy subsystem, mostly useful for placeholders and tests.
100pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use polkadot_node_metrics::{
104	metrics::{prometheus, Metrics as MetricsTrait},
105	Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110	contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111	NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112	Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113	SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114	TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use sp_core::traits::SpawnNamed;
123
124/// Glue to connect `trait orchestra::Spawner` and `SpawnNamed` from `substrate`.
125pub struct SpawnGlue<S>(pub S);
126
127impl<S> AsRef<S> for SpawnGlue<S> {
128	fn as_ref(&self) -> &S {
129		&self.0
130	}
131}
132
133impl<S: Clone> Clone for SpawnGlue<S> {
134	fn clone(&self) -> Self {
135		Self(self.0.clone())
136	}
137}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140	fn spawn_blocking(
141		&self,
142		name: &'static str,
143		group: Option<&'static str>,
144		future: futures::future::BoxFuture<'static, ()>,
145	) {
146		SpawnNamed::spawn_blocking(&self.0, name, group, future)
147	}
148	fn spawn(
149		&self,
150		name: &'static str,
151		group: Option<&'static str>,
152		future: futures::future::BoxFuture<'static, ()>,
153	) {
154		SpawnNamed::spawn(&self.0, name, group, future)
155	}
156}
157
158/// Whether a header supports parachain consensus or not.
159#[async_trait::async_trait]
160pub trait HeadSupportsParachains {
161	/// Return true if the given header supports parachain consensus. Otherwise, false.
162	async fn head_supports_parachains(&self, head: &Hash) -> bool;
163}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsParachains for Arc<Client>
167where
168	Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170	async fn head_supports_parachains(&self, head: &Hash) -> bool {
171		// Check that the `ParachainHost` runtime api is at least with version 1 present on chain.
172		self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173	}
174}
175
176/// A handle used to communicate with the [`Overseer`].
177///
178/// [`Overseer`]: struct.Overseer.html
179#[derive(Clone)]
180pub struct Handle(OverseerHandle);
181
182impl Handle {
183	/// Create a new [`Handle`].
184	pub fn new(raw: OverseerHandle) -> Self {
185		Self(raw)
186	}
187
188	/// Inform the `Overseer` that that some block was imported.
189	pub async fn block_imported(&mut self, block: BlockInfo) {
190		self.send_and_log_error(Event::BlockImported(block)).await
191	}
192
193	/// Send some message with normal priority to one of the `Subsystem`s.
194	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195		self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await
196	}
197
198	/// Send some message with the specified priority to one of the `Subsystem`s.
199	pub async fn send_msg_with_priority(
200		&mut self,
201		msg: impl Into<AllMessages>,
202		origin: &'static str,
203		priority: PriorityLevel,
204	) {
205		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
206			.await
207	}
208
209	/// Send a message not providing an origin.
210	#[inline(always)]
211	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
212		self.send_msg(msg, "").await
213	}
214
215	/// Inform the `Overseer` that some block was finalized.
216	pub async fn block_finalized(&mut self, block: BlockInfo) {
217		self.send_and_log_error(Event::BlockFinalized(block)).await
218	}
219
220	/// Wait for a block with the given hash to be in the active-leaves set.
221	///
222	/// The response channel responds if the hash was activated and is closed if the hash was
223	/// deactivated. Note that due the fact the overseer doesn't store the whole active-leaves set,
224	/// only deltas, the response channel may never return if the hash was deactivated before this
225	/// call. In this case, it's the caller's responsibility to ensure a timeout is set.
226	pub async fn wait_for_activation(
227		&mut self,
228		hash: Hash,
229		response_channel: oneshot::Sender<SubsystemResult<()>>,
230	) {
231		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
232			hash,
233			response_channel,
234		}))
235		.await;
236	}
237
238	/// Tell `Overseer` to shutdown.
239	pub async fn stop(&mut self) {
240		self.send_and_log_error(Event::Stop).await;
241	}
242
243	/// Most basic operation, to stop a server.
244	async fn send_and_log_error(&mut self, event: Event) {
245		if self.0.send(event).await.is_err() {
246			gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
247		}
248	}
249}
250
251/// An event telling the `Overseer` on the particular block
252/// that has been imported or finalized.
253///
254/// This structure exists solely for the purposes of decoupling
255/// `Overseer` code from the client code and the necessity to call
256/// `HeaderBackend::block_number_from_id()`.
257#[derive(Debug, Clone)]
258pub struct BlockInfo {
259	/// Hash of the block.
260	pub hash: Hash,
261	/// Hash of the parent block.
262	pub parent_hash: Hash,
263	/// Block's number.
264	pub number: BlockNumber,
265	/// A handle to unpin the block on drop.
266	pub unpin_handle: UnpinHandle,
267}
268
269impl From<BlockImportNotification<Block>> for BlockInfo {
270	fn from(n: BlockImportNotification<Block>) -> Self {
271		let hash = n.hash;
272		let parent_hash = n.header.parent_hash;
273		let number = n.header.number;
274		let unpin_handle = n.into_unpin_handle();
275
276		BlockInfo { hash, parent_hash, number, unpin_handle }
277	}
278}
279
280impl From<FinalityNotification<Block>> for BlockInfo {
281	fn from(n: FinalityNotification<Block>) -> Self {
282		let hash = n.hash;
283		let parent_hash = n.header.parent_hash;
284		let number = n.header.number;
285		let unpin_handle = n.into_unpin_handle();
286
287		BlockInfo { hash, parent_hash, number, unpin_handle }
288	}
289}
290
291/// An event from outside the overseer scope, such
292/// as the substrate framework or user interaction.
293#[derive(Debug)]
294pub enum Event {
295	/// A new block was imported.
296	///
297	/// This event is not sent if the block was already known
298	/// and we reorged to it e.g. due to a reversion.
299	///
300	/// Also, these events are not sent during a major sync.
301	BlockImported(BlockInfo),
302	/// A block was finalized with i.e. babe or another consensus algorithm.
303	BlockFinalized(BlockInfo),
304	/// Message as sent to a subsystem.
305	MsgToSubsystem {
306		/// The actual message.
307		msg: AllMessages,
308		/// The originating subsystem name.
309		origin: &'static str,
310		/// The priority of the message.
311		priority: PriorityLevel,
312	},
313	/// A request from the outer world.
314	ExternalRequest(ExternalRequest),
315	/// Stop the overseer on i.e. a UNIX signal.
316	Stop,
317}
318
319/// Some request from outer world.
320#[derive(Debug)]
321pub enum ExternalRequest {
322	/// Wait for the activation of a particular hash
323	/// and be notified by means of the return channel.
324	WaitForActivation {
325		/// The relay parent for which activation to wait for.
326		hash: Hash,
327		/// Response channel to await on.
328		response_channel: oneshot::Sender<SubsystemResult<()>>,
329	},
330}
331
332/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
333/// import and finality notifications into the [`OverseerHandle`].
334pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
335	let mut finality = client.finality_notification_stream();
336	let mut imports = client.import_notification_stream();
337
338	loop {
339		select! {
340			f = finality.next() => {
341				match f {
342					Some(block) => {
343						handle.block_finalized(block.into()).await;
344					}
345					None => break,
346				}
347			},
348			i = imports.next() => {
349				match i {
350					Some(block) => {
351						handle.block_imported(block.into()).await;
352					}
353					None => break,
354				}
355			},
356			complete => break,
357		}
358	}
359}
360
/// The overseer: owns a fixed set of [`Subsystem`]s and routes messages and signals between
/// them.
///
/// Building an overseer (see the example below) yields the overseer together with an
/// [`OverseerHandle`], which can be used to send messages from external parts of the
/// codebase.
///
/// That [`OverseerHandle`] is connected to the [`Overseer`] it was built with.
///
/// ```text
///                  +------------------------------------+
///                  |            Overseer                |
///                  +------------------------------------+
///                    /            |             |      \
///      ................. subsystems...................................
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      . |           |    |           |   |          |   |         | .
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      ...............................................................
///                              |
///                        probably `spawn`
///                            a `job`
///                              |
///                              V
///                         +-----------+
///                         |           |
///                         +-----------+
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
///
/// # Example
///
/// The subsystems may be any type, as long as they implement the expected interface.
/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with
/// them. For simplicity, the example terminates via a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_primitives::Hash;
/// # use polkadot_overseer::{
/// # 	self as overseer,
/// #   OverseerSignal,
/// # 	SubsystemSender as _,
/// # 	AllMessages,
/// # 	HeadSupportsParachains,
/// # 	Overseer,
/// # 	SubsystemError,
/// # 	gen::{
/// # 		SubsystemContext,
/// # 		FromOrchestra,
/// # 		SpawnedSubsystem,
/// # 	},
/// # };
/// # use polkadot_node_subsystem_types::messages::{
/// # 	CandidateValidationMessage, CandidateBackingMessage,
/// # 	NetworkBridgeTxMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
///     Ctx: overseer::SubsystemContext<
/// 				Message=CandidateValidationMessage,
/// 				AllMessages=AllMessages,
/// 				Signal=OverseerSignal,
/// 				Error=SubsystemError,
/// 			>,
/// {
///     fn start(
///         self,
///         mut ctx: Ctx,
///     ) -> SpawnedSubsystem<SubsystemError> {
///         SpawnedSubsystem {
///             name: "validation-subsystem",
///             future: Box::pin(async move {
///                 loop {
///                     Delay::new(Duration::from_secs(1)).await;
///                 }
///             }),
///         }
///     }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
///      async fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// 		.unwrap()
/// 		.replace_candidate_validation(|_| ValidationSubsystem)
/// 		.build()
/// 		.unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
///     _ = overseer_fut => (),
///     _ = timer => (),
/// }
/// #
/// # 	});
/// # }
/// ```
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	// Each `#[subsystem(...)]` attribute below declares one subsystem for the orchestra macro:
	// - an optional message type (e.g. `CandidateValidationMessage`) is the message the
	//   subsystem receives;
	// - `sends: [...]` lists the message types the subsystem is declared to send;
	// - `message_capacity: N` overrides the default channel capacity, which is set to 2048 by
	//   `message_capacity=2048` on the `#[orchestra]` attribute above;
	// - `blocking` presumably makes orchestra spawn the subsystem through
	//   `Spawner::spawn_blocking` rather than `Spawner::spawn` — NOTE(review): confirm against
	//   the orchestra docs;
	// - `can_receive_priority_messages` presumably allows the subsystem to be the target of
	//   high-priority messages (cf. `route_message_with_priority::<HighPriority>`) —
	//   NOTE(review): confirm against the orchestra docs.
	#[subsystem(CandidateValidationMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,

	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	approval_voting_parallel: ApprovalVotingParallel,

	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626>
		RuntimeApiMessage,
		ChainSelectionMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,

	/// External listeners waiting for a hash to be in the active-leaves set.
	///
	/// The senders for a hash are answered and removed when that hash is activated, and
	/// dropped (closing the channels) when it is deactivated.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// The set of the "active leaves", mapping each leaf hash to its block number.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// An implementation for checking whether a header supports parachain consensus.
	pub supports_parachains: SupportsParachains,

	/// Various Prometheus metrics.
	pub metrics: OverseerMetrics,
}
674
675/// Spawn the metrics metronome task.
676pub fn spawn_metronome_metrics<S, SupportsParachains>(
677	overseer: &mut Overseer<S, SupportsParachains>,
678	metronome_metrics: OverseerMetrics,
679) -> Result<(), SubsystemError>
680where
681	S: Spawner,
682	SupportsParachains: HeadSupportsParachains,
683{
684	struct ExtractNameAndMeters;
685
686	impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
687		type Output = Option<(&'static str, SubsystemMeters)>;
688
689		fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
690			subsystem
691				.instance
692				.as_ref()
693				.map(|instance| (instance.name, instance.meters.clone()))
694		}
695	}
696	let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
697
698	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
699	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
700		match memory_stats::MemoryAllocationTracker::new() {
701			Ok(memory_stats) =>
702				Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
703					Ok(memory_stats_snapshot) => {
704						gum::trace!(
705							target: LOG_TARGET,
706							"memory_stats: {:?}",
707							&memory_stats_snapshot
708						);
709						metrics.memory_stats_snapshot(memory_stats_snapshot);
710					},
711					Err(e) =>
712						gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
713				}),
714			Err(_) => {
715				gum::debug!(
716					target: LOG_TARGET,
717					"Memory allocation tracking is not supported by the allocator.",
718				);
719
720				Box::new(|_| {})
721			},
722		};
723
724	#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
725	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
726
727	let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
728		collect_memory_stats(&metronome_metrics);
729
730		// We combine the amount of messages from subsystems to the overseer
731		// as well as the amount of messages from external sources to the overseer
732		// into one `to_overseer` value.
733		metronome_metrics.channel_metrics_snapshot(
734			subsystem_meters
735				.iter()
736				.cloned()
737				.flatten()
738				.map(|(name, ref meters)| (name, meters.read())),
739		);
740
741		futures::future::ready(())
742	});
743	overseer
744		.spawner()
745		.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
746
747	Ok(())
748}
749
750impl<S, SupportsParachains> Overseer<S, SupportsParachains>
751where
752	SupportsParachains: HeadSupportsParachains,
753	S: Spawner,
754{
755	/// Stop the `Overseer`.
756	async fn stop(mut self) {
757		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
758	}
759
760	/// Run the `Overseer`.
761	///
762	/// Logging any errors.
763	pub async fn run(self) {
764		if let Err(err) = self.run_inner().await {
765			gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
766		}
767	}
768
769	async fn run_inner(mut self) -> SubsystemResult<()> {
770		let metrics = self.metrics.clone();
771		spawn_metronome_metrics(&mut self, metrics)?;
772
773		loop {
774			select! {
775				msg = self.events_rx.select_next_some() => {
776					match msg {
777						Event::MsgToSubsystem { msg, origin, priority } => {
778							match priority {
779								PriorityLevel::Normal => {
780									self.route_message(msg.into(), origin).await?;
781								},
782								PriorityLevel::High => {
783									self.route_message_with_priority::<HighPriority>(msg.into(), origin).await?;
784								},
785							}
786							self.metrics.on_message_relayed();
787						}
788						Event::Stop => {
789							self.stop().await;
790							return Ok(());
791						}
792						Event::BlockImported(block) => {
793							self.block_imported(block).await?;
794						}
795						Event::BlockFinalized(block) => {
796							self.block_finalized(block).await?;
797						}
798						Event::ExternalRequest(request) => {
799							self.handle_external_request(request);
800						}
801					}
802				},
803				msg = self.to_orchestra_rx.select_next_some() => {
804					match msg {
805						ToOrchestra::SpawnJob { name, subsystem, s } => {
806							self.spawn_job(name, subsystem, s);
807						}
808						ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
809							self.spawn_blocking_job(name, subsystem, s);
810						}
811					}
812				},
813				res = self.running_subsystems.select_next_some() => {
814					gum::error!(
815						target: LOG_TARGET,
816						subsystem = ?res,
817						"subsystem finished unexpectedly",
818					);
819					self.stop().await;
820					return res;
821				},
822			}
823		}
824	}
825
826	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
827		match self.active_leaves.entry(block.hash) {
828			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
829			hash_map::Entry::Occupied(entry) => {
830				debug_assert_eq!(*entry.get(), block.number);
831				return Ok(())
832			},
833		};
834
835		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
836			Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
837				hash: block.hash,
838				number: block.number,
839				unpin_handle: block.unpin_handle,
840			}),
841			None => ActiveLeavesUpdate::default(),
842		};
843
844		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
845			debug_assert_eq!(block.number.saturating_sub(1), number);
846			update.deactivated.push(block.parent_hash);
847			self.on_head_deactivated(&block.parent_hash);
848		}
849
850		self.clean_up_external_listeners();
851
852		if !update.is_empty() {
853			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
854		}
855		Ok(())
856	}
857
858	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
859		let mut update = ActiveLeavesUpdate::default();
860
861		self.active_leaves.retain(|h, n| {
862			// prune all orphaned leaves, but don't prune
863			// the finalized block if it is itself a leaf.
864			if *n <= block.number && *h != block.hash {
865				update.deactivated.push(*h);
866				false
867			} else {
868				true
869			}
870		});
871
872		for deactivated in &update.deactivated {
873			self.on_head_deactivated(deactivated)
874		}
875
876		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
877			.await?;
878
879		// If there are no leaves being deactivated, we don't need to send an update.
880		//
881		// Our peers will be informed about our finalized block the next time we
882		// activating/deactivating some leaf.
883		if !update.is_empty() {
884			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
885		}
886
887		Ok(())
888	}
889
890	/// Handles a header activation. If the header's state doesn't support the parachains API,
891	/// this returns `None`.
892	async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
893		if !self.supports_parachains.head_supports_parachains(hash).await {
894			return None
895		}
896
897		self.metrics.on_head_activated();
898		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
899			gum::trace!(
900				target: LOG_TARGET,
901				relay_parent = ?hash,
902				"Leaf got activated, notifying external listeners"
903			);
904			for listener in listeners {
905				// it's fine if the listener is no longer interested
906				let _ = listener.send(Ok(()));
907			}
908		}
909
910		Some(())
911	}
912
913	fn on_head_deactivated(&mut self, hash: &Hash) {
914		self.metrics.on_head_deactivated();
915		self.activation_external_listeners.remove(hash);
916	}
917
918	fn clean_up_external_listeners(&mut self) {
919		self.activation_external_listeners.retain(|_, v| {
920			// remove dead listeners
921			v.retain(|c| !c.is_canceled());
922			!v.is_empty()
923		})
924	}
925
926	fn handle_external_request(&mut self, request: ExternalRequest) {
927		match request {
928			ExternalRequest::WaitForActivation { hash, response_channel } => {
929				if self.active_leaves.get(&hash).is_some() {
930					gum::trace!(
931						target: LOG_TARGET,
932						relay_parent = ?hash,
933						"Leaf was already ready - answering `WaitForActivation`"
934					);
935					// it's fine if the listener is no longer interested
936					let _ = response_channel.send(Ok(()));
937				} else {
938					gum::trace!(
939						target: LOG_TARGET,
940						relay_parent = ?hash,
941						"Leaf not yet ready - queuing `WaitForActivation` sender"
942					);
943					self.activation_external_listeners
944						.entry(hash)
945						.or_default()
946						.push(response_channel);
947				}
948			},
949		}
950	}
951
952	fn spawn_job(
953		&mut self,
954		task_name: &'static str,
955		subsystem_name: Option<&'static str>,
956		j: BoxFuture<'static, ()>,
957	) {
958		self.spawner.spawn(task_name, subsystem_name, j);
959	}
960
961	fn spawn_blocking_job(
962		&mut self,
963		task_name: &'static str,
964		subsystem_name: Option<&'static str>,
965		j: BoxFuture<'static, ()>,
966	) {
967		self.spawner.spawn_blocking(task_name, subsystem_name, j);
968	}
969}