1use super::{Error, IsParachainNode, Registry};
18use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
19use polkadot_overseer::{DummySubsystem, InitializedOverseerBuilder, SubsystemError};
20use sp_core::traits::SpawnNamed;
21
22use polkadot_availability_distribution::IncomingRequestReceivers;
23use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
24use polkadot_node_core_av_store::Config as AvailabilityConfig;
25use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
26use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
27use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
28use polkadot_node_network_protocol::{
29 peer_set::{PeerSet, PeerSetProtocolNames},
30 request_response::{
31 v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, ReqProtocolNames,
32 },
33};
34#[cfg(any(feature = "malus", test))]
35pub use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
36use polkadot_overseer::{
37 metrics::Metrics as OverseerMetrics, MetricsTrait, Overseer, OverseerConnector, OverseerHandle,
38 SpawnGlue,
39};
40
41use parking_lot::Mutex;
42use sc_authority_discovery::Service as AuthorityDiscoveryService;
43use sc_client_api::AuxStore;
44use sc_keystore::LocalKeystore;
45use sc_network::{NetworkStateInfo, NotificationService};
46use std::{
47 collections::{HashMap, HashSet},
48 sync::Arc,
49 time::Duration,
50};
51
52pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem;
53pub use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem;
54pub use polkadot_availability_distribution::AvailabilityDistributionSubsystem;
55pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
56pub use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
57pub use polkadot_dispute_distribution::DisputeDistributionSubsystem;
58pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem;
59pub use polkadot_network_bridge::{
60 Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem,
61 NetworkBridgeTx as NetworkBridgeTxSubsystem,
62};
63pub use polkadot_node_collation_generation::CollationGenerationSubsystem;
64pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
65pub use polkadot_node_core_approval_voting_parallel::{
66 ApprovalVotingParallelSubsystem, Metrics as ApprovalVotingParallelMetrics,
67};
68pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
69pub use polkadot_node_core_backing::CandidateBackingSubsystem;
70pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
71pub use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
72pub use polkadot_node_core_chain_api::ChainApiSubsystem;
73pub use polkadot_node_core_chain_selection::ChainSelectionSubsystem;
74pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem;
75pub use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
76pub use polkadot_node_core_provisioner::ProvisionerSubsystem;
77pub use polkadot_node_core_pvf_checker::PvfCheckerSubsystem;
78pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
79pub use polkadot_statement_distribution::StatementDistributionSubsystem;
80
81pub struct OverseerGenArgs<'a, Spawner, RuntimeClient>
83where
84 Spawner: 'static + SpawnNamed + Clone + Unpin,
85{
86 pub runtime_client: Arc<RuntimeClient>,
88 pub network_service: Arc<dyn sc_network::service::traits::NetworkService>,
90 pub sync_service: Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
92 pub authority_discovery_service: AuthorityDiscoveryService,
94 pub collation_req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
96 pub collation_req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
98 pub available_data_req_receiver:
100 IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
101 pub registry: Option<&'a Registry>,
103 pub spawner: Spawner,
105 pub is_parachain_node: IsParachainNode,
107 pub overseer_message_channel_capacity_override: Option<usize>,
109 pub req_protocol_names: ReqProtocolNames,
111 pub peerset_protocol_names: PeerSetProtocolNames,
113 pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
115}
116
117pub struct ExtendedOverseerGenArgs {
118 pub keystore: Arc<LocalKeystore>,
120 pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
122 pub candidate_validation_config: Option<CandidateValidationConfig>,
124 pub availability_config: AvailabilityConfig,
126 pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
128 pub chunk_req_v1_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
130 pub chunk_req_v2_receiver: IncomingRequestReceiver<request_v2::ChunkFetchingRequest>,
132 pub candidate_req_v2_receiver: IncomingRequestReceiver<request_v2::AttestedCandidateRequest>,
134 pub approval_voting_config: ApprovalVotingConfig,
136 pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
138 pub dispute_coordinator_config: DisputeCoordinatorConfig,
140 pub chain_selection_config: ChainSelectionConfig,
142 pub fetch_chunks_threshold: Option<usize>,
146 pub invulnerable_ah_collators: HashSet<polkadot_node_network_protocol::PeerId>,
148 pub collator_protocol_hold_off: Option<Duration>,
150}
151
152pub fn validator_overseer_builder<Spawner, RuntimeClient>(
154 OverseerGenArgs {
155 runtime_client,
156 network_service,
157 sync_service,
158 authority_discovery_service,
159 collation_req_v1_receiver: _,
160 collation_req_v2_receiver: _,
161 available_data_req_receiver,
162 registry,
163 spawner,
164 is_parachain_node,
165 overseer_message_channel_capacity_override,
166 req_protocol_names,
167 peerset_protocol_names,
168 notification_services,
169 }: OverseerGenArgs<Spawner, RuntimeClient>,
170 ExtendedOverseerGenArgs {
171 keystore,
172 parachains_db,
173 candidate_validation_config,
174 availability_config,
175 pov_req_receiver,
176 chunk_req_v1_receiver,
177 chunk_req_v2_receiver,
178 candidate_req_v2_receiver,
179 approval_voting_config,
180 dispute_req_receiver,
181 dispute_coordinator_config,
182 chain_selection_config,
183 fetch_chunks_threshold,
184 invulnerable_ah_collators,
185 collator_protocol_hold_off,
186 }: ExtendedOverseerGenArgs,
187) -> Result<
188 InitializedOverseerBuilder<
189 SpawnGlue<Spawner>,
190 Arc<RuntimeClient>,
191 CandidateValidationSubsystem,
192 PvfCheckerSubsystem,
193 CandidateBackingSubsystem,
194 StatementDistributionSubsystem,
195 AvailabilityDistributionSubsystem,
196 AvailabilityRecoverySubsystem,
197 BitfieldSigningSubsystem,
198 BitfieldDistributionSubsystem,
199 ProvisionerSubsystem,
200 RuntimeApiSubsystem<RuntimeClient>,
201 AvailabilityStoreSubsystem,
202 NetworkBridgeRxSubsystem<
203 Arc<dyn sc_network::service::traits::NetworkService>,
204 AuthorityDiscoveryService,
205 >,
206 NetworkBridgeTxSubsystem<
207 Arc<dyn sc_network::service::traits::NetworkService>,
208 AuthorityDiscoveryService,
209 >,
210 ChainApiSubsystem<RuntimeClient>,
211 DummySubsystem,
212 CollatorProtocolSubsystem,
213 DummySubsystem,
214 DummySubsystem,
215 ApprovalVotingParallelSubsystem,
216 GossipSupportSubsystem<AuthorityDiscoveryService>,
217 DisputeCoordinatorSubsystem,
218 DisputeDistributionSubsystem<AuthorityDiscoveryService>,
219 ChainSelectionSubsystem,
220 ProspectiveParachainsSubsystem,
221 >,
222 Error,
223>
224where
225 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
226 Spawner: 'static + SpawnNamed + Clone + Unpin,
227{
228 use polkadot_node_subsystem_util::metrics::Metrics;
229
230 let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
231 let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
232
233 let spawner = SpawnGlue(spawner);
234
235 let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
236 let approval_voting_parallel_metrics: ApprovalVotingParallelMetrics =
237 Metrics::register(registry)?;
238 let builder = Overseer::builder()
239 .network_bridge_tx(NetworkBridgeTxSubsystem::new(
240 network_service.clone(),
241 authority_discovery_service.clone(),
242 network_bridge_metrics.clone(),
243 req_protocol_names.clone(),
244 peerset_protocol_names.clone(),
245 notification_sinks.clone(),
246 ))
247 .network_bridge_rx(NetworkBridgeRxSubsystem::new(
248 network_service.clone(),
249 authority_discovery_service.clone(),
250 Box::new(sync_service.clone()),
251 network_bridge_metrics,
252 peerset_protocol_names,
253 notification_services,
254 notification_sinks,
255 ))
256 .availability_distribution(AvailabilityDistributionSubsystem::new(
257 keystore.clone(),
258 IncomingRequestReceivers {
259 pov_req_receiver,
260 chunk_req_v1_receiver,
261 chunk_req_v2_receiver,
262 },
263 req_protocol_names.clone(),
264 Metrics::register(registry)?,
265 ))
266 .availability_recovery(AvailabilityRecoverySubsystem::for_validator(
267 fetch_chunks_threshold,
268 available_data_req_receiver,
269 &req_protocol_names,
270 Metrics::register(registry)?,
271 ))
272 .availability_store(AvailabilityStoreSubsystem::new(
273 parachains_db.clone(),
274 availability_config,
275 Box::new(sync_service.clone()),
276 Metrics::register(registry)?,
277 ))
278 .bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
279 .bitfield_signing(BitfieldSigningSubsystem::new(
280 keystore.clone(),
281 Metrics::register(registry)?,
282 ))
283 .candidate_backing(CandidateBackingSubsystem::new(
284 keystore.clone(),
285 Metrics::register(registry)?,
286 ))
287 .candidate_validation(CandidateValidationSubsystem::with_config(
288 candidate_validation_config,
289 keystore.clone(),
290 Metrics::register(registry)?, Metrics::register(registry)?, ))
293 .pvf_checker(PvfCheckerSubsystem::new(keystore.clone(), Metrics::register(registry)?))
294 .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
295 .collation_generation(DummySubsystem)
296 .collator_protocol({
297 let side = match is_parachain_node {
298 IsParachainNode::Collator(_) | IsParachainNode::FullNode =>
299 return Err(Error::Overseer(SubsystemError::Context(
300 "build validator overseer for parachain node".to_owned(),
301 ))),
302 IsParachainNode::No => ProtocolSide::Validator {
303 keystore: keystore.clone(),
304 eviction_policy: Default::default(),
305 metrics: Metrics::register(registry)?,
306 invulnerables: invulnerable_ah_collators,
307 collator_protocol_hold_off,
308 },
309 };
310 CollatorProtocolSubsystem::new(side)
311 })
312 .provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?))
313 .runtime_api(RuntimeApiSubsystem::new(
314 runtime_client.clone(),
315 Metrics::register(registry)?,
316 spawner.clone(),
317 ))
318 .statement_distribution(StatementDistributionSubsystem::new(
319 keystore.clone(),
320 candidate_req_v2_receiver,
321 Metrics::register(registry)?,
322 ))
323 .approval_distribution(DummySubsystem)
324 .approval_voting(DummySubsystem)
325 .approval_voting_parallel(ApprovalVotingParallelSubsystem::with_config(
326 approval_voting_config,
327 parachains_db.clone(),
328 keystore.clone(),
329 Box::new(sync_service.clone()),
330 approval_voting_parallel_metrics,
331 spawner.clone(),
332 overseer_message_channel_capacity_override,
333 ))
334 .gossip_support(GossipSupportSubsystem::new(
335 keystore.clone(),
336 authority_discovery_service.clone(),
337 Metrics::register(registry)?,
338 ))
339 .dispute_coordinator(DisputeCoordinatorSubsystem::new(
340 parachains_db.clone(),
341 dispute_coordinator_config,
342 keystore.clone(),
343 Metrics::register(registry)?,
344 ))
345 .dispute_distribution(DisputeDistributionSubsystem::new(
346 keystore.clone(),
347 dispute_req_receiver,
348 authority_discovery_service.clone(),
349 Metrics::register(registry)?,
350 ))
351 .chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
352 .prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
353 .activation_external_listeners(Default::default())
354 .active_leaves(Default::default())
355 .supports_parachains(runtime_client)
356 .metrics(metrics)
357 .spawner(spawner);
358
359 let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
360 builder.message_channel_capacity(capacity)
361 } else {
362 builder
363 };
364 Ok(builder)
365}
366
367pub fn collator_overseer_builder<Spawner, RuntimeClient>(
369 OverseerGenArgs {
370 runtime_client,
371 network_service,
372 sync_service,
373 authority_discovery_service,
374 collation_req_v1_receiver: _,
375 collation_req_v2_receiver,
376 available_data_req_receiver,
377 registry,
378 spawner,
379 is_parachain_node,
380 overseer_message_channel_capacity_override,
381 req_protocol_names,
382 peerset_protocol_names,
383 notification_services,
384 }: OverseerGenArgs<Spawner, RuntimeClient>,
385) -> Result<
386 InitializedOverseerBuilder<
387 SpawnGlue<Spawner>,
388 Arc<RuntimeClient>,
389 DummySubsystem,
390 DummySubsystem,
391 DummySubsystem,
392 DummySubsystem,
393 DummySubsystem,
394 AvailabilityRecoverySubsystem,
395 DummySubsystem,
396 DummySubsystem,
397 DummySubsystem,
398 RuntimeApiSubsystem<RuntimeClient>,
399 DummySubsystem,
400 NetworkBridgeRxSubsystem<
401 Arc<dyn sc_network::service::traits::NetworkService>,
402 AuthorityDiscoveryService,
403 >,
404 NetworkBridgeTxSubsystem<
405 Arc<dyn sc_network::service::traits::NetworkService>,
406 AuthorityDiscoveryService,
407 >,
408 ChainApiSubsystem<RuntimeClient>,
409 CollationGenerationSubsystem,
410 CollatorProtocolSubsystem,
411 DummySubsystem,
412 DummySubsystem,
413 DummySubsystem,
414 DummySubsystem,
415 DummySubsystem,
416 DummySubsystem,
417 DummySubsystem,
418 DummySubsystem,
419 >,
420 Error,
421>
422where
423 Spawner: 'static + SpawnNamed + Clone + Unpin,
424 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
425{
426 use polkadot_node_subsystem_util::metrics::Metrics;
427
428 let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
429
430 let spawner = SpawnGlue(spawner);
431
432 let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
433
434 let builder = Overseer::builder()
435 .network_bridge_tx(NetworkBridgeTxSubsystem::new(
436 network_service.clone(),
437 authority_discovery_service.clone(),
438 network_bridge_metrics.clone(),
439 req_protocol_names.clone(),
440 peerset_protocol_names.clone(),
441 notification_sinks.clone(),
442 ))
443 .network_bridge_rx(NetworkBridgeRxSubsystem::new(
444 network_service.clone(),
445 authority_discovery_service.clone(),
446 Box::new(sync_service.clone()),
447 network_bridge_metrics,
448 peerset_protocol_names,
449 notification_services,
450 notification_sinks,
451 ))
452 .availability_distribution(DummySubsystem)
453 .availability_recovery(AvailabilityRecoverySubsystem::for_collator(
454 None,
455 available_data_req_receiver,
456 &req_protocol_names,
457 Metrics::register(registry)?,
458 ))
459 .availability_store(DummySubsystem)
460 .bitfield_distribution(DummySubsystem)
461 .bitfield_signing(DummySubsystem)
462 .candidate_backing(DummySubsystem)
463 .candidate_validation(DummySubsystem)
464 .pvf_checker(DummySubsystem)
465 .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
466 .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
467 .collator_protocol({
468 let side = match is_parachain_node {
469 IsParachainNode::No =>
470 return Err(Error::Overseer(SubsystemError::Context(
471 "build parachain node overseer for validator".to_owned(),
472 ))),
473 IsParachainNode::Collator(collator_pair) => ProtocolSide::Collator {
474 peer_id: network_service.local_peer_id(),
475 collator_pair,
476 request_receiver_v2: collation_req_v2_receiver,
477 metrics: Metrics::register(registry)?,
478 },
479 IsParachainNode::FullNode => ProtocolSide::None,
480 };
481 CollatorProtocolSubsystem::new(side)
482 })
483 .provisioner(DummySubsystem)
484 .runtime_api(RuntimeApiSubsystem::new(
485 runtime_client.clone(),
486 Metrics::register(registry)?,
487 spawner.clone(),
488 ))
489 .statement_distribution(DummySubsystem)
490 .approval_distribution(DummySubsystem)
491 .approval_voting(DummySubsystem)
492 .approval_voting_parallel(DummySubsystem)
493 .gossip_support(DummySubsystem)
494 .dispute_coordinator(DummySubsystem)
495 .dispute_distribution(DummySubsystem)
496 .chain_selection(DummySubsystem)
497 .prospective_parachains(DummySubsystem)
498 .activation_external_listeners(Default::default())
499 .active_leaves(Default::default())
500 .supports_parachains(runtime_client)
501 .metrics(Metrics::register(registry)?)
502 .spawner(spawner);
503
504 let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
505 builder.message_channel_capacity(capacity)
506 } else {
507 builder
508 };
509 Ok(builder)
510}
511
512pub trait OverseerGen {
514 fn generate<Spawner, RuntimeClient>(
516 &self,
517 connector: OverseerConnector,
518 args: OverseerGenArgs<Spawner, RuntimeClient>,
519 ext_args: Option<ExtendedOverseerGenArgs>,
520 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
521 where
522 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
523 Spawner: 'static + SpawnNamed + Clone + Unpin;
524
525 }
529
530pub struct ValidatorOverseerGen;
532
533impl OverseerGen for ValidatorOverseerGen {
534 fn generate<Spawner, RuntimeClient>(
535 &self,
536 connector: OverseerConnector,
537 args: OverseerGenArgs<Spawner, RuntimeClient>,
538 ext_args: Option<ExtendedOverseerGenArgs>,
539 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
540 where
541 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
542 Spawner: 'static + SpawnNamed + Clone + Unpin,
543 {
544 let ext_args = ext_args.ok_or(Error::Overseer(SubsystemError::Context(
545 "create validator overseer as mandatory extended arguments were not provided"
546 .to_owned(),
547 )))?;
548 validator_overseer_builder(args, ext_args)?
549 .build_with_connector(connector)
550 .map_err(|e| e.into())
551 }
552}
553
554pub struct CollatorOverseerGen;
556
557impl OverseerGen for CollatorOverseerGen {
558 fn generate<Spawner, RuntimeClient>(
559 &self,
560 connector: OverseerConnector,
561 args: OverseerGenArgs<Spawner, RuntimeClient>,
562 _ext_args: Option<ExtendedOverseerGenArgs>,
563 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
564 where
565 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
566 Spawner: 'static + SpawnNamed + Clone + Unpin,
567 {
568 collator_overseer_builder(args)?
569 .build_with_connector(connector)
570 .map_err(|e| e.into())
571 }
572}