1#![warn(missing_docs)]
63#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67 collections::{hash_map, HashMap},
68 fmt::{self, Debug},
69 pin::Pin,
70 sync::Arc,
71 time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use polkadot_primitives::{Block, BlockNumber, Hash};
77use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use polkadot_node_subsystem_types::messages::{
81 ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82 AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83 BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84 ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85 DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86 NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
87 ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use polkadot_node_subsystem_types::{
91 errors::{SubsystemError, SubsystemResult},
92 ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93 UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use polkadot_node_metrics::{
104 metrics::{prometheus, Metrics as MetricsTrait},
105 Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110 contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111 NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112 Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113 SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114 TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use sp_core::traits::SpawnNamed;
123
/// Newtype wrapper around a spawner `S`.
///
/// Wrapping the spawner in a local type allows traits from other crates to be implemented
/// for it in this crate.
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
    /// Borrow the wrapped spawner.
    fn as_ref(&self) -> &S {
        let Self(inner) = self;
        inner
    }
}

impl<S: Clone> Clone for SpawnGlue<S> {
    /// Clone the wrapped spawner into a new `SpawnGlue`.
    fn clone(&self) -> Self {
        let Self(inner) = self;
        SpawnGlue(inner.clone())
    }
}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140 fn spawn_blocking(
141 &self,
142 name: &'static str,
143 group: Option<&'static str>,
144 future: futures::future::BoxFuture<'static, ()>,
145 ) {
146 SpawnNamed::spawn_blocking(&self.0, name, group, future)
147 }
148 fn spawn(
149 &self,
150 name: &'static str,
151 group: Option<&'static str>,
152 future: futures::future::BoxFuture<'static, ()>,
153 ) {
154 SpawnNamed::spawn(&self.0, name, group, future)
155 }
156}
157
#[async_trait::async_trait]
/// Decides whether a given chain head supports parachains.
///
/// The overseer queries this for every newly imported head; a head for which the answer is
/// `false` is not reported to the subsystems as an activated leaf.
pub trait HeadSupportsParachains {
    /// Returns `true` if the block identified by `head` supports parachains.
    async fn head_supports_parachains(&self, head: &Hash) -> bool;
}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsParachains for Arc<Client>
167where
168 Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170 async fn head_supports_parachains(&self, head: &Hash) -> bool {
171 self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173 }
174}
175
/// A cloneable handle used to communicate with the [`Overseer`].
///
/// It wraps an `OverseerHandle`, through which [`Event`]s are sent to the overseer.
#[derive(Clone)]
pub struct Handle(OverseerHandle);
181
182impl Handle {
183 pub fn new(raw: OverseerHandle) -> Self {
185 Self(raw)
186 }
187
188 pub async fn block_imported(&mut self, block: BlockInfo) {
190 self.send_and_log_error(Event::BlockImported(block)).await
191 }
192
193 pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195 self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await
196 }
197
198 pub async fn send_msg_with_priority(
200 &mut self,
201 msg: impl Into<AllMessages>,
202 origin: &'static str,
203 priority: PriorityLevel,
204 ) {
205 self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
206 .await
207 }
208
209 #[inline(always)]
211 pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
212 self.send_msg(msg, "").await
213 }
214
215 pub async fn block_finalized(&mut self, block: BlockInfo) {
217 self.send_and_log_error(Event::BlockFinalized(block)).await
218 }
219
220 pub async fn wait_for_activation(
227 &mut self,
228 hash: Hash,
229 response_channel: oneshot::Sender<SubsystemResult<()>>,
230 ) {
231 self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
232 hash,
233 response_channel,
234 }))
235 .await;
236 }
237
238 pub async fn stop(&mut self) {
240 self.send_and_log_error(Event::Stop).await;
241 }
242
243 async fn send_and_log_error(&mut self, event: Event) {
245 if self.0.send(event).await.is_err() {
246 gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
247 }
248 }
249}
250
/// The subset of block information the overseer works with.
///
/// Instances are built from `BlockImportNotification` and `FinalityNotification` through the
/// `From` implementations in this file.
#[derive(Debug, Clone)]
pub struct BlockInfo {
    /// Hash of the block.
    pub hash: Hash,
    /// Hash of the parent block, as recorded in the block header.
    pub parent_hash: Hash,
    /// Number of the block, as recorded in the block header.
    pub number: BlockNumber,
    /// Handle obtained from the notification via `into_unpin_handle()`.
    // NOTE(review): presumably the block stays pinned in the backend until this handle is
    // dropped; confirm against the `UnpinHandle` documentation.
    pub unpin_handle: UnpinHandle,
}
268
269impl From<BlockImportNotification<Block>> for BlockInfo {
270 fn from(n: BlockImportNotification<Block>) -> Self {
271 let hash = n.hash;
272 let parent_hash = n.header.parent_hash;
273 let number = n.header.number;
274 let unpin_handle = n.into_unpin_handle();
275
276 BlockInfo { hash, parent_hash, number, unpin_handle }
277 }
278}
279
280impl From<FinalityNotification<Block>> for BlockInfo {
281 fn from(n: FinalityNotification<Block>) -> Self {
282 let hash = n.hash;
283 let parent_hash = n.header.parent_hash;
284 let number = n.header.number;
285 let unpin_handle = n.into_unpin_handle();
286
287 BlockInfo { hash, parent_hash, number, unpin_handle }
288 }
289}
290
/// An event sent from outside to the overseer, typically through a [`Handle`].
#[derive(Debug)]
pub enum Event {
    /// A block was imported.
    BlockImported(BlockInfo),
    /// A block was finalized.
    BlockFinalized(BlockInfo),
    /// A message that the overseer should route to one of its subsystems.
    MsgToSubsystem {
        /// The message to deliver.
        msg: AllMessages,
        /// Name of the sender, used when routing the message (`""` for anonymous senders).
        origin: &'static str,
        /// Priority with which the message is routed.
        priority: PriorityLevel,
    },
    /// A request addressed to the overseer itself rather than to a subsystem.
    ExternalRequest(ExternalRequest),
    /// Ask the overseer to stop.
    Stop,
}
318
/// Requests that are answered by the overseer itself.
#[derive(Debug)]
pub enum ExternalRequest {
    /// Wait until the block with the given hash is an active leaf.
    ///
    /// The overseer sends `Ok(())` on `response_channel` once the hash is known to be active.
    /// If the head is deactivated first, the sender is dropped without sending anything.
    WaitForActivation {
        /// Hash of the block to wait for.
        hash: Hash,
        /// Channel on which the activation is reported.
        response_channel: oneshot::Sender<SubsystemResult<()>>,
    },
}
331
332pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
335 let mut finality = client.finality_notification_stream();
336 let mut imports = client.import_notification_stream();
337
338 loop {
339 select! {
340 f = finality.next() => {
341 match f {
342 Some(block) => {
343 handle.block_finalized(block.into()).await;
344 }
345 None => break,
346 }
347 },
348 i = imports.next() => {
349 match i {
350 Some(block) => {
351 handle.block_imported(block.into()).await;
352 }
353 None => break,
354 }
355 },
356 complete => break,
357 }
358 }
359}
360
/// The overseer: the declaration of all subsystems and of the state kept alongside them.
///
/// The `#[orchestra(..)]` attribute expands this declaration, using `AllMessages` as the
/// wrapper over all subsystem messages, [`Event`] as the external event type,
/// [`OverseerSignal`] as the signal type and [`SubsystemError`] as the error type. The default
/// message capacity is 2048 unless a subsystem overrides it with `message_capacity: N`.
///
/// Every `#[subsystem(..)]` field names the message type the subsystem consumes (when it has
/// one) and lists, in `sends: [..]`, the message types it may send to other subsystems.
/// The `blocking` and `can_receive_priority_messages` flags are options interpreted by
/// `orchestra`; see its documentation for their exact semantics.
///
/// The trailing plain fields hold the overseer's own bookkeeping.
#[orchestra(
    gen=AllMessages,
    event=Event,
    signal=OverseerSignal,
    error=SubsystemError,
    message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
    #[subsystem(CandidateValidationMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    candidate_validation: CandidateValidation,

    // No consumed message type is listed for this subsystem.
    #[subsystem(sends: [
        CandidateValidationMessage,
        RuntimeApiMessage,
    ])]
    pvf_checker: PvfChecker,

    #[subsystem(CandidateBackingMessage, sends: [
        CandidateValidationMessage,
        CollatorProtocolMessage,
        ChainApiMessage,
        AvailabilityDistributionMessage,
        AvailabilityStoreMessage,
        StatementDistributionMessage,
        ProvisionerMessage,
        RuntimeApiMessage,
        ProspectiveParachainsMessage,
    ])]
    candidate_backing: CandidateBacking,

    #[subsystem(StatementDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        CandidateBackingMessage,
        RuntimeApiMessage,
        ProspectiveParachainsMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    statement_distribution: StatementDistribution,

    #[subsystem(AvailabilityDistributionMessage, sends: [
        AvailabilityStoreMessage,
        ChainApiMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
    ])]
    availability_distribution: AvailabilityDistribution,

    #[subsystem(AvailabilityRecoveryMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        AvailabilityStoreMessage,
    ])]
    availability_recovery: AvailabilityRecovery,

    // No consumed message type is listed for this subsystem.
    #[subsystem(blocking, sends: [
        AvailabilityStoreMessage,
        RuntimeApiMessage,
        BitfieldDistributionMessage,
    ])]
    bitfield_signing: BitfieldSigning,

    #[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ProvisionerMessage,
    ], can_receive_priority_messages)]
    bitfield_distribution: BitfieldDistribution,

    #[subsystem(ProvisionerMessage, sends: [
        RuntimeApiMessage,
        CandidateBackingMessage,
        DisputeCoordinatorMessage,
        ProspectiveParachainsMessage,
        ChainApiMessage,
    ])]
    provisioner: Provisioner,

    #[subsystem(blocking, RuntimeApiMessage, sends: [])]
    runtime_api: RuntimeApi,

    #[subsystem(blocking, AvailabilityStoreMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    availability_store: AvailabilityStore,

    #[subsystem(blocking, NetworkBridgeRxMessage, sends: [
        BitfieldDistributionMessage,
        StatementDistributionMessage,
        ApprovalVotingParallelMessage,
        GossipSupportMessage,
        DisputeDistributionMessage,
        CollationGenerationMessage,
        CollatorProtocolMessage,
    ])]
    network_bridge_rx: NetworkBridgeRx,

    #[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
    network_bridge_tx: NetworkBridgeTx,

    #[subsystem(blocking, ChainApiMessage, sends: [])]
    chain_api: ChainApi,

    #[subsystem(CollationGenerationMessage, sends: [
        RuntimeApiMessage,
        CollatorProtocolMessage,
    ])]
    collation_generation: CollationGeneration,

    #[subsystem(CollatorProtocolMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        CandidateBackingMessage,
        ChainApiMessage,
        ProspectiveParachainsMessage,
    ])]
    collator_protocol: CollatorProtocol,

    #[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        ApprovalVotingMessage,
        RuntimeApiMessage,
    ], can_receive_priority_messages)]
    approval_distribution: ApprovalDistribution,

    #[subsystem(blocking, ApprovalVotingMessage, sends: [
        ApprovalDistributionMessage,
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
    ])]
    approval_voting: ApprovalVoting,
    #[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ApprovalVotingParallelMessage,
    ], can_receive_priority_messages)]
    approval_voting_parallel: ApprovalVotingParallel,
    #[subsystem(GossipSupportMessage, sends: [
        NetworkBridgeTxMessage,
        NetworkBridgeRxMessage, RuntimeApiMessage,
        ChainSelectionMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    gossip_support: GossipSupport,

    #[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
        DisputeDistributionMessage,
        CandidateValidationMessage,
        AvailabilityStoreMessage,
        AvailabilityRecoveryMessage,
        ChainSelectionMessage,
        ApprovalVotingParallelMessage,
    ], can_receive_priority_messages)]
    dispute_coordinator: DisputeCoordinator,

    #[subsystem(DisputeDistributionMessage, sends: [
        RuntimeApiMessage,
        DisputeCoordinatorMessage,
        NetworkBridgeTxMessage,
    ])]
    dispute_distribution: DisputeDistribution,

    #[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
    chain_selection: ChainSelection,

    #[subsystem(ProspectiveParachainsMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
    ])]
    prospective_parachains: ProspectiveParachains,

    /// External listeners waiting for a block hash to become an active leaf, keyed by hash.
    pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

    /// The currently active leaves, mapping each block hash to its block number.
    pub active_leaves: HashMap<Hash, BlockNumber>,

    /// Used to check whether a newly imported head supports parachains.
    pub supports_parachains: SupportsParachains,

    /// Metrics handle of the overseer.
    pub metrics: OverseerMetrics,
}
674
675pub fn spawn_metronome_metrics<S, SupportsParachains>(
677 overseer: &mut Overseer<S, SupportsParachains>,
678 metronome_metrics: OverseerMetrics,
679) -> Result<(), SubsystemError>
680where
681 S: Spawner,
682 SupportsParachains: HeadSupportsParachains,
683{
684 struct ExtractNameAndMeters;
685
686 impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
687 type Output = Option<(&'static str, SubsystemMeters)>;
688
689 fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
690 subsystem
691 .instance
692 .as_ref()
693 .map(|instance| (instance.name, instance.meters.clone()))
694 }
695 }
696 let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
697
698 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
699 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
700 match memory_stats::MemoryAllocationTracker::new() {
701 Ok(memory_stats) =>
702 Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
703 Ok(memory_stats_snapshot) => {
704 gum::trace!(
705 target: LOG_TARGET,
706 "memory_stats: {:?}",
707 &memory_stats_snapshot
708 );
709 metrics.memory_stats_snapshot(memory_stats_snapshot);
710 },
711 Err(e) =>
712 gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
713 }),
714 Err(_) => {
715 gum::debug!(
716 target: LOG_TARGET,
717 "Memory allocation tracking is not supported by the allocator.",
718 );
719
720 Box::new(|_| {})
721 },
722 };
723
724 #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
725 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
726
727 let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
728 collect_memory_stats(&metronome_metrics);
729
730 metronome_metrics.channel_metrics_snapshot(
734 subsystem_meters
735 .iter()
736 .cloned()
737 .flatten()
738 .map(|(name, ref meters)| (name, meters.read())),
739 );
740
741 futures::future::ready(())
742 });
743 overseer
744 .spawner()
745 .spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
746
747 Ok(())
748}
749
750impl<S, SupportsParachains> Overseer<S, SupportsParachains>
751where
752 SupportsParachains: HeadSupportsParachains,
753 S: Spawner,
754{
755 async fn stop(mut self) {
757 let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
758 }
759
760 pub async fn run(self) {
764 if let Err(err) = self.run_inner().await {
765 gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
766 }
767 }
768
769 async fn run_inner(mut self) -> SubsystemResult<()> {
770 let metrics = self.metrics.clone();
771 spawn_metronome_metrics(&mut self, metrics)?;
772
773 loop {
774 select! {
775 msg = self.events_rx.select_next_some() => {
776 match msg {
777 Event::MsgToSubsystem { msg, origin, priority } => {
778 match priority {
779 PriorityLevel::Normal => {
780 self.route_message(msg.into(), origin).await?;
781 },
782 PriorityLevel::High => {
783 self.route_message_with_priority::<HighPriority>(msg.into(), origin).await?;
784 },
785 }
786 self.metrics.on_message_relayed();
787 }
788 Event::Stop => {
789 self.stop().await;
790 return Ok(());
791 }
792 Event::BlockImported(block) => {
793 self.block_imported(block).await?;
794 }
795 Event::BlockFinalized(block) => {
796 self.block_finalized(block).await?;
797 }
798 Event::ExternalRequest(request) => {
799 self.handle_external_request(request);
800 }
801 }
802 },
803 msg = self.to_orchestra_rx.select_next_some() => {
804 match msg {
805 ToOrchestra::SpawnJob { name, subsystem, s } => {
806 self.spawn_job(name, subsystem, s);
807 }
808 ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
809 self.spawn_blocking_job(name, subsystem, s);
810 }
811 }
812 },
813 res = self.running_subsystems.select_next_some() => {
814 gum::error!(
815 target: LOG_TARGET,
816 subsystem = ?res,
817 "subsystem finished unexpectedly",
818 );
819 self.stop().await;
820 return res;
821 },
822 }
823 }
824 }
825
826 async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
827 match self.active_leaves.entry(block.hash) {
828 hash_map::Entry::Vacant(entry) => entry.insert(block.number),
829 hash_map::Entry::Occupied(entry) => {
830 debug_assert_eq!(*entry.get(), block.number);
831 return Ok(())
832 },
833 };
834
835 let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
836 Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
837 hash: block.hash,
838 number: block.number,
839 unpin_handle: block.unpin_handle,
840 }),
841 None => ActiveLeavesUpdate::default(),
842 };
843
844 if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
845 debug_assert_eq!(block.number.saturating_sub(1), number);
846 update.deactivated.push(block.parent_hash);
847 self.on_head_deactivated(&block.parent_hash);
848 }
849
850 self.clean_up_external_listeners();
851
852 if !update.is_empty() {
853 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
854 }
855 Ok(())
856 }
857
858 async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
859 let mut update = ActiveLeavesUpdate::default();
860
861 self.active_leaves.retain(|h, n| {
862 if *n <= block.number && *h != block.hash {
865 update.deactivated.push(*h);
866 false
867 } else {
868 true
869 }
870 });
871
872 for deactivated in &update.deactivated {
873 self.on_head_deactivated(deactivated)
874 }
875
876 self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
877 .await?;
878
879 if !update.is_empty() {
884 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
885 }
886
887 Ok(())
888 }
889
890 async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
893 if !self.supports_parachains.head_supports_parachains(hash).await {
894 return None
895 }
896
897 self.metrics.on_head_activated();
898 if let Some(listeners) = self.activation_external_listeners.remove(hash) {
899 gum::trace!(
900 target: LOG_TARGET,
901 relay_parent = ?hash,
902 "Leaf got activated, notifying external listeners"
903 );
904 for listener in listeners {
905 let _ = listener.send(Ok(()));
907 }
908 }
909
910 Some(())
911 }
912
913 fn on_head_deactivated(&mut self, hash: &Hash) {
914 self.metrics.on_head_deactivated();
915 self.activation_external_listeners.remove(hash);
916 }
917
918 fn clean_up_external_listeners(&mut self) {
919 self.activation_external_listeners.retain(|_, v| {
920 v.retain(|c| !c.is_canceled());
922 !v.is_empty()
923 })
924 }
925
926 fn handle_external_request(&mut self, request: ExternalRequest) {
927 match request {
928 ExternalRequest::WaitForActivation { hash, response_channel } => {
929 if self.active_leaves.get(&hash).is_some() {
930 gum::trace!(
931 target: LOG_TARGET,
932 relay_parent = ?hash,
933 "Leaf was already ready - answering `WaitForActivation`"
934 );
935 let _ = response_channel.send(Ok(()));
937 } else {
938 gum::trace!(
939 target: LOG_TARGET,
940 relay_parent = ?hash,
941 "Leaf not yet ready - queuing `WaitForActivation` sender"
942 );
943 self.activation_external_listeners
944 .entry(hash)
945 .or_default()
946 .push(response_channel);
947 }
948 },
949 }
950 }
951
952 fn spawn_job(
953 &mut self,
954 task_name: &'static str,
955 subsystem_name: Option<&'static str>,
956 j: BoxFuture<'static, ()>,
957 ) {
958 self.spawner.spawn(task_name, subsystem_name, j);
959 }
960
961 fn spawn_blocking_job(
962 &mut self,
963 task_name: &'static str,
964 subsystem_name: Option<&'static str>,
965 j: BoxFuture<'static, ()>,
966 ) {
967 self.spawner.spawn_blocking(task_name, subsystem_name, j);
968 }
969}