referrerpolicy=no-referrer-when-downgrade

polkadot_overseer/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! # Overseer
18//!
19//! `overseer` implements the Overseer architecture described in the
20//! [implementers' guide][overseer-page].
//! For the motivations behind implementing the overseer itself you should
//! check out that guide; the documentation in this crate mostly discusses
//! technical details.
24//!
//! An `Overseer` is something that allows spawning/stopping and overseeing
//! asynchronous tasks as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. Ideally,
//! this protocol is the only way tasks communicate with each other; however,
//! at this moment there are no foolproof guards against other ways of communication.
30//!
31//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
32//! share the same behavior from `Overseer`'s point of view.
33//!
34//! ```text
35//!                              +-----------------------------+
36//!                              |         Overseer            |
37//!                              +-----------------------------+
38//!
39//!             ................|  Overseer "holds" these and uses |..............
40//!             .                  them to (re)start things                      .
41//!             .                                                                .
42//!             .  +-------------------+                +---------------------+  .
43//!             .  |   Subsystem1      |                |   Subsystem2        |  .
44//!             .  +-------------------+                +---------------------+  .
45//!             .           |                                       |            .
46//!             ..................................................................
47//!                         |                                       |
48//!                       start()                                 start()
49//!                         V                                       V
50//!             ..................| Overseer "runs" these |.......................
51//!             .  +--------------------+               +---------------------+  .
52//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
53//!             .  +--------------------+               +---------------------+  .
54//!             ..................................................................
55//! ```
56//!
57//! [overseer-page]: https://paritytech.github.io/polkadot-sdk/book/node/overseer.html
58
59// #![deny(unused_results)]
60// unused dependencies can not work for test and examples at the same time
61// yielding false positives
62#![warn(missing_docs)]
63// TODO https://github.com/paritytech/polkadot-sdk/issues/5793
64#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67	collections::{hash_map, HashMap},
68	fmt::{self, Debug},
69	pin::Pin,
70	sync::Arc,
71	time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use polkadot_primitives::{Block, BlockNumber, Hash};
77use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use polkadot_node_subsystem_types::messages::{
81	ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82	AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83	BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84	ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85	DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86	NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
87	ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use polkadot_node_subsystem_types::{
91	errors::{SubsystemError, SubsystemResult},
92	ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93	UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99/// A dummy subsystem, mostly useful for placeholders and tests.
100pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use polkadot_node_metrics::{
104	metrics::{prometheus, Metrics as MetricsTrait},
105	Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110	contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111	NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112	Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113	SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114	TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use sp_core::traits::SpawnNamed;
123
124/// Glue to connect `trait orchestra::Spawner` and `SpawnNamed` from `substrate`.
125pub struct SpawnGlue<S>(pub S);
126
127impl<S> AsRef<S> for SpawnGlue<S> {
128	fn as_ref(&self) -> &S {
129		&self.0
130	}
131}
132
133impl<S: Clone> Clone for SpawnGlue<S> {
134	fn clone(&self) -> Self {
135		Self(self.0.clone())
136	}
137}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140	fn spawn_blocking(
141		&self,
142		name: &'static str,
143		group: Option<&'static str>,
144		future: futures::future::BoxFuture<'static, ()>,
145	) {
146		SpawnNamed::spawn_blocking(&self.0, name, group, future)
147	}
148	fn spawn(
149		&self,
150		name: &'static str,
151		group: Option<&'static str>,
152		future: futures::future::BoxFuture<'static, ()>,
153	) {
154		SpawnNamed::spawn(&self.0, name, group, future)
155	}
156}
157
158/// Whether a header supports parachain consensus or not.
159#[async_trait::async_trait]
160pub trait HeadSupportsParachains {
161	/// Return true if the given header supports parachain consensus. Otherwise, false.
162	async fn head_supports_parachains(&self, head: &Hash) -> bool;
163}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsParachains for Arc<Client>
167where
168	Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170	async fn head_supports_parachains(&self, head: &Hash) -> bool {
171		// Check that the `ParachainHost` runtime api is at least with version 1 present on chain.
172		self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173	}
174}
175
176/// A handle used to communicate with the [`Overseer`].
177///
178/// [`Overseer`]: struct.Overseer.html
179#[derive(Clone)]
180pub struct Handle(OverseerHandle);
181
182impl Handle {
183	/// Create a new [`Handle`].
184	pub fn new(raw: OverseerHandle) -> Self {
185		Self(raw)
186	}
187
188	/// Inform the `Overseer` that that some block was imported.
189	pub async fn block_imported(&mut self, block: BlockInfo) {
190		self.send_and_log_error(Event::BlockImported(block)).await
191	}
192
193	/// Send some message with normal priority to one of the `Subsystem`s.
194	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195		self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await
196	}
197
198	/// Send some message with the specified priority to one of the `Subsystem`s.
199	pub async fn send_msg_with_priority(
200		&mut self,
201		msg: impl Into<AllMessages>,
202		origin: &'static str,
203		priority: PriorityLevel,
204	) {
205		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
206			.await
207	}
208
209	/// Send a message not providing an origin.
210	#[inline(always)]
211	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
212		self.send_msg(msg, "").await
213	}
214
215	/// Inform the `Overseer` that some block was finalized.
216	pub async fn block_finalized(&mut self, block: BlockInfo) {
217		self.send_and_log_error(Event::BlockFinalized(block)).await
218	}
219
220	/// Wait for a block with the given hash to be in the active-leaves set.
221	///
222	/// The response channel responds if the hash was activated and is closed if the hash was
223	/// deactivated. Note that due the fact the overseer doesn't store the whole active-leaves set,
224	/// only deltas, the response channel may never return if the hash was deactivated before this
225	/// call. In this case, it's the caller's responsibility to ensure a timeout is set.
226	pub async fn wait_for_activation(
227		&mut self,
228		hash: Hash,
229		response_channel: oneshot::Sender<SubsystemResult<()>>,
230	) {
231		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
232			hash,
233			response_channel,
234		}))
235		.await;
236	}
237
238	/// Tell `Overseer` to shutdown.
239	pub async fn stop(&mut self) {
240		self.send_and_log_error(Event::Stop).await;
241	}
242
243	/// Most basic operation, to stop a server.
244	async fn send_and_log_error(&mut self, event: Event) {
245		if self.0.send(event).await.is_err() {
246			gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
247		}
248	}
249}
250
251/// An event telling the `Overseer` on the particular block
252/// that has been imported or finalized.
253///
254/// This structure exists solely for the purposes of decoupling
255/// `Overseer` code from the client code and the necessity to call
256/// `HeaderBackend::block_number_from_id()`.
257#[derive(Debug, Clone)]
258pub struct BlockInfo {
259	/// Hash of the block.
260	pub hash: Hash,
261	/// Hash of the parent block.
262	pub parent_hash: Hash,
263	/// Block's number.
264	pub number: BlockNumber,
265	/// A handle to unpin the block on drop.
266	pub unpin_handle: UnpinHandle,
267}
268
269impl From<BlockImportNotification<Block>> for BlockInfo {
270	fn from(n: BlockImportNotification<Block>) -> Self {
271		let hash = n.hash;
272		let parent_hash = n.header.parent_hash;
273		let number = n.header.number;
274		let unpin_handle = n.into_unpin_handle();
275
276		BlockInfo { hash, parent_hash, number, unpin_handle }
277	}
278}
279
280impl From<FinalityNotification<Block>> for BlockInfo {
281	fn from(n: FinalityNotification<Block>) -> Self {
282		let hash = n.hash;
283		let parent_hash = n.header.parent_hash;
284		let number = n.header.number;
285		let unpin_handle = n.into_unpin_handle();
286
287		BlockInfo { hash, parent_hash, number, unpin_handle }
288	}
289}
290
291/// An event from outside the overseer scope, such
292/// as the substrate framework or user interaction.
293#[derive(Debug)]
294pub enum Event {
295	/// A new block was imported.
296	///
297	/// This event is not sent if the block was already known
298	/// and we reorged to it e.g. due to a reversion.
299	///
300	/// Also, these events are not sent during a major sync.
301	BlockImported(BlockInfo),
302	/// A block was finalized with i.e. babe or another consensus algorithm.
303	BlockFinalized(BlockInfo),
304	/// Message as sent to a subsystem.
305	MsgToSubsystem {
306		/// The actual message.
307		msg: AllMessages,
308		/// The originating subsystem name.
309		origin: &'static str,
310		/// The priority of the message.
311		priority: PriorityLevel,
312	},
313	/// A request from the outer world.
314	ExternalRequest(ExternalRequest),
315	/// Stop the overseer on i.e. a UNIX signal.
316	Stop,
317}
318
319/// Some request from outer world.
320#[derive(Debug)]
321pub enum ExternalRequest {
322	/// Wait for the activation of a particular hash
323	/// and be notified by means of the return channel.
324	WaitForActivation {
325		/// The relay parent for which activation to wait for.
326		hash: Hash,
327		/// Response channel to await on.
328		response_channel: oneshot::Sender<SubsystemResult<()>>,
329	},
330}
331
332/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
333/// import and finality notifications into the [`OverseerHandle`].
334pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
335	let mut finality = client.finality_notification_stream();
336	let mut imports = client.import_notification_stream();
337
338	loop {
339		select! {
340			f = finality.next() => {
341				match f {
342					Some(block) => {
343						handle.block_finalized(block.into()).await;
344					}
345					None => break,
346				}
347			},
348			i = imports.next() => {
349				match i {
350					Some(block) => {
351						handle.block_imported(block.into()).await;
352					}
353					None => break,
354				}
355			},
356			complete => break,
357		}
358	}
359}
360
/// The overseer: holds a fixed set of [`Subsystem`]s and the state needed to drive them.
///
/// An instance is obtained from a builder (see the example below). Building it returns the
/// overseer along with an [`OverseerHandle`] which can be used to send messages from external
/// parts of the codebase.
///
/// The [`OverseerHandle`] returned by the builder is connected to the returned [`Overseer`].
///
/// ```text
///                  +------------------------------------+
///                  |            Overseer                |
///                  +------------------------------------+
///                    /            |             |      \
///      ................. subsystems...................................
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      . |           |    |           |   |          |   |         | .
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      ...............................................................
///                              |
///                        probably `spawn`
///                            a `job`
///                              |
///                              V
///                         +-----------+
///                         |           |
///                         +-----------+
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
///
/// # Example
///
/// The [`Subsystem`]s may be any type as long as they implement the expected interface.
/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with
/// them. For the sake of simplicity, the example is terminated with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_primitives::Hash;
/// # use polkadot_overseer::{
/// # 	self as overseer,
/// #   OverseerSignal,
/// # 	SubsystemSender as _,
/// # 	AllMessages,
/// # 	HeadSupportsParachains,
/// # 	Overseer,
/// # 	SubsystemError,
/// # 	gen::{
/// # 		SubsystemContext,
/// # 		FromOrchestra,
/// # 		SpawnedSubsystem,
/// # 	},
/// # };
/// # use polkadot_node_subsystem_types::messages::{
/// # 	CandidateValidationMessage, CandidateBackingMessage,
/// # 	NetworkBridgeTxMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
///     Ctx: overseer::SubsystemContext<
/// 				Message=CandidateValidationMessage,
/// 				AllMessages=AllMessages,
/// 				Signal=OverseerSignal,
/// 				Error=SubsystemError,
/// 			>,
/// {
///     fn start(
///         self,
///         mut ctx: Ctx,
///     ) -> SpawnedSubsystem<SubsystemError> {
///         SpawnedSubsystem {
///             name: "validation-subsystem",
///             future: Box::pin(async move {
///                 loop {
///                     Delay::new(Duration::from_secs(1)).await;
///                 }
///             }),
///         }
///     }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
///      async fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// 		.unwrap()
/// 		.replace_candidate_validation(|_| ValidationSubsystem)
/// 		.build()
/// 		.unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
///     _ = overseer_fut => (),
///     _ = timer => (),
/// }
/// #
/// # 	});
/// # }
/// ```
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	// Subsystems. Each `#[subsystem(...)]` attribute names the message type the subsystem
	// receives (if any) and, under `sends`, the message types it sends. `blocking`,
	// `message_capacity` and `can_receive_priority_messages` are further `orchestra` options;
	// their exact semantics are defined by the `orchestra` crate — see its docs.
	#[subsystem(CandidateValidationMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,
	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	approval_voting_parallel: ApprovalVotingParallel,
	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626>
		RuntimeApiMessage,
		ChainSelectionMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,

	// Non-subsystem state owned by the overseer itself.

	/// External listeners waiting for a hash to be in the active-leaves set.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// The set of the "active leaves", mapping each leaf hash to its block number.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// An implementation for checking whether a header supports parachain consensus.
	pub supports_parachains: SupportsParachains,

	/// Various Prometheus metrics.
	pub metrics: OverseerMetrics,
}
673
674/// Spawn the metrics metronome task.
675pub fn spawn_metronome_metrics<S, SupportsParachains>(
676	overseer: &mut Overseer<S, SupportsParachains>,
677	metronome_metrics: OverseerMetrics,
678) -> Result<(), SubsystemError>
679where
680	S: Spawner,
681	SupportsParachains: HeadSupportsParachains,
682{
683	struct ExtractNameAndMeters;
684
685	impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
686		type Output = Option<(&'static str, SubsystemMeters)>;
687
688		fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
689			subsystem
690				.instance
691				.as_ref()
692				.map(|instance| (instance.name, instance.meters.clone()))
693		}
694	}
695	let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
696
697	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
698	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
699		match memory_stats::MemoryAllocationTracker::new() {
700			Ok(memory_stats) =>
701				Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
702					Ok(memory_stats_snapshot) => {
703						gum::trace!(
704							target: LOG_TARGET,
705							"memory_stats: {:?}",
706							&memory_stats_snapshot
707						);
708						metrics.memory_stats_snapshot(memory_stats_snapshot);
709					},
710					Err(e) =>
711						gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
712				}),
713			Err(_) => {
714				gum::debug!(
715					target: LOG_TARGET,
716					"Memory allocation tracking is not supported by the allocator.",
717				);
718
719				Box::new(|_| {})
720			},
721		};
722
723	#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
724	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
725
726	let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
727		collect_memory_stats(&metronome_metrics);
728
729		// We combine the amount of messages from subsystems to the overseer
730		// as well as the amount of messages from external sources to the overseer
731		// into one `to_overseer` value.
732		metronome_metrics.channel_metrics_snapshot(
733			subsystem_meters
734				.iter()
735				.cloned()
736				.flatten()
737				.map(|(name, ref meters)| (name, meters.read())),
738		);
739
740		futures::future::ready(())
741	});
742	overseer
743		.spawner()
744		.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
745
746	Ok(())
747}
748
749impl<S, SupportsParachains> Overseer<S, SupportsParachains>
750where
751	SupportsParachains: HeadSupportsParachains,
752	S: Spawner,
753{
754	/// Stop the `Overseer`.
755	async fn stop(mut self) {
756		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
757	}
758
759	/// Run the `Overseer`.
760	///
761	/// Logging any errors.
762	pub async fn run(self) {
763		if let Err(err) = self.run_inner().await {
764			gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
765		}
766	}
767
768	async fn run_inner(mut self) -> SubsystemResult<()> {
769		let metrics = self.metrics.clone();
770		spawn_metronome_metrics(&mut self, metrics)?;
771
772		loop {
773			select! {
774				msg = self.events_rx.select_next_some() => {
775					match msg {
776						Event::MsgToSubsystem { msg, origin, priority } => {
777							match priority {
778								PriorityLevel::Normal => {
779									self.route_message(msg.into(), origin).await?;
780								},
781								PriorityLevel::High => {
782									self.route_message_with_priority::<HighPriority>(msg.into(), origin).await?;
783								},
784							}
785							self.metrics.on_message_relayed();
786						}
787						Event::Stop => {
788							self.stop().await;
789							return Ok(());
790						}
791						Event::BlockImported(block) => {
792							self.block_imported(block).await?;
793						}
794						Event::BlockFinalized(block) => {
795							self.block_finalized(block).await?;
796						}
797						Event::ExternalRequest(request) => {
798							self.handle_external_request(request);
799						}
800					}
801				},
802				msg = self.to_orchestra_rx.select_next_some() => {
803					match msg {
804						ToOrchestra::SpawnJob { name, subsystem, s } => {
805							self.spawn_job(name, subsystem, s);
806						}
807						ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
808							self.spawn_blocking_job(name, subsystem, s);
809						}
810					}
811				},
812				res = self.running_subsystems.select_next_some() => {
813					gum::error!(
814						target: LOG_TARGET,
815						subsystem = ?res,
816						"subsystem finished unexpectedly",
817					);
818					self.stop().await;
819					return res;
820				},
821			}
822		}
823	}
824
825	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
826		match self.active_leaves.entry(block.hash) {
827			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
828			hash_map::Entry::Occupied(entry) => {
829				debug_assert_eq!(*entry.get(), block.number);
830				return Ok(())
831			},
832		};
833
834		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
835			Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
836				hash: block.hash,
837				number: block.number,
838				unpin_handle: block.unpin_handle,
839			}),
840			None => ActiveLeavesUpdate::default(),
841		};
842
843		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
844			debug_assert_eq!(block.number.saturating_sub(1), number);
845			update.deactivated.push(block.parent_hash);
846			self.on_head_deactivated(&block.parent_hash);
847		}
848
849		self.clean_up_external_listeners();
850
851		if !update.is_empty() {
852			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
853		}
854		Ok(())
855	}
856
857	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
858		let mut update = ActiveLeavesUpdate::default();
859
860		self.active_leaves.retain(|h, n| {
861			// prune all orphaned leaves, but don't prune
862			// the finalized block if it is itself a leaf.
863			if *n <= block.number && *h != block.hash {
864				update.deactivated.push(*h);
865				false
866			} else {
867				true
868			}
869		});
870
871		for deactivated in &update.deactivated {
872			self.on_head_deactivated(deactivated)
873		}
874
875		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
876			.await?;
877
878		// If there are no leaves being deactivated, we don't need to send an update.
879		//
880		// Our peers will be informed about our finalized block the next time we
881		// activating/deactivating some leaf.
882		if !update.is_empty() {
883			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
884		}
885
886		Ok(())
887	}
888
889	/// Handles a header activation. If the header's state doesn't support the parachains API,
890	/// this returns `None`.
891	async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
892		if !self.supports_parachains.head_supports_parachains(hash).await {
893			return None
894		}
895
896		self.metrics.on_head_activated();
897		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
898			gum::trace!(
899				target: LOG_TARGET,
900				relay_parent = ?hash,
901				"Leaf got activated, notifying external listeners"
902			);
903			for listener in listeners {
904				// it's fine if the listener is no longer interested
905				let _ = listener.send(Ok(()));
906			}
907		}
908
909		Some(())
910	}
911
912	fn on_head_deactivated(&mut self, hash: &Hash) {
913		self.metrics.on_head_deactivated();
914		self.activation_external_listeners.remove(hash);
915	}
916
917	fn clean_up_external_listeners(&mut self) {
918		self.activation_external_listeners.retain(|_, v| {
919			// remove dead listeners
920			v.retain(|c| !c.is_canceled());
921			!v.is_empty()
922		})
923	}
924
925	fn handle_external_request(&mut self, request: ExternalRequest) {
926		match request {
927			ExternalRequest::WaitForActivation { hash, response_channel } => {
928				if self.active_leaves.get(&hash).is_some() {
929					gum::trace!(
930						target: LOG_TARGET,
931						relay_parent = ?hash,
932						"Leaf was already ready - answering `WaitForActivation`"
933					);
934					// it's fine if the listener is no longer interested
935					let _ = response_channel.send(Ok(()));
936				} else {
937					gum::trace!(
938						target: LOG_TARGET,
939						relay_parent = ?hash,
940						"Leaf not yet ready - queuing `WaitForActivation` sender"
941					);
942					self.activation_external_listeners
943						.entry(hash)
944						.or_default()
945						.push(response_channel);
946				}
947			},
948		}
949	}
950
951	fn spawn_job(
952		&mut self,
953		task_name: &'static str,
954		subsystem_name: Option<&'static str>,
955		j: BoxFuture<'static, ()>,
956	) {
957		self.spawner.spawn(task_name, subsystem_name, j);
958	}
959
960	fn spawn_blocking_job(
961		&mut self,
962		task_name: &'static str,
963		subsystem_name: Option<&'static str>,
964		j: BoxFuture<'static, ()>,
965	) {
966		self.spawner.spawn_blocking(task_name, subsystem_name, j);
967	}
968}