referrerpolicy=no-referrer-when-downgrade

polkadot_service/
overseer.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use super::{Error, IsParachainNode, Registry};
18use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
19use polkadot_overseer::{DummySubsystem, InitializedOverseerBuilder, SubsystemError};
20use sp_core::traits::SpawnNamed;
21
22use polkadot_availability_distribution::IncomingRequestReceivers;
23use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
24use polkadot_node_core_av_store::Config as AvailabilityConfig;
25use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
26use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
27use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
28use polkadot_node_network_protocol::{
29	peer_set::{PeerSet, PeerSetProtocolNames},
30	request_response::{
31		v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, ReqProtocolNames,
32	},
33};
34#[cfg(any(feature = "malus", test))]
35pub use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
36use polkadot_overseer::{
37	metrics::Metrics as OverseerMetrics, MetricsTrait, Overseer, OverseerConnector, OverseerHandle,
38	SpawnGlue,
39};
40
41use parking_lot::Mutex;
42use sc_authority_discovery::Service as AuthorityDiscoveryService;
43use sc_client_api::AuxStore;
44use sc_keystore::LocalKeystore;
45use sc_network::{NetworkStateInfo, NotificationService};
46use std::{collections::HashMap, sync::Arc};
47
48pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem;
49pub use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem;
50pub use polkadot_availability_distribution::AvailabilityDistributionSubsystem;
51pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
52pub use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
53pub use polkadot_dispute_distribution::DisputeDistributionSubsystem;
54pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem;
55pub use polkadot_network_bridge::{
56	Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem,
57	NetworkBridgeTx as NetworkBridgeTxSubsystem,
58};
59pub use polkadot_node_collation_generation::CollationGenerationSubsystem;
60pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
61pub use polkadot_node_core_approval_voting_parallel::{
62	ApprovalVotingParallelSubsystem, Metrics as ApprovalVotingParallelMetrics,
63};
64pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
65pub use polkadot_node_core_backing::CandidateBackingSubsystem;
66pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
67pub use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
68pub use polkadot_node_core_chain_api::ChainApiSubsystem;
69pub use polkadot_node_core_chain_selection::ChainSelectionSubsystem;
70pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem;
71pub use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
72pub use polkadot_node_core_provisioner::ProvisionerSubsystem;
73pub use polkadot_node_core_pvf_checker::PvfCheckerSubsystem;
74pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
75pub use polkadot_statement_distribution::StatementDistributionSubsystem;
76
/// Arguments passed for overseer construction.
///
/// Both [`validator_overseer_builder`] and [`collator_overseer_builder`] take this struct by
/// value and destructure it. The lifetime `'a` is the lifetime of the borrowed Prometheus
/// registry.
pub struct OverseerGenArgs<'a, Spawner, RuntimeClient>
where
	Spawner: 'static + SpawnNamed + Clone + Unpin,
{
	/// Runtime client generic, providing the `ProvideRuntimeApi` trait besides others.
	pub runtime_client: Arc<RuntimeClient>,
	/// Underlying network service implementation.
	pub network_service: Arc<dyn sc_network::service::traits::NetworkService>,
	/// Underlying syncing service implementation.
	pub sync_service: Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
	/// Underlying authority discovery service.
	pub authority_discovery_service: AuthorityDiscoveryService,
	/// Collations request receiver for network protocol v1.
	///
	/// Neither builder in this file uses it; both bind it to `_`.
	pub collation_req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
	/// Collations request receiver for network protocol v2.
	///
	/// Only the collator builder uses it (for `ProtocolSide::Collator`); the validator
	/// builder binds it to `_`.
	pub collation_req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
	/// Receiver for available data requests.
	pub available_data_req_receiver:
		IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
	/// Prometheus registry, commonly used for production systems, less so for test.
	pub registry: Option<&'a Registry>,
	/// Task spawner to be used throughout the overseer and the APIs it provides.
	pub spawner: Spawner,
	/// Determines the behavior of the collator.
	///
	/// The validator builder only accepts `IsParachainNode::No`; the collator builder
	/// rejects it and accepts `Collator` and `FullNode`.
	pub is_parachain_node: IsParachainNode,
	/// Overseer channel capacity override.
	///
	/// When `Some`, the value is handed to the builder's `message_channel_capacity`.
	pub overseer_message_channel_capacity_override: Option<usize>,
	/// Request-response protocol names source.
	pub req_protocol_names: ReqProtocolNames,
	/// `PeerSet` protocol names to protocols mapping.
	pub peerset_protocol_names: PeerSetProtocolNames,
	/// Notification services for validation/collation protocols.
	pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
}
112
/// Additional arguments needed to construct the validator overseer.
///
/// Consumed by [`validator_overseer_builder`]. [`ValidatorOverseerGen`] fails when they are
/// not provided, whereas [`CollatorOverseerGen`] ignores them.
pub struct ExtendedOverseerGenArgs {
	/// The keystore to use for e.g. validator keys.
	pub keystore: Arc<LocalKeystore>,
	/// The underlying key value store for the parachains.
	pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
	/// Configuration for the candidate validation subsystem.
	pub candidate_validation_config: Option<CandidateValidationConfig>,
	/// Configuration for the availability store subsystem.
	pub availability_config: AvailabilityConfig,
	/// PoV request receiver.
	pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
	/// Erasure chunk request v1 receiver.
	pub chunk_req_v1_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
	/// Erasure chunk request v2 receiver.
	pub chunk_req_v2_receiver: IncomingRequestReceiver<request_v2::ChunkFetchingRequest>,
	/// Receiver for incoming candidate requests.
	pub candidate_req_v2_receiver: IncomingRequestReceiver<request_v2::AttestedCandidateRequest>,
	/// Configuration for the approval voting subsystem.
	pub approval_voting_config: ApprovalVotingConfig,
	/// Receiver for incoming disputes.
	pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
	/// Configuration for the dispute coordinator subsystem.
	pub dispute_coordinator_config: DisputeCoordinatorConfig,
	/// Configuration for the chain selection subsystem.
	pub chain_selection_config: ChainSelectionConfig,
	/// Optional availability recovery fetch chunks threshold. If the PoV size is lower
	/// than the value put in here we always try to recover availability from backers.
	/// The presence of this parameter here is needed to have different values per chain.
	pub fetch_chunks_threshold: Option<usize>,
}
143
144/// Obtain a prepared validator `Overseer`, that is initialized with all default values.
145pub fn validator_overseer_builder<Spawner, RuntimeClient>(
146	OverseerGenArgs {
147		runtime_client,
148		network_service,
149		sync_service,
150		authority_discovery_service,
151		collation_req_v1_receiver: _,
152		collation_req_v2_receiver: _,
153		available_data_req_receiver,
154		registry,
155		spawner,
156		is_parachain_node,
157		overseer_message_channel_capacity_override,
158		req_protocol_names,
159		peerset_protocol_names,
160		notification_services,
161	}: OverseerGenArgs<Spawner, RuntimeClient>,
162	ExtendedOverseerGenArgs {
163		keystore,
164		parachains_db,
165		candidate_validation_config,
166		availability_config,
167		pov_req_receiver,
168		chunk_req_v1_receiver,
169		chunk_req_v2_receiver,
170		candidate_req_v2_receiver,
171		approval_voting_config,
172		dispute_req_receiver,
173		dispute_coordinator_config,
174		chain_selection_config,
175		fetch_chunks_threshold,
176	}: ExtendedOverseerGenArgs,
177) -> Result<
178	InitializedOverseerBuilder<
179		SpawnGlue<Spawner>,
180		Arc<RuntimeClient>,
181		CandidateValidationSubsystem,
182		PvfCheckerSubsystem,
183		CandidateBackingSubsystem,
184		StatementDistributionSubsystem,
185		AvailabilityDistributionSubsystem,
186		AvailabilityRecoverySubsystem,
187		BitfieldSigningSubsystem,
188		BitfieldDistributionSubsystem,
189		ProvisionerSubsystem,
190		RuntimeApiSubsystem<RuntimeClient>,
191		AvailabilityStoreSubsystem,
192		NetworkBridgeRxSubsystem<
193			Arc<dyn sc_network::service::traits::NetworkService>,
194			AuthorityDiscoveryService,
195		>,
196		NetworkBridgeTxSubsystem<
197			Arc<dyn sc_network::service::traits::NetworkService>,
198			AuthorityDiscoveryService,
199		>,
200		ChainApiSubsystem<RuntimeClient>,
201		DummySubsystem,
202		CollatorProtocolSubsystem,
203		DummySubsystem,
204		DummySubsystem,
205		ApprovalVotingParallelSubsystem,
206		GossipSupportSubsystem<AuthorityDiscoveryService>,
207		DisputeCoordinatorSubsystem,
208		DisputeDistributionSubsystem<AuthorityDiscoveryService>,
209		ChainSelectionSubsystem,
210		ProspectiveParachainsSubsystem,
211	>,
212	Error,
213>
214where
215	RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
216	Spawner: 'static + SpawnNamed + Clone + Unpin,
217{
218	use polkadot_node_subsystem_util::metrics::Metrics;
219
220	let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
221	let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
222
223	let spawner = SpawnGlue(spawner);
224
225	let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
226	let approval_voting_parallel_metrics: ApprovalVotingParallelMetrics =
227		Metrics::register(registry)?;
228	let builder = Overseer::builder()
229		.network_bridge_tx(NetworkBridgeTxSubsystem::new(
230			network_service.clone(),
231			authority_discovery_service.clone(),
232			network_bridge_metrics.clone(),
233			req_protocol_names.clone(),
234			peerset_protocol_names.clone(),
235			notification_sinks.clone(),
236		))
237		.network_bridge_rx(NetworkBridgeRxSubsystem::new(
238			network_service.clone(),
239			authority_discovery_service.clone(),
240			Box::new(sync_service.clone()),
241			network_bridge_metrics,
242			peerset_protocol_names,
243			notification_services,
244			notification_sinks,
245		))
246		.availability_distribution(AvailabilityDistributionSubsystem::new(
247			keystore.clone(),
248			IncomingRequestReceivers {
249				pov_req_receiver,
250				chunk_req_v1_receiver,
251				chunk_req_v2_receiver,
252			},
253			req_protocol_names.clone(),
254			Metrics::register(registry)?,
255		))
256		.availability_recovery(AvailabilityRecoverySubsystem::for_validator(
257			fetch_chunks_threshold,
258			available_data_req_receiver,
259			&req_protocol_names,
260			Metrics::register(registry)?,
261		))
262		.availability_store(AvailabilityStoreSubsystem::new(
263			parachains_db.clone(),
264			availability_config,
265			Box::new(sync_service.clone()),
266			Metrics::register(registry)?,
267		))
268		.bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
269		.bitfield_signing(BitfieldSigningSubsystem::new(
270			keystore.clone(),
271			Metrics::register(registry)?,
272		))
273		.candidate_backing(CandidateBackingSubsystem::new(
274			keystore.clone(),
275			Metrics::register(registry)?,
276		))
277		.candidate_validation(CandidateValidationSubsystem::with_config(
278			candidate_validation_config,
279			keystore.clone(),
280			Metrics::register(registry)?, // candidate-validation metrics
281			Metrics::register(registry)?, // validation host metrics
282		))
283		.pvf_checker(PvfCheckerSubsystem::new(keystore.clone(), Metrics::register(registry)?))
284		.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
285		.collation_generation(DummySubsystem)
286		.collator_protocol({
287			let side = match is_parachain_node {
288				IsParachainNode::Collator(_) | IsParachainNode::FullNode =>
289					return Err(Error::Overseer(SubsystemError::Context(
290						"build validator overseer for parachain node".to_owned(),
291					))),
292				IsParachainNode::No => ProtocolSide::Validator {
293					keystore: keystore.clone(),
294					eviction_policy: Default::default(),
295					metrics: Metrics::register(registry)?,
296				},
297			};
298			CollatorProtocolSubsystem::new(side)
299		})
300		.provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?))
301		.runtime_api(RuntimeApiSubsystem::new(
302			runtime_client.clone(),
303			Metrics::register(registry)?,
304			spawner.clone(),
305		))
306		.statement_distribution(StatementDistributionSubsystem::new(
307			keystore.clone(),
308			candidate_req_v2_receiver,
309			Metrics::register(registry)?,
310		))
311		.approval_distribution(DummySubsystem)
312		.approval_voting(DummySubsystem)
313		.approval_voting_parallel(ApprovalVotingParallelSubsystem::with_config(
314			approval_voting_config,
315			parachains_db.clone(),
316			keystore.clone(),
317			Box::new(sync_service.clone()),
318			approval_voting_parallel_metrics,
319			spawner.clone(),
320			overseer_message_channel_capacity_override,
321		))
322		.gossip_support(GossipSupportSubsystem::new(
323			keystore.clone(),
324			authority_discovery_service.clone(),
325			Metrics::register(registry)?,
326		))
327		.dispute_coordinator(DisputeCoordinatorSubsystem::new(
328			parachains_db.clone(),
329			dispute_coordinator_config,
330			keystore.clone(),
331			Metrics::register(registry)?,
332		))
333		.dispute_distribution(DisputeDistributionSubsystem::new(
334			keystore.clone(),
335			dispute_req_receiver,
336			authority_discovery_service.clone(),
337			Metrics::register(registry)?,
338		))
339		.chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
340		.prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
341		.activation_external_listeners(Default::default())
342		.active_leaves(Default::default())
343		.supports_parachains(runtime_client)
344		.metrics(metrics)
345		.spawner(spawner);
346
347	let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
348		builder.message_channel_capacity(capacity)
349	} else {
350		builder
351	};
352	Ok(builder)
353}
354
355/// Obtain a prepared collator `Overseer`, that is initialized with all default values.
356pub fn collator_overseer_builder<Spawner, RuntimeClient>(
357	OverseerGenArgs {
358		runtime_client,
359		network_service,
360		sync_service,
361		authority_discovery_service,
362		collation_req_v1_receiver: _,
363		collation_req_v2_receiver,
364		available_data_req_receiver,
365		registry,
366		spawner,
367		is_parachain_node,
368		overseer_message_channel_capacity_override,
369		req_protocol_names,
370		peerset_protocol_names,
371		notification_services,
372	}: OverseerGenArgs<Spawner, RuntimeClient>,
373) -> Result<
374	InitializedOverseerBuilder<
375		SpawnGlue<Spawner>,
376		Arc<RuntimeClient>,
377		DummySubsystem,
378		DummySubsystem,
379		DummySubsystem,
380		DummySubsystem,
381		DummySubsystem,
382		AvailabilityRecoverySubsystem,
383		DummySubsystem,
384		DummySubsystem,
385		DummySubsystem,
386		RuntimeApiSubsystem<RuntimeClient>,
387		DummySubsystem,
388		NetworkBridgeRxSubsystem<
389			Arc<dyn sc_network::service::traits::NetworkService>,
390			AuthorityDiscoveryService,
391		>,
392		NetworkBridgeTxSubsystem<
393			Arc<dyn sc_network::service::traits::NetworkService>,
394			AuthorityDiscoveryService,
395		>,
396		ChainApiSubsystem<RuntimeClient>,
397		CollationGenerationSubsystem,
398		CollatorProtocolSubsystem,
399		DummySubsystem,
400		DummySubsystem,
401		DummySubsystem,
402		DummySubsystem,
403		DummySubsystem,
404		DummySubsystem,
405		DummySubsystem,
406		DummySubsystem,
407	>,
408	Error,
409>
410where
411	Spawner: 'static + SpawnNamed + Clone + Unpin,
412	RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
413{
414	use polkadot_node_subsystem_util::metrics::Metrics;
415
416	let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
417
418	let spawner = SpawnGlue(spawner);
419
420	let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
421
422	let builder = Overseer::builder()
423		.network_bridge_tx(NetworkBridgeTxSubsystem::new(
424			network_service.clone(),
425			authority_discovery_service.clone(),
426			network_bridge_metrics.clone(),
427			req_protocol_names.clone(),
428			peerset_protocol_names.clone(),
429			notification_sinks.clone(),
430		))
431		.network_bridge_rx(NetworkBridgeRxSubsystem::new(
432			network_service.clone(),
433			authority_discovery_service.clone(),
434			Box::new(sync_service.clone()),
435			network_bridge_metrics,
436			peerset_protocol_names,
437			notification_services,
438			notification_sinks,
439		))
440		.availability_distribution(DummySubsystem)
441		.availability_recovery(AvailabilityRecoverySubsystem::for_collator(
442			None,
443			available_data_req_receiver,
444			&req_protocol_names,
445			Metrics::register(registry)?,
446		))
447		.availability_store(DummySubsystem)
448		.bitfield_distribution(DummySubsystem)
449		.bitfield_signing(DummySubsystem)
450		.candidate_backing(DummySubsystem)
451		.candidate_validation(DummySubsystem)
452		.pvf_checker(DummySubsystem)
453		.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
454		.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
455		.collator_protocol({
456			let side = match is_parachain_node {
457				IsParachainNode::No =>
458					return Err(Error::Overseer(SubsystemError::Context(
459						"build parachain node overseer for validator".to_owned(),
460					))),
461				IsParachainNode::Collator(collator_pair) => ProtocolSide::Collator {
462					peer_id: network_service.local_peer_id(),
463					collator_pair,
464					request_receiver_v2: collation_req_v2_receiver,
465					metrics: Metrics::register(registry)?,
466				},
467				IsParachainNode::FullNode => ProtocolSide::None,
468			};
469			CollatorProtocolSubsystem::new(side)
470		})
471		.provisioner(DummySubsystem)
472		.runtime_api(RuntimeApiSubsystem::new(
473			runtime_client.clone(),
474			Metrics::register(registry)?,
475			spawner.clone(),
476		))
477		.statement_distribution(DummySubsystem)
478		.approval_distribution(DummySubsystem)
479		.approval_voting(DummySubsystem)
480		.approval_voting_parallel(DummySubsystem)
481		.gossip_support(DummySubsystem)
482		.dispute_coordinator(DummySubsystem)
483		.dispute_distribution(DummySubsystem)
484		.chain_selection(DummySubsystem)
485		.prospective_parachains(DummySubsystem)
486		.activation_external_listeners(Default::default())
487		.active_leaves(Default::default())
488		.supports_parachains(runtime_client)
489		.metrics(Metrics::register(registry)?)
490		.spawner(spawner);
491
492	let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
493		builder.message_channel_capacity(capacity)
494	} else {
495		builder
496	};
497	Ok(builder)
498}
499
/// Trait for the `fn` generating the overseer.
///
/// Implemented in this file by [`ValidatorOverseerGen`] and [`CollatorOverseerGen`].
pub trait OverseerGen {
	/// Overwrite the full generation of the overseer, including the subsystems.
	///
	/// `ext_args` is optional at the trait level: [`ValidatorOverseerGen`] returns an error
	/// when it is `None`, while [`CollatorOverseerGen`] ignores it entirely.
	///
	/// On success, returns the built overseer together with its handle.
	fn generate<Spawner, RuntimeClient>(
		&self,
		connector: OverseerConnector,
		args: OverseerGenArgs<Spawner, RuntimeClient>,
		ext_args: Option<ExtendedOverseerGenArgs>,
	) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
	where
		RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
		Spawner: 'static + SpawnNamed + Clone + Unpin;

	// It would be nice to make `create_subsystems` part of this trait,
	// but the amount of generic arguments that would be required as
	// a consequence makes this rather annoying to implement and use.
}
517
518/// The regular set of subsystems.
519pub struct ValidatorOverseerGen;
520
521impl OverseerGen for ValidatorOverseerGen {
522	fn generate<Spawner, RuntimeClient>(
523		&self,
524		connector: OverseerConnector,
525		args: OverseerGenArgs<Spawner, RuntimeClient>,
526		ext_args: Option<ExtendedOverseerGenArgs>,
527	) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
528	where
529		RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
530		Spawner: 'static + SpawnNamed + Clone + Unpin,
531	{
532		let ext_args = ext_args.ok_or(Error::Overseer(SubsystemError::Context(
533			"create validator overseer as mandatory extended arguments were not provided"
534				.to_owned(),
535		)))?;
536		validator_overseer_builder(args, ext_args)?
537			.build_with_connector(connector)
538			.map_err(|e| e.into())
539	}
540}
541
542/// Reduced set of subsystems, to use in collator and collator's full node.
543pub struct CollatorOverseerGen;
544
545impl OverseerGen for CollatorOverseerGen {
546	fn generate<Spawner, RuntimeClient>(
547		&self,
548		connector: OverseerConnector,
549		args: OverseerGenArgs<Spawner, RuntimeClient>,
550		_ext_args: Option<ExtendedOverseerGenArgs>,
551	) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
552	where
553		RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
554		Spawner: 'static + SpawnNamed + Clone + Unpin,
555	{
556		collator_overseer_builder(args)?
557			.build_with_connector(connector)
558			.map_err(|e| e.into())
559	}
560}