1#![warn(missing_docs)]
63#![allow(dead_code)] use std::{
66 collections::{hash_map, HashMap},
67 fmt::{self, Debug},
68 pin::Pin,
69 sync::Arc,
70 time::Duration,
71};
72
73use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
74
75use polkadot_primitives::{Block, BlockNumber, Hash};
76use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
77
78use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
79use polkadot_node_subsystem_types::messages::{
80 ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
81 AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
82 BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
83 ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
84 DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
85 NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
86 ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
87};
88
89pub use polkadot_node_subsystem_types::{
90 errors::{SubsystemError, SubsystemResult},
91 jaeger, ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal,
92 RuntimeApiSubsystemClient, UnpinHandle,
93};
94
95pub mod metrics;
96pub use self::metrics::Metrics as OverseerMetrics;
97
98pub mod dummy;
100pub use self::dummy::DummySubsystem;
101
102pub use polkadot_node_metrics::{
103 metrics::{prometheus, Metrics as MetricsTrait},
104 Metronome,
105};
106
107pub use orchestra as gen;
108pub use orchestra::{
109 contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
110 NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
111 Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
112 SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
113 TrySendError,
114};
115
116#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
117mod memory_stats;
118#[cfg(test)]
119mod tests;
120
121use sp_core::traits::SpawnNamed;
122
/// Glue that adapts a substrate [`SpawnNamed`] implementation to orchestra's `Spawner` trait.
///
/// The wrapped spawner is exposed both as the public field `.0` and through [`AsRef`].
#[derive(Clone)]
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
	/// Borrows the wrapped spawner.
	fn as_ref(&self) -> &S {
		&self.0
	}
}
137
138impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
139 fn spawn_blocking(
140 &self,
141 name: &'static str,
142 group: Option<&'static str>,
143 future: futures::future::BoxFuture<'static, ()>,
144 ) {
145 SpawnNamed::spawn_blocking(&self.0, name, group, future)
146 }
147 fn spawn(
148 &self,
149 name: &'static str,
150 group: Option<&'static str>,
151 future: futures::future::BoxFuture<'static, ()>,
152 ) {
153 SpawnNamed::spawn(&self.0, name, group, future)
154 }
155}
156
157#[async_trait::async_trait]
159pub trait HeadSupportsParachains {
160 async fn head_supports_parachains(&self, head: &Hash) -> bool;
162}
163
164#[async_trait::async_trait]
165impl<Client> HeadSupportsParachains for Arc<Client>
166where
167 Client: RuntimeApiSubsystemClient + Sync + Send,
168{
169 async fn head_supports_parachains(&self, head: &Hash) -> bool {
170 self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
172 }
173}
174
175#[derive(Clone)]
179pub struct Handle(OverseerHandle);
180
181impl Handle {
182 pub fn new(raw: OverseerHandle) -> Self {
184 Self(raw)
185 }
186
187 pub async fn block_imported(&mut self, block: BlockInfo) {
189 self.send_and_log_error(Event::BlockImported(block)).await
190 }
191
192 pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
194 self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
195 }
196
197 #[inline(always)]
199 pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
200 self.send_msg(msg, "").await
201 }
202
203 pub async fn block_finalized(&mut self, block: BlockInfo) {
205 self.send_and_log_error(Event::BlockFinalized(block)).await
206 }
207
208 pub async fn wait_for_activation(
215 &mut self,
216 hash: Hash,
217 response_channel: oneshot::Sender<SubsystemResult<()>>,
218 ) {
219 self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
220 hash,
221 response_channel,
222 }))
223 .await;
224 }
225
226 pub async fn stop(&mut self) {
228 self.send_and_log_error(Event::Stop).await;
229 }
230
231 async fn send_and_log_error(&mut self, event: Event) {
233 if self.0.send(event).await.is_err() {
234 gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
235 }
236 }
237}
238
239#[derive(Debug, Clone)]
246pub struct BlockInfo {
247 pub hash: Hash,
249 pub parent_hash: Hash,
251 pub number: BlockNumber,
253 pub unpin_handle: UnpinHandle,
255}
256
257impl From<BlockImportNotification<Block>> for BlockInfo {
258 fn from(n: BlockImportNotification<Block>) -> Self {
259 let hash = n.hash;
260 let parent_hash = n.header.parent_hash;
261 let number = n.header.number;
262 let unpin_handle = n.into_unpin_handle();
263
264 BlockInfo { hash, parent_hash, number, unpin_handle }
265 }
266}
267
268impl From<FinalityNotification<Block>> for BlockInfo {
269 fn from(n: FinalityNotification<Block>) -> Self {
270 let hash = n.hash;
271 let parent_hash = n.header.parent_hash;
272 let number = n.header.number;
273 let unpin_handle = n.into_unpin_handle();
274
275 BlockInfo { hash, parent_hash, number, unpin_handle }
276 }
277}
278
279#[derive(Debug)]
282pub enum Event {
283 BlockImported(BlockInfo),
290 BlockFinalized(BlockInfo),
292 MsgToSubsystem {
294 msg: AllMessages,
296 origin: &'static str,
298 },
299 ExternalRequest(ExternalRequest),
301 Stop,
303}
304
305#[derive(Debug)]
307pub enum ExternalRequest {
308 WaitForActivation {
311 hash: Hash,
313 response_channel: oneshot::Sender<SubsystemResult<()>>,
315 },
316}
317
318pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
321 let mut finality = client.finality_notification_stream();
322 let mut imports = client.import_notification_stream();
323
324 loop {
325 select! {
326 f = finality.next() => {
327 match f {
328 Some(block) => {
329 handle.block_finalized(block.into()).await;
330 }
331 None => break,
332 }
333 },
334 i = imports.next() => {
335 match i {
336 Some(block) => {
337 handle.block_imported(block.into()).await;
338 }
339 None => break,
340 }
341 },
342 complete => break,
343 }
344 }
345}
346
/// The overseer: owns all subsystems, routes messages between them, and turns block import
/// and finality events into signals for them.
///
/// The `#[orchestra]` attribute generates the plumbing from this declaration, including the
/// `AllMessages` type (`gen=AllMessages`) and the `OverseerHandle` wrapped by [`Handle`].
/// External input arrives as [`Event`]s (`event=Event`), and subsystems are signalled with
/// [`OverseerSignal`] (`signal=OverseerSignal`).
///
/// Each `#[subsystem(...)]` field names the message type the subsystem consumes (if any) and,
/// under `sends: [...]`, the message types it is allowed to send. Flags such as `blocking`,
/// `message_capacity: N` and `can_receive_priority_messages` are orchestra options. Presumably
/// they select blocking spawning, override the default channel capacity of 2048, and enable a
/// priority channel, respectively — confirm against the orchestra documentation.
///
/// The non-subsystem fields at the end are state owned by the overseer itself.
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	#[subsystem(CandidateValidationMessage, sends: [
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	// Consumes no message type of its own; only sends.
	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	// Consumes no message type of its own; only sends.
	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,

	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
	])]
	approval_voting_parallel: ApprovalVotingParallel,

	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage,
		RuntimeApiMessage,
		ChainSelectionMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		ApprovalVotingMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	])]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,

	/// Pending `WaitForActivation` requests, keyed by the leaf hash they wait for.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// The `leaf-activated` span created for each activated leaf.
	pub span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,

	/// The current active leaves and their block numbers.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// Decides whether an imported head is activated for subsystems.
	pub supports_parachains: SupportsParachains,

	/// Overseer metrics.
	pub metrics: OverseerMetrics,
}
665
666pub fn spawn_metronome_metrics<S, SupportsParachains>(
668 overseer: &mut Overseer<S, SupportsParachains>,
669 metronome_metrics: OverseerMetrics,
670) -> Result<(), SubsystemError>
671where
672 S: Spawner,
673 SupportsParachains: HeadSupportsParachains,
674{
675 struct ExtractNameAndMeters;
676
677 impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
678 type Output = Option<(&'static str, SubsystemMeters)>;
679
680 fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
681 subsystem
682 .instance
683 .as_ref()
684 .map(|instance| (instance.name, instance.meters.clone()))
685 }
686 }
687 let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
688
689 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
690 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
691 match memory_stats::MemoryAllocationTracker::new() {
692 Ok(memory_stats) =>
693 Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
694 Ok(memory_stats_snapshot) => {
695 gum::trace!(
696 target: LOG_TARGET,
697 "memory_stats: {:?}",
698 &memory_stats_snapshot
699 );
700 metrics.memory_stats_snapshot(memory_stats_snapshot);
701 },
702 Err(e) =>
703 gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
704 }),
705 Err(_) => {
706 gum::debug!(
707 target: LOG_TARGET,
708 "Memory allocation tracking is not supported by the allocator.",
709 );
710
711 Box::new(|_| {})
712 },
713 };
714
715 #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
716 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
717
718 let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
719 collect_memory_stats(&metronome_metrics);
720
721 metronome_metrics.channel_metrics_snapshot(
725 subsystem_meters
726 .iter()
727 .cloned()
728 .flatten()
729 .map(|(name, ref meters)| (name, meters.read())),
730 );
731
732 futures::future::ready(())
733 });
734 overseer
735 .spawner()
736 .spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
737
738 Ok(())
739}
740
741impl<S, SupportsParachains> Overseer<S, SupportsParachains>
742where
743 SupportsParachains: HeadSupportsParachains,
744 S: Spawner,
745{
746 async fn stop(mut self) {
748 let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
749 }
750
751 pub async fn run(self) {
755 if let Err(err) = self.run_inner().await {
756 gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
757 }
758 }
759
760 async fn run_inner(mut self) -> SubsystemResult<()> {
761 let metrics = self.metrics.clone();
762 spawn_metronome_metrics(&mut self, metrics)?;
763
764 loop {
765 select! {
766 msg = self.events_rx.select_next_some() => {
767 match msg {
768 Event::MsgToSubsystem { msg, origin } => {
769 self.route_message(msg.into(), origin).await?;
770 self.metrics.on_message_relayed();
771 }
772 Event::Stop => {
773 self.stop().await;
774 return Ok(());
775 }
776 Event::BlockImported(block) => {
777 self.block_imported(block).await?;
778 }
779 Event::BlockFinalized(block) => {
780 self.block_finalized(block).await?;
781 }
782 Event::ExternalRequest(request) => {
783 self.handle_external_request(request);
784 }
785 }
786 },
787 msg = self.to_orchestra_rx.select_next_some() => {
788 match msg {
789 ToOrchestra::SpawnJob { name, subsystem, s } => {
790 self.spawn_job(name, subsystem, s);
791 }
792 ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
793 self.spawn_blocking_job(name, subsystem, s);
794 }
795 }
796 },
797 res = self.running_subsystems.select_next_some() => {
798 gum::error!(
799 target: LOG_TARGET,
800 subsystem = ?res,
801 "subsystem finished unexpectedly",
802 );
803 self.stop().await;
804 return res;
805 },
806 }
807 }
808 }
809
810 async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
811 match self.active_leaves.entry(block.hash) {
812 hash_map::Entry::Vacant(entry) => entry.insert(block.number),
813 hash_map::Entry::Occupied(entry) => {
814 debug_assert_eq!(*entry.get(), block.number);
815 return Ok(())
816 },
817 };
818
819 let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
820 Some(span) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
821 hash: block.hash,
822 number: block.number,
823 unpin_handle: block.unpin_handle,
824 span,
825 }),
826 None => ActiveLeavesUpdate::default(),
827 };
828
829 if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
830 debug_assert_eq!(block.number.saturating_sub(1), number);
831 update.deactivated.push(block.parent_hash);
832 self.on_head_deactivated(&block.parent_hash);
833 }
834
835 self.clean_up_external_listeners();
836
837 if !update.is_empty() {
838 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
839 }
840 Ok(())
841 }
842
843 async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
844 let mut update = ActiveLeavesUpdate::default();
845
846 self.active_leaves.retain(|h, n| {
847 if *n <= block.number && *h != block.hash {
850 update.deactivated.push(*h);
851 false
852 } else {
853 true
854 }
855 });
856
857 for deactivated in &update.deactivated {
858 self.on_head_deactivated(deactivated)
859 }
860
861 self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
862 .await?;
863
864 if !update.is_empty() {
869 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
870 }
871
872 Ok(())
873 }
874
875 async fn on_head_activated(
878 &mut self,
879 hash: &Hash,
880 parent_hash: Option<Hash>,
881 ) -> Option<Arc<jaeger::Span>> {
882 if !self.supports_parachains.head_supports_parachains(hash).await {
883 return None
884 }
885
886 self.metrics.on_head_activated();
887 if let Some(listeners) = self.activation_external_listeners.remove(hash) {
888 gum::trace!(
889 target: LOG_TARGET,
890 relay_parent = ?hash,
891 "Leaf got activated, notifying external listeners"
892 );
893 for listener in listeners {
894 let _ = listener.send(Ok(()));
896 }
897 }
898
899 let mut span = jaeger::Span::new(*hash, "leaf-activated");
900
901 if let Some(parent_span) = parent_hash.and_then(|h| self.span_per_active_leaf.get(&h)) {
902 span.add_follows_from(parent_span);
903 }
904
905 let span = Arc::new(span);
906 self.span_per_active_leaf.insert(*hash, span.clone());
907
908 Some(span)
909 }
910
911 fn on_head_deactivated(&mut self, hash: &Hash) {
912 self.metrics.on_head_deactivated();
913 self.activation_external_listeners.remove(hash);
914 self.span_per_active_leaf.remove(hash);
915 }
916
917 fn clean_up_external_listeners(&mut self) {
918 self.activation_external_listeners.retain(|_, v| {
919 v.retain(|c| !c.is_canceled());
921 !v.is_empty()
922 })
923 }
924
925 fn handle_external_request(&mut self, request: ExternalRequest) {
926 match request {
927 ExternalRequest::WaitForActivation { hash, response_channel } => {
928 if self.active_leaves.get(&hash).is_some() {
929 gum::trace!(
930 target: LOG_TARGET,
931 relay_parent = ?hash,
932 "Leaf was already ready - answering `WaitForActivation`"
933 );
934 let _ = response_channel.send(Ok(()));
936 } else {
937 gum::trace!(
938 target: LOG_TARGET,
939 relay_parent = ?hash,
940 "Leaf not yet ready - queuing `WaitForActivation` sender"
941 );
942 self.activation_external_listeners
943 .entry(hash)
944 .or_default()
945 .push(response_channel);
946 }
947 },
948 }
949 }
950
951 fn spawn_job(
952 &mut self,
953 task_name: &'static str,
954 subsystem_name: Option<&'static str>,
955 j: BoxFuture<'static, ()>,
956 ) {
957 self.spawner.spawn(task_name, subsystem_name, j);
958 }
959
960 fn spawn_blocking_job(
961 &mut self,
962 task_name: &'static str,
963 subsystem_name: Option<&'static str>,
964 j: BoxFuture<'static, ()>,
965 ) {
966 self.spawner.spawn_blocking(task_name, subsystem_name, j);
967 }
968}