1use super::{Error, IsParachainNode, Registry};
18use polkadot_collator_protocol::ReputationConfig;
19use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
20use polkadot_overseer::{DummySubsystem, InitializedOverseerBuilder, SubsystemError};
21use sp_core::traits::SpawnNamed;
22
23use polkadot_availability_distribution::IncomingRequestReceivers;
24use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
25use polkadot_node_core_av_store::Config as AvailabilityConfig;
26use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
27use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
28use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
29use polkadot_node_network_protocol::{
30 peer_set::{PeerSet, PeerSetProtocolNames},
31 request_response::{
32 v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, ReqProtocolNames,
33 },
34};
35#[cfg(any(feature = "malus", test))]
36pub use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
37use polkadot_overseer::{
38 metrics::Metrics as OverseerMetrics, MetricsTrait, Overseer, OverseerConnector, OverseerHandle,
39 SpawnGlue,
40};
41
42use parking_lot::Mutex;
43use sc_authority_discovery::Service as AuthorityDiscoveryService;
44use sc_client_api::AuxStore;
45use sc_keystore::LocalKeystore;
46use sc_network::{NetworkStateInfo, NotificationService};
47use std::{
48 collections::{HashMap, HashSet},
49 sync::Arc,
50 time::Duration,
51};
52
53pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem;
54pub use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem;
55pub use polkadot_availability_distribution::AvailabilityDistributionSubsystem;
56pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
57pub use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
58pub use polkadot_dispute_distribution::DisputeDistributionSubsystem;
59pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem;
60pub use polkadot_network_bridge::{
61 Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem,
62 NetworkBridgeTx as NetworkBridgeTxSubsystem,
63};
64pub use polkadot_node_collation_generation::CollationGenerationSubsystem;
65pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
66pub use polkadot_node_core_approval_voting_parallel::{
67 ApprovalVotingParallelSubsystem, Metrics as ApprovalVotingParallelMetrics,
68};
69pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
70pub use polkadot_node_core_backing::CandidateBackingSubsystem;
71pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
72pub use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
73pub use polkadot_node_core_chain_api::ChainApiSubsystem;
74pub use polkadot_node_core_chain_selection::ChainSelectionSubsystem;
75pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem;
76pub use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
77pub use polkadot_node_core_provisioner::ProvisionerSubsystem;
78pub use polkadot_node_core_pvf_checker::PvfCheckerSubsystem;
79pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
80pub use polkadot_statement_distribution::StatementDistributionSubsystem;
81
82pub struct OverseerGenArgs<'a, Spawner, RuntimeClient>
84where
85 Spawner: 'static + SpawnNamed + Clone + Unpin,
86{
87 pub runtime_client: Arc<RuntimeClient>,
89 pub network_service: Arc<dyn sc_network::service::traits::NetworkService>,
91 pub sync_service: Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
93 pub authority_discovery_service: AuthorityDiscoveryService,
95 pub collation_req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
97 pub collation_req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
99 pub available_data_req_receiver:
101 IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
102 pub registry: Option<&'a Registry>,
104 pub spawner: Spawner,
106 pub is_parachain_node: IsParachainNode,
108 pub overseer_message_channel_capacity_override: Option<usize>,
110 pub req_protocol_names: ReqProtocolNames,
112 pub peerset_protocol_names: PeerSetProtocolNames,
114 pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
116}
117
118pub struct ExtendedOverseerGenArgs {
119 pub keystore: Arc<LocalKeystore>,
121 pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
123 pub candidate_validation_config: Option<CandidateValidationConfig>,
125 pub availability_config: AvailabilityConfig,
127 pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
129 pub chunk_req_v1_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
131 pub chunk_req_v2_receiver: IncomingRequestReceiver<request_v2::ChunkFetchingRequest>,
133 pub candidate_req_v2_receiver: IncomingRequestReceiver<request_v2::AttestedCandidateRequest>,
135 pub approval_voting_config: ApprovalVotingConfig,
137 pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
139 pub dispute_coordinator_config: DisputeCoordinatorConfig,
141 pub chain_selection_config: ChainSelectionConfig,
143 pub fetch_chunks_threshold: Option<usize>,
147 pub invulnerable_ah_collators: HashSet<polkadot_node_network_protocol::PeerId>,
149 pub collator_protocol_hold_off: Option<Duration>,
151 pub experimental_collator_protocol: bool,
153 pub reputation_config: ReputationConfig,
155}
156
157pub fn validator_overseer_builder<Spawner, RuntimeClient>(
159 OverseerGenArgs {
160 runtime_client,
161 network_service,
162 sync_service,
163 authority_discovery_service,
164 collation_req_v1_receiver: _,
165 collation_req_v2_receiver: _,
166 available_data_req_receiver,
167 registry,
168 spawner,
169 is_parachain_node,
170 overseer_message_channel_capacity_override,
171 req_protocol_names,
172 peerset_protocol_names,
173 notification_services,
174 }: OverseerGenArgs<Spawner, RuntimeClient>,
175 ExtendedOverseerGenArgs {
176 keystore,
177 parachains_db,
178 candidate_validation_config,
179 availability_config,
180 pov_req_receiver,
181 chunk_req_v1_receiver,
182 chunk_req_v2_receiver,
183 candidate_req_v2_receiver,
184 approval_voting_config,
185 dispute_req_receiver,
186 dispute_coordinator_config,
187 chain_selection_config,
188 fetch_chunks_threshold,
189 invulnerable_ah_collators,
190 collator_protocol_hold_off,
191 experimental_collator_protocol,
192 reputation_config,
193 }: ExtendedOverseerGenArgs,
194) -> Result<
195 InitializedOverseerBuilder<
196 SpawnGlue<Spawner>,
197 Arc<RuntimeClient>,
198 CandidateValidationSubsystem,
199 PvfCheckerSubsystem,
200 CandidateBackingSubsystem,
201 StatementDistributionSubsystem,
202 AvailabilityDistributionSubsystem,
203 AvailabilityRecoverySubsystem,
204 BitfieldSigningSubsystem,
205 BitfieldDistributionSubsystem,
206 ProvisionerSubsystem,
207 RuntimeApiSubsystem<RuntimeClient>,
208 AvailabilityStoreSubsystem,
209 NetworkBridgeRxSubsystem<
210 Arc<dyn sc_network::service::traits::NetworkService>,
211 AuthorityDiscoveryService,
212 >,
213 NetworkBridgeTxSubsystem<
214 Arc<dyn sc_network::service::traits::NetworkService>,
215 AuthorityDiscoveryService,
216 >,
217 ChainApiSubsystem<RuntimeClient>,
218 DummySubsystem,
219 CollatorProtocolSubsystem,
220 DummySubsystem,
221 DummySubsystem,
222 ApprovalVotingParallelSubsystem,
223 GossipSupportSubsystem<AuthorityDiscoveryService>,
224 DisputeCoordinatorSubsystem,
225 DisputeDistributionSubsystem<AuthorityDiscoveryService>,
226 ChainSelectionSubsystem,
227 ProspectiveParachainsSubsystem,
228 >,
229 Error,
230>
231where
232 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
233 Spawner: 'static + SpawnNamed + Clone + Unpin,
234{
235 use polkadot_node_subsystem_util::metrics::Metrics;
236
237 let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
238 let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
239
240 let spawner = SpawnGlue(spawner);
241
242 let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
243 let approval_voting_parallel_metrics: ApprovalVotingParallelMetrics =
244 Metrics::register(registry)?;
245 let builder = Overseer::builder()
246 .network_bridge_tx(NetworkBridgeTxSubsystem::new(
247 network_service.clone(),
248 authority_discovery_service.clone(),
249 network_bridge_metrics.clone(),
250 req_protocol_names.clone(),
251 peerset_protocol_names.clone(),
252 notification_sinks.clone(),
253 ))
254 .network_bridge_rx(NetworkBridgeRxSubsystem::new(
255 network_service.clone(),
256 authority_discovery_service.clone(),
257 Box::new(sync_service.clone()),
258 network_bridge_metrics,
259 peerset_protocol_names,
260 notification_services,
261 notification_sinks,
262 ))
263 .availability_distribution(AvailabilityDistributionSubsystem::new(
264 keystore.clone(),
265 IncomingRequestReceivers {
266 pov_req_receiver,
267 chunk_req_v1_receiver,
268 chunk_req_v2_receiver,
269 },
270 req_protocol_names.clone(),
271 Metrics::register(registry)?,
272 ))
273 .availability_recovery(AvailabilityRecoverySubsystem::for_validator(
274 fetch_chunks_threshold,
275 available_data_req_receiver,
276 &req_protocol_names,
277 Metrics::register(registry)?,
278 ))
279 .availability_store(AvailabilityStoreSubsystem::new(
280 parachains_db.clone(),
281 availability_config,
282 Box::new(sync_service.clone()),
283 Metrics::register(registry)?,
284 ))
285 .bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
286 .bitfield_signing(BitfieldSigningSubsystem::new(
287 keystore.clone(),
288 Metrics::register(registry)?,
289 ))
290 .candidate_backing(CandidateBackingSubsystem::new(
291 keystore.clone(),
292 Metrics::register(registry)?,
293 ))
294 .candidate_validation(CandidateValidationSubsystem::with_config(
295 candidate_validation_config,
296 keystore.clone(),
297 Metrics::register(registry)?, Metrics::register(registry)?, ))
300 .pvf_checker(PvfCheckerSubsystem::new(keystore.clone(), Metrics::register(registry)?))
301 .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
302 .collation_generation(DummySubsystem)
303 .collator_protocol({
304 let side = match is_parachain_node {
305 IsParachainNode::Collator(_) | IsParachainNode::FullNode => {
306 return Err(Error::Overseer(SubsystemError::Context(
307 "build validator overseer for parachain node".to_owned(),
308 )))
309 },
310 IsParachainNode::No => {
311 if experimental_collator_protocol {
312 ProtocolSide::ValidatorExperimental {
313 keystore: keystore.clone(),
314 metrics: Metrics::register(registry)?,
315 db: parachains_db.clone(),
316 reputation_config,
317 }
318 } else {
319 ProtocolSide::Validator {
320 keystore: keystore.clone(),
321 eviction_policy: Default::default(),
322 metrics: Metrics::register(registry)?,
323 invulnerables: invulnerable_ah_collators,
324 collator_protocol_hold_off,
325 }
326 }
327 },
328 };
329 CollatorProtocolSubsystem::new(side)
330 })
331 .provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?))
332 .runtime_api(RuntimeApiSubsystem::new(
333 runtime_client.clone(),
334 Metrics::register(registry)?,
335 spawner.clone(),
336 ))
337 .statement_distribution(StatementDistributionSubsystem::new(
338 keystore.clone(),
339 candidate_req_v2_receiver,
340 Metrics::register(registry)?,
341 ))
342 .approval_distribution(DummySubsystem)
343 .approval_voting(DummySubsystem)
344 .approval_voting_parallel(ApprovalVotingParallelSubsystem::with_config(
345 approval_voting_config,
346 parachains_db.clone(),
347 keystore.clone(),
348 Box::new(sync_service.clone()),
349 approval_voting_parallel_metrics,
350 spawner.clone(),
351 overseer_message_channel_capacity_override,
352 ))
353 .gossip_support(GossipSupportSubsystem::new(
354 keystore.clone(),
355 authority_discovery_service.clone(),
356 Metrics::register(registry)?,
357 ))
358 .dispute_coordinator(DisputeCoordinatorSubsystem::new(
359 parachains_db.clone(),
360 dispute_coordinator_config,
361 keystore.clone(),
362 Metrics::register(registry)?,
363 ))
364 .dispute_distribution(DisputeDistributionSubsystem::new(
365 keystore.clone(),
366 dispute_req_receiver,
367 authority_discovery_service.clone(),
368 Metrics::register(registry)?,
369 ))
370 .chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
371 .prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
372 .activation_external_listeners(Default::default())
373 .active_leaves(Default::default())
374 .supports_parachains(runtime_client)
375 .metrics(metrics)
376 .spawner(spawner);
377
378 let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
379 builder.message_channel_capacity(capacity)
380 } else {
381 builder
382 };
383 Ok(builder)
384}
385
386pub fn collator_overseer_builder<Spawner, RuntimeClient>(
388 OverseerGenArgs {
389 runtime_client,
390 network_service,
391 sync_service,
392 authority_discovery_service,
393 collation_req_v1_receiver: _,
394 collation_req_v2_receiver,
395 available_data_req_receiver,
396 registry,
397 spawner,
398 is_parachain_node,
399 overseer_message_channel_capacity_override,
400 req_protocol_names,
401 peerset_protocol_names,
402 notification_services,
403 }: OverseerGenArgs<Spawner, RuntimeClient>,
404) -> Result<
405 InitializedOverseerBuilder<
406 SpawnGlue<Spawner>,
407 Arc<RuntimeClient>,
408 DummySubsystem,
409 DummySubsystem,
410 DummySubsystem,
411 DummySubsystem,
412 DummySubsystem,
413 AvailabilityRecoverySubsystem,
414 DummySubsystem,
415 DummySubsystem,
416 DummySubsystem,
417 RuntimeApiSubsystem<RuntimeClient>,
418 DummySubsystem,
419 NetworkBridgeRxSubsystem<
420 Arc<dyn sc_network::service::traits::NetworkService>,
421 AuthorityDiscoveryService,
422 >,
423 NetworkBridgeTxSubsystem<
424 Arc<dyn sc_network::service::traits::NetworkService>,
425 AuthorityDiscoveryService,
426 >,
427 ChainApiSubsystem<RuntimeClient>,
428 CollationGenerationSubsystem,
429 CollatorProtocolSubsystem,
430 DummySubsystem,
431 DummySubsystem,
432 DummySubsystem,
433 DummySubsystem,
434 DummySubsystem,
435 DummySubsystem,
436 DummySubsystem,
437 DummySubsystem,
438 >,
439 Error,
440>
441where
442 Spawner: 'static + SpawnNamed + Clone + Unpin,
443 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
444{
445 use polkadot_node_subsystem_util::metrics::Metrics;
446
447 let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
448
449 let spawner = SpawnGlue(spawner);
450
451 let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
452
453 let builder = Overseer::builder()
454 .network_bridge_tx(NetworkBridgeTxSubsystem::new(
455 network_service.clone(),
456 authority_discovery_service.clone(),
457 network_bridge_metrics.clone(),
458 req_protocol_names.clone(),
459 peerset_protocol_names.clone(),
460 notification_sinks.clone(),
461 ))
462 .network_bridge_rx(NetworkBridgeRxSubsystem::new(
463 network_service.clone(),
464 authority_discovery_service.clone(),
465 Box::new(sync_service.clone()),
466 network_bridge_metrics,
467 peerset_protocol_names,
468 notification_services,
469 notification_sinks,
470 ))
471 .availability_distribution(DummySubsystem)
472 .availability_recovery(AvailabilityRecoverySubsystem::for_collator(
473 None,
474 available_data_req_receiver,
475 &req_protocol_names,
476 Metrics::register(registry)?,
477 ))
478 .availability_store(DummySubsystem)
479 .bitfield_distribution(DummySubsystem)
480 .bitfield_signing(DummySubsystem)
481 .candidate_backing(DummySubsystem)
482 .candidate_validation(DummySubsystem)
483 .pvf_checker(DummySubsystem)
484 .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
485 .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
486 .collator_protocol({
487 let side = match is_parachain_node {
488 IsParachainNode::No => {
489 return Err(Error::Overseer(SubsystemError::Context(
490 "build parachain node overseer for validator".to_owned(),
491 )))
492 },
493 IsParachainNode::Collator(collator_pair) => ProtocolSide::Collator {
494 peer_id: network_service.local_peer_id(),
495 collator_pair,
496 request_receiver_v2: collation_req_v2_receiver,
497 metrics: Metrics::register(registry)?,
498 },
499 IsParachainNode::FullNode => ProtocolSide::None,
500 };
501 CollatorProtocolSubsystem::new(side)
502 })
503 .provisioner(DummySubsystem)
504 .runtime_api(RuntimeApiSubsystem::new(
505 runtime_client.clone(),
506 Metrics::register(registry)?,
507 spawner.clone(),
508 ))
509 .statement_distribution(DummySubsystem)
510 .approval_distribution(DummySubsystem)
511 .approval_voting(DummySubsystem)
512 .approval_voting_parallel(DummySubsystem)
513 .gossip_support(DummySubsystem)
514 .dispute_coordinator(DummySubsystem)
515 .dispute_distribution(DummySubsystem)
516 .chain_selection(DummySubsystem)
517 .prospective_parachains(DummySubsystem)
518 .activation_external_listeners(Default::default())
519 .active_leaves(Default::default())
520 .supports_parachains(runtime_client)
521 .metrics(Metrics::register(registry)?)
522 .spawner(spawner);
523
524 let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
525 builder.message_channel_capacity(capacity)
526 } else {
527 builder
528 };
529 Ok(builder)
530}
531
532pub trait OverseerGen {
534 fn generate<Spawner, RuntimeClient>(
536 &self,
537 connector: OverseerConnector,
538 args: OverseerGenArgs<Spawner, RuntimeClient>,
539 ext_args: Option<ExtendedOverseerGenArgs>,
540 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
541 where
542 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
543 Spawner: 'static + SpawnNamed + Clone + Unpin;
544
545 }
549
550pub struct ValidatorOverseerGen;
552
553impl OverseerGen for ValidatorOverseerGen {
554 fn generate<Spawner, RuntimeClient>(
555 &self,
556 connector: OverseerConnector,
557 args: OverseerGenArgs<Spawner, RuntimeClient>,
558 ext_args: Option<ExtendedOverseerGenArgs>,
559 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
560 where
561 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
562 Spawner: 'static + SpawnNamed + Clone + Unpin,
563 {
564 let ext_args = ext_args.ok_or(Error::Overseer(SubsystemError::Context(
565 "create validator overseer as mandatory extended arguments were not provided"
566 .to_owned(),
567 )))?;
568 validator_overseer_builder(args, ext_args)?
569 .build_with_connector(connector)
570 .map_err(|e| e.into())
571 }
572}
573
574pub struct CollatorOverseerGen;
576
577impl OverseerGen for CollatorOverseerGen {
578 fn generate<Spawner, RuntimeClient>(
579 &self,
580 connector: OverseerConnector,
581 args: OverseerGenArgs<Spawner, RuntimeClient>,
582 _ext_args: Option<ExtendedOverseerGenArgs>,
583 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
584 where
585 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
586 Spawner: 'static + SpawnNamed + Clone + Unpin,
587 {
588 collator_overseer_builder(args)?
589 .build_with_connector(connector)
590 .map_err(|e| e.into())
591 }
592}