1#![warn(missing_docs)]
63#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67 collections::{hash_map, HashMap},
68 fmt::{self, Debug},
69 pin::Pin,
70 sync::Arc,
71 time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use polkadot_primitives::{Block, BlockNumber, Hash};
77use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use polkadot_node_subsystem_types::messages::{
81 ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82 AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83 BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84 ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85 DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86 NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
87 ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use polkadot_node_subsystem_types::{
91 errors::{SubsystemError, SubsystemResult},
92 ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93 UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use polkadot_node_metrics::{
104 metrics::{prometheus, Metrics as MetricsTrait},
105 Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110 contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111 NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112 Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113 SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114 TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use sp_core::traits::SpawnNamed;
123
/// Thin newtype around a spawner `S`.
///
/// The wrapper exists so that a type implementing `SpawnNamed` can be handed to code that
/// expects the orchestra `Spawner` trait (see the `Spawner` implementation for this type).
/// The wrapped value is public and can also be borrowed through [`AsRef`].
///
/// `Clone` is derived; the derive yields the same `S: Clone` bound a hand-written
/// implementation would carry.
#[derive(Clone)]
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
    /// Borrow the wrapped spawner.
    fn as_ref(&self) -> &S {
        &self.0
    }
}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140 fn spawn_blocking(
141 &self,
142 name: &'static str,
143 group: Option<&'static str>,
144 future: futures::future::BoxFuture<'static, ()>,
145 ) {
146 SpawnNamed::spawn_blocking(&self.0, name, group, future)
147 }
148 fn spawn(
149 &self,
150 name: &'static str,
151 group: Option<&'static str>,
152 future: futures::future::BoxFuture<'static, ()>,
153 ) {
154 SpawnNamed::spawn(&self.0, name, group, future)
155 }
156}
157
/// Answers whether a given chain head supports parachains.
///
/// The overseer consults this when a head is activated: if the answer is `false`, no
/// activated leaf is produced for that head.
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
    /// Returns `true` if the head identified by `head` supports parachains.
    async fn head_supports_parachains(&self, head: &Hash) -> bool;
}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsParachains for Arc<Client>
167where
168 Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170 async fn head_supports_parachains(&self, head: &Hash) -> bool {
171 self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173 }
174}
175
176#[derive(Clone)]
180pub struct Handle(OverseerHandle);
181
182impl Handle {
183 pub fn new(raw: OverseerHandle) -> Self {
185 Self(raw)
186 }
187
188 pub async fn block_imported(&mut self, block: BlockInfo) {
190 self.send_and_log_error(Event::BlockImported(block)).await
191 }
192
193 pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195 self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await
196 }
197
198 pub async fn send_msg_with_priority(
200 &mut self,
201 msg: impl Into<AllMessages>,
202 origin: &'static str,
203 priority: PriorityLevel,
204 ) {
205 self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
206 .await
207 }
208
209 #[inline(always)]
211 pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
212 self.send_msg(msg, "").await
213 }
214
215 pub async fn block_finalized(&mut self, block: BlockInfo) {
217 self.send_and_log_error(Event::BlockFinalized(block)).await
218 }
219
220 pub async fn wait_for_activation(
227 &mut self,
228 hash: Hash,
229 response_channel: oneshot::Sender<SubsystemResult<()>>,
230 ) {
231 self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
232 hash,
233 response_channel,
234 }))
235 .await;
236 }
237
238 pub async fn stop(&mut self) {
240 self.send_and_log_error(Event::Stop).await;
241 }
242
243 async fn send_and_log_error(&mut self, event: Event) {
245 if self.0.send(event).await.is_err() {
246 gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
247 }
248 }
249}
250
/// The block data the overseer needs about an imported or finalized block.
///
/// Can be built via `From` from a `BlockImportNotification<Block>` or a
/// `FinalityNotification<Block>`.
#[derive(Debug, Clone)]
pub struct BlockInfo {
    /// Hash of the block.
    pub hash: Hash,
    /// Hash of the block's parent, taken from the block header.
    pub parent_hash: Hash,
    /// Number of the block, taken from the block header.
    pub number: BlockNumber,
    /// Handle obtained from the originating notification via `into_unpin_handle()`.
    pub unpin_handle: UnpinHandle,
}
268
269impl From<BlockImportNotification<Block>> for BlockInfo {
270 fn from(n: BlockImportNotification<Block>) -> Self {
271 let hash = n.hash;
272 let parent_hash = n.header.parent_hash;
273 let number = n.header.number;
274 let unpin_handle = n.into_unpin_handle();
275
276 BlockInfo { hash, parent_hash, number, unpin_handle }
277 }
278}
279
280impl From<FinalityNotification<Block>> for BlockInfo {
281 fn from(n: FinalityNotification<Block>) -> Self {
282 let hash = n.hash;
283 let parent_hash = n.header.parent_hash;
284 let number = n.header.number;
285 let unpin_handle = n.into_unpin_handle();
286
287 BlockInfo { hash, parent_hash, number, unpin_handle }
288 }
289}
290
/// An event delivered to the overseer from the outside, e.g. through a [`Handle`].
#[derive(Debug)]
pub enum Event {
    /// A new block was imported.
    ///
    /// The overseer records the block as an active leaf (unless it is already known) and
    /// stops tracking its parent as an active leaf if the parent was one.
    BlockImported(BlockInfo),
    /// A block was finalized.
    BlockFinalized(BlockInfo),
    /// A message that the overseer should route to a subsystem.
    MsgToSubsystem {
        /// The message to route.
        msg: AllMessages,
        /// Name of the sender; forwarded to the routing call.
        origin: &'static str,
        /// Priority with which the message is routed.
        priority: PriorityLevel,
    },
    /// A request from the outside world that the overseer answers itself.
    ExternalRequest(ExternalRequest),
    /// Ask the overseer to stop; it signals the subsystems to conclude and then exits.
    Stop,
}
318
/// A request to the overseer that originates outside of the subsystems.
#[derive(Debug)]
pub enum ExternalRequest {
    /// Answer on `response_channel` as soon as `hash` is an active leaf.
    ///
    /// If `hash` is already active the answer is sent right away; otherwise the sender is
    /// queued until the leaf is activated.
    WaitForActivation {
        /// Hash of the leaf to wait for.
        hash: Hash,
        /// Channel on which `Ok(())` is sent once the leaf is active.
        response_channel: oneshot::Sender<SubsystemResult<()>>,
    },
}
331
332pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
335 let mut finality = client.finality_notification_stream();
336 let mut imports = client.import_notification_stream();
337
338 loop {
339 select! {
340 f = finality.next() => {
341 match f {
342 Some(block) => {
343 handle.block_finalized(block.into()).await;
344 }
345 None => break,
346 }
347 },
348 i = imports.next() => {
349 match i {
350 Some(block) => {
351 handle.block_imported(block.into()).await;
352 }
353 None => break,
354 }
355 },
356 complete => break,
357 }
358 }
359}
360
/// The overseer: owns all subsystems and routes messages and signals between them.
///
/// The struct is declared through the `orchestra` macro. `AllMessages` is the name of the
/// generated message wrapper type, [`Event`] is the external event type, [`OverseerSignal`]
/// the signal type and [`SubsystemError`] the error type. The default message capacity is
/// 2048 and some subsystems override it with `message_capacity: N` below.
///
/// Each `#[subsystem(..)]` field declares one subsystem: the message type it is addressed
/// with (when one is given) and, under `sends:`, the message types it may send.
// NOTE(review): `blocking` and `can_receive_priority_messages` are orchestra options;
// presumably they select a blocking spawn and enable a priority channel respectively.
// Confirm against the orchestra documentation.
#[orchestra(
    gen=AllMessages,
    event=Event,
    signal=OverseerSignal,
    error=SubsystemError,
    message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
    #[subsystem(CandidateValidationMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    candidate_validation: CandidateValidation,

    // No incoming message type is named for this subsystem.
    #[subsystem(sends: [
        CandidateValidationMessage,
        RuntimeApiMessage,
    ])]
    pvf_checker: PvfChecker,

    #[subsystem(CandidateBackingMessage, sends: [
        CandidateValidationMessage,
        CollatorProtocolMessage,
        ChainApiMessage,
        AvailabilityDistributionMessage,
        AvailabilityStoreMessage,
        StatementDistributionMessage,
        ProvisionerMessage,
        RuntimeApiMessage,
        ProspectiveParachainsMessage,
    ])]
    candidate_backing: CandidateBacking,

    #[subsystem(StatementDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        CandidateBackingMessage,
        RuntimeApiMessage,
        ProspectiveParachainsMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    statement_distribution: StatementDistribution,

    #[subsystem(AvailabilityDistributionMessage, sends: [
        AvailabilityStoreMessage,
        ChainApiMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
    ])]
    availability_distribution: AvailabilityDistribution,

    #[subsystem(AvailabilityRecoveryMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        AvailabilityStoreMessage,
    ])]
    availability_recovery: AvailabilityRecovery,

    // No incoming message type is named for this subsystem.
    #[subsystem(blocking, sends: [
        AvailabilityStoreMessage,
        RuntimeApiMessage,
        BitfieldDistributionMessage,
    ])]
    bitfield_signing: BitfieldSigning,

    #[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ProvisionerMessage,
    ], can_receive_priority_messages)]
    bitfield_distribution: BitfieldDistribution,

    #[subsystem(ProvisionerMessage, sends: [
        RuntimeApiMessage,
        CandidateBackingMessage,
        DisputeCoordinatorMessage,
        ProspectiveParachainsMessage,
    ])]
    provisioner: Provisioner,

    #[subsystem(blocking, RuntimeApiMessage, sends: [])]
    runtime_api: RuntimeApi,

    #[subsystem(blocking, AvailabilityStoreMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    availability_store: AvailabilityStore,

    #[subsystem(blocking, NetworkBridgeRxMessage, sends: [
        BitfieldDistributionMessage,
        StatementDistributionMessage,
        ApprovalVotingParallelMessage,
        GossipSupportMessage,
        DisputeDistributionMessage,
        CollationGenerationMessage,
        CollatorProtocolMessage,
    ])]
    network_bridge_rx: NetworkBridgeRx,

    #[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
    network_bridge_tx: NetworkBridgeTx,

    #[subsystem(blocking, ChainApiMessage, sends: [])]
    chain_api: ChainApi,

    #[subsystem(CollationGenerationMessage, sends: [
        RuntimeApiMessage,
        CollatorProtocolMessage,
    ])]
    collation_generation: CollationGeneration,

    #[subsystem(CollatorProtocolMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        CandidateBackingMessage,
        ChainApiMessage,
        ProspectiveParachainsMessage,
    ])]
    collator_protocol: CollatorProtocol,

    #[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        ApprovalVotingMessage,
        RuntimeApiMessage,
    ], can_receive_priority_messages)]
    approval_distribution: ApprovalDistribution,

    #[subsystem(blocking, ApprovalVotingMessage, sends: [
        ApprovalDistributionMessage,
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
    ])]
    approval_voting: ApprovalVoting,
    #[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ApprovalVotingParallelMessage,
    ], can_receive_priority_messages)]
    approval_voting_parallel: ApprovalVotingParallel,
    #[subsystem(GossipSupportMessage, sends: [
        NetworkBridgeTxMessage,
        NetworkBridgeRxMessage, RuntimeApiMessage,
        ChainSelectionMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    gossip_support: GossipSupport,

    #[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
        DisputeDistributionMessage,
        CandidateValidationMessage,
        AvailabilityStoreMessage,
        AvailabilityRecoveryMessage,
        ChainSelectionMessage,
        ApprovalVotingParallelMessage,
    ], can_receive_priority_messages)]
    dispute_coordinator: DisputeCoordinator,

    #[subsystem(DisputeDistributionMessage, sends: [
        RuntimeApiMessage,
        DisputeCoordinatorMessage,
        NetworkBridgeTxMessage,
    ])]
    dispute_distribution: DisputeDistribution,

    #[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
    chain_selection: ChainSelection,

    #[subsystem(ProspectiveParachainsMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
    ])]
    prospective_parachains: ProspectiveParachains,

    /// Senders of external `WaitForActivation` requests, keyed by the leaf hash they are
    /// waiting for. They are answered when the leaf is activated.
    pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

    /// The currently active leaves, mapping block hash to block number.
    pub active_leaves: HashMap<Hash, BlockNumber>,

    /// Used to decide whether a newly imported head supports parachains.
    pub supports_parachains: SupportsParachains,

    /// Metrics of the overseer itself.
    pub metrics: OverseerMetrics,
}
673
674pub fn spawn_metronome_metrics<S, SupportsParachains>(
676 overseer: &mut Overseer<S, SupportsParachains>,
677 metronome_metrics: OverseerMetrics,
678) -> Result<(), SubsystemError>
679where
680 S: Spawner,
681 SupportsParachains: HeadSupportsParachains,
682{
683 struct ExtractNameAndMeters;
684
685 impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
686 type Output = Option<(&'static str, SubsystemMeters)>;
687
688 fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
689 subsystem
690 .instance
691 .as_ref()
692 .map(|instance| (instance.name, instance.meters.clone()))
693 }
694 }
695 let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
696
697 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
698 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
699 match memory_stats::MemoryAllocationTracker::new() {
700 Ok(memory_stats) =>
701 Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
702 Ok(memory_stats_snapshot) => {
703 gum::trace!(
704 target: LOG_TARGET,
705 "memory_stats: {:?}",
706 &memory_stats_snapshot
707 );
708 metrics.memory_stats_snapshot(memory_stats_snapshot);
709 },
710 Err(e) =>
711 gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
712 }),
713 Err(_) => {
714 gum::debug!(
715 target: LOG_TARGET,
716 "Memory allocation tracking is not supported by the allocator.",
717 );
718
719 Box::new(|_| {})
720 },
721 };
722
723 #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
724 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
725
726 let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
727 collect_memory_stats(&metronome_metrics);
728
729 metronome_metrics.channel_metrics_snapshot(
733 subsystem_meters
734 .iter()
735 .cloned()
736 .flatten()
737 .map(|(name, ref meters)| (name, meters.read())),
738 );
739
740 futures::future::ready(())
741 });
742 overseer
743 .spawner()
744 .spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
745
746 Ok(())
747}
748
749impl<S, SupportsParachains> Overseer<S, SupportsParachains>
750where
751 SupportsParachains: HeadSupportsParachains,
752 S: Spawner,
753{
754 async fn stop(mut self) {
756 let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
757 }
758
759 pub async fn run(self) {
763 if let Err(err) = self.run_inner().await {
764 gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
765 }
766 }
767
768 async fn run_inner(mut self) -> SubsystemResult<()> {
769 let metrics = self.metrics.clone();
770 spawn_metronome_metrics(&mut self, metrics)?;
771
772 loop {
773 select! {
774 msg = self.events_rx.select_next_some() => {
775 match msg {
776 Event::MsgToSubsystem { msg, origin, priority } => {
777 match priority {
778 PriorityLevel::Normal => {
779 self.route_message(msg.into(), origin).await?;
780 },
781 PriorityLevel::High => {
782 self.route_message_with_priority::<HighPriority>(msg.into(), origin).await?;
783 },
784 }
785 self.metrics.on_message_relayed();
786 }
787 Event::Stop => {
788 self.stop().await;
789 return Ok(());
790 }
791 Event::BlockImported(block) => {
792 self.block_imported(block).await?;
793 }
794 Event::BlockFinalized(block) => {
795 self.block_finalized(block).await?;
796 }
797 Event::ExternalRequest(request) => {
798 self.handle_external_request(request);
799 }
800 }
801 },
802 msg = self.to_orchestra_rx.select_next_some() => {
803 match msg {
804 ToOrchestra::SpawnJob { name, subsystem, s } => {
805 self.spawn_job(name, subsystem, s);
806 }
807 ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
808 self.spawn_blocking_job(name, subsystem, s);
809 }
810 }
811 },
812 res = self.running_subsystems.select_next_some() => {
813 gum::error!(
814 target: LOG_TARGET,
815 subsystem = ?res,
816 "subsystem finished unexpectedly",
817 );
818 self.stop().await;
819 return res;
820 },
821 }
822 }
823 }
824
825 async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
826 match self.active_leaves.entry(block.hash) {
827 hash_map::Entry::Vacant(entry) => entry.insert(block.number),
828 hash_map::Entry::Occupied(entry) => {
829 debug_assert_eq!(*entry.get(), block.number);
830 return Ok(())
831 },
832 };
833
834 let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
835 Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
836 hash: block.hash,
837 number: block.number,
838 unpin_handle: block.unpin_handle,
839 }),
840 None => ActiveLeavesUpdate::default(),
841 };
842
843 if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
844 debug_assert_eq!(block.number.saturating_sub(1), number);
845 update.deactivated.push(block.parent_hash);
846 self.on_head_deactivated(&block.parent_hash);
847 }
848
849 self.clean_up_external_listeners();
850
851 if !update.is_empty() {
852 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
853 }
854 Ok(())
855 }
856
857 async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
858 let mut update = ActiveLeavesUpdate::default();
859
860 self.active_leaves.retain(|h, n| {
861 if *n <= block.number && *h != block.hash {
864 update.deactivated.push(*h);
865 false
866 } else {
867 true
868 }
869 });
870
871 for deactivated in &update.deactivated {
872 self.on_head_deactivated(deactivated)
873 }
874
875 self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
876 .await?;
877
878 if !update.is_empty() {
883 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
884 }
885
886 Ok(())
887 }
888
889 async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
892 if !self.supports_parachains.head_supports_parachains(hash).await {
893 return None
894 }
895
896 self.metrics.on_head_activated();
897 if let Some(listeners) = self.activation_external_listeners.remove(hash) {
898 gum::trace!(
899 target: LOG_TARGET,
900 relay_parent = ?hash,
901 "Leaf got activated, notifying external listeners"
902 );
903 for listener in listeners {
904 let _ = listener.send(Ok(()));
906 }
907 }
908
909 Some(())
910 }
911
912 fn on_head_deactivated(&mut self, hash: &Hash) {
913 self.metrics.on_head_deactivated();
914 self.activation_external_listeners.remove(hash);
915 }
916
917 fn clean_up_external_listeners(&mut self) {
918 self.activation_external_listeners.retain(|_, v| {
919 v.retain(|c| !c.is_canceled());
921 !v.is_empty()
922 })
923 }
924
925 fn handle_external_request(&mut self, request: ExternalRequest) {
926 match request {
927 ExternalRequest::WaitForActivation { hash, response_channel } => {
928 if self.active_leaves.get(&hash).is_some() {
929 gum::trace!(
930 target: LOG_TARGET,
931 relay_parent = ?hash,
932 "Leaf was already ready - answering `WaitForActivation`"
933 );
934 let _ = response_channel.send(Ok(()));
936 } else {
937 gum::trace!(
938 target: LOG_TARGET,
939 relay_parent = ?hash,
940 "Leaf not yet ready - queuing `WaitForActivation` sender"
941 );
942 self.activation_external_listeners
943 .entry(hash)
944 .or_default()
945 .push(response_channel);
946 }
947 },
948 }
949 }
950
951 fn spawn_job(
952 &mut self,
953 task_name: &'static str,
954 subsystem_name: Option<&'static str>,
955 j: BoxFuture<'static, ()>,
956 ) {
957 self.spawner.spawn(task_name, subsystem_name, j);
958 }
959
960 fn spawn_blocking_job(
961 &mut self,
962 task_name: &'static str,
963 subsystem_name: Option<&'static str>,
964 j: BoxFuture<'static, ()>,
965 ) {
966 self.spawner.spawn_blocking(task_name, subsystem_name, j);
967 }
968}