1use super::{Error, IsParachainNode, Registry};
18use polkadot_collator_protocol::ReputationConfig;
19use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
20use polkadot_overseer::{DummySubsystem, InitializedOverseerBuilder, SubsystemError};
21use sp_core::traits::SpawnNamed;
22
23use polkadot_availability_distribution::IncomingRequestReceivers;
24use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
25use polkadot_node_core_av_store::Config as AvailabilityConfig;
26use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
27use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
28use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
29use polkadot_node_network_protocol::{
30 peer_set::{PeerSet, PeerSetProtocolNames},
31 request_response::{
32 v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, ReqProtocolNames,
33 },
34};
35#[cfg(any(feature = "malus", test))]
36pub use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
37use polkadot_overseer::{
38 metrics::Metrics as OverseerMetrics, MetricsTrait, Overseer, OverseerConnector, OverseerHandle,
39 SpawnGlue,
40};
41
42use parking_lot::Mutex;
43use sc_authority_discovery::Service as AuthorityDiscoveryService;
44use sc_client_api::AuxStore;
45use sc_keystore::LocalKeystore;
46use sc_network::{NetworkStateInfo, NotificationService};
47use std::{
48 collections::{HashMap, HashSet},
49 sync::Arc,
50 time::Duration,
51};
52
53pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem;
54pub use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem;
55pub use polkadot_availability_distribution::AvailabilityDistributionSubsystem;
56pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
57pub use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
58pub use polkadot_dispute_distribution::DisputeDistributionSubsystem;
59pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem;
60pub use polkadot_network_bridge::{
61 Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem,
62 NetworkBridgeTx as NetworkBridgeTxSubsystem,
63};
64pub use polkadot_node_collation_generation::CollationGenerationSubsystem;
65pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
66pub use polkadot_node_core_approval_voting_parallel::{
67 ApprovalVotingParallelSubsystem, Metrics as ApprovalVotingParallelMetrics,
68};
69pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
70pub use polkadot_node_core_backing::CandidateBackingSubsystem;
71pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
72pub use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
73pub use polkadot_node_core_chain_api::ChainApiSubsystem;
74pub use polkadot_node_core_chain_selection::ChainSelectionSubsystem;
75pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem;
76pub use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
77pub use polkadot_node_core_provisioner::ProvisionerSubsystem;
78pub use polkadot_node_core_pvf_checker::PvfCheckerSubsystem;
79pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
80pub use polkadot_statement_distribution::StatementDistributionSubsystem;
81
82pub struct OverseerGenArgs<'a, Spawner, RuntimeClient>
84where
85 Spawner: 'static + SpawnNamed + Clone + Unpin,
86{
87 pub runtime_client: Arc<RuntimeClient>,
89 pub network_service: Arc<dyn sc_network::service::traits::NetworkService>,
91 pub sync_service: Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
93 pub authority_discovery_service: AuthorityDiscoveryService,
95 pub collation_req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
97 pub collation_req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
99 pub available_data_req_receiver:
101 IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
102 pub registry: Option<&'a Registry>,
104 pub spawner: Spawner,
106 pub is_parachain_node: IsParachainNode,
108 pub overseer_message_channel_capacity_override: Option<usize>,
110 pub req_protocol_names: ReqProtocolNames,
112 pub peerset_protocol_names: PeerSetProtocolNames,
114 pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
116}
117
118pub struct ExtendedOverseerGenArgs {
119 pub keystore: Arc<LocalKeystore>,
121 pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
123 pub candidate_validation_config: Option<CandidateValidationConfig>,
125 pub availability_config: AvailabilityConfig,
127 pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
129 pub chunk_req_v1_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
131 pub chunk_req_v2_receiver: IncomingRequestReceiver<request_v2::ChunkFetchingRequest>,
133 pub candidate_req_v2_receiver: IncomingRequestReceiver<request_v2::AttestedCandidateRequest>,
135 pub approval_voting_config: ApprovalVotingConfig,
137 pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
139 pub dispute_coordinator_config: DisputeCoordinatorConfig,
141 pub chain_selection_config: ChainSelectionConfig,
143 pub fetch_chunks_threshold: Option<usize>,
147 pub invulnerable_ah_collators: HashSet<polkadot_node_network_protocol::PeerId>,
149 pub collator_protocol_hold_off: Option<Duration>,
151 pub experimental_collator_protocol: bool,
153 pub reputation_config: ReputationConfig,
155}
156
157pub fn validator_overseer_builder<Spawner, RuntimeClient>(
159 OverseerGenArgs {
160 runtime_client,
161 network_service,
162 sync_service,
163 authority_discovery_service,
164 collation_req_v1_receiver: _,
165 collation_req_v2_receiver: _,
166 available_data_req_receiver,
167 registry,
168 spawner,
169 is_parachain_node,
170 overseer_message_channel_capacity_override,
171 req_protocol_names,
172 peerset_protocol_names,
173 notification_services,
174 }: OverseerGenArgs<Spawner, RuntimeClient>,
175 ExtendedOverseerGenArgs {
176 keystore,
177 parachains_db,
178 candidate_validation_config,
179 availability_config,
180 pov_req_receiver,
181 chunk_req_v1_receiver,
182 chunk_req_v2_receiver,
183 candidate_req_v2_receiver,
184 approval_voting_config,
185 dispute_req_receiver,
186 dispute_coordinator_config,
187 chain_selection_config,
188 fetch_chunks_threshold,
189 invulnerable_ah_collators,
190 collator_protocol_hold_off,
191 experimental_collator_protocol,
192 reputation_config,
193 }: ExtendedOverseerGenArgs,
194) -> Result<
195 InitializedOverseerBuilder<
196 SpawnGlue<Spawner>,
197 Arc<RuntimeClient>,
198 CandidateValidationSubsystem,
199 PvfCheckerSubsystem,
200 CandidateBackingSubsystem,
201 StatementDistributionSubsystem,
202 AvailabilityDistributionSubsystem,
203 AvailabilityRecoverySubsystem,
204 BitfieldSigningSubsystem,
205 BitfieldDistributionSubsystem,
206 ProvisionerSubsystem,
207 RuntimeApiSubsystem<RuntimeClient>,
208 AvailabilityStoreSubsystem,
209 NetworkBridgeRxSubsystem<
210 Arc<dyn sc_network::service::traits::NetworkService>,
211 AuthorityDiscoveryService,
212 >,
213 NetworkBridgeTxSubsystem<
214 Arc<dyn sc_network::service::traits::NetworkService>,
215 AuthorityDiscoveryService,
216 >,
217 ChainApiSubsystem<RuntimeClient>,
218 DummySubsystem,
219 CollatorProtocolSubsystem,
220 DummySubsystem,
221 DummySubsystem,
222 ApprovalVotingParallelSubsystem,
223 GossipSupportSubsystem<AuthorityDiscoveryService>,
224 DisputeCoordinatorSubsystem,
225 DisputeDistributionSubsystem<AuthorityDiscoveryService>,
226 ChainSelectionSubsystem,
227 ProspectiveParachainsSubsystem,
228 >,
229 Error,
230>
231where
232 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
233 Spawner: 'static + SpawnNamed + Clone + Unpin,
234{
235 use polkadot_node_subsystem_util::metrics::Metrics;
236
237 let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
238 let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
239
240 let spawner = SpawnGlue(spawner);
241
242 let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
243 let approval_voting_parallel_metrics: ApprovalVotingParallelMetrics =
244 Metrics::register(registry)?;
245 let builder = Overseer::builder()
246 .network_bridge_tx(NetworkBridgeTxSubsystem::new(
247 network_service.clone(),
248 authority_discovery_service.clone(),
249 network_bridge_metrics.clone(),
250 req_protocol_names.clone(),
251 peerset_protocol_names.clone(),
252 notification_sinks.clone(),
253 ))
254 .network_bridge_rx(NetworkBridgeRxSubsystem::new(
255 network_service.clone(),
256 authority_discovery_service.clone(),
257 Box::new(sync_service.clone()),
258 network_bridge_metrics,
259 peerset_protocol_names,
260 notification_services,
261 notification_sinks,
262 ))
263 .availability_distribution(AvailabilityDistributionSubsystem::new(
264 keystore.clone(),
265 IncomingRequestReceivers {
266 pov_req_receiver,
267 chunk_req_v1_receiver,
268 chunk_req_v2_receiver,
269 },
270 req_protocol_names.clone(),
271 Metrics::register(registry)?,
272 ))
273 .availability_recovery(AvailabilityRecoverySubsystem::for_validator(
274 fetch_chunks_threshold,
275 available_data_req_receiver,
276 &req_protocol_names,
277 Metrics::register(registry)?,
278 ))
279 .availability_store(AvailabilityStoreSubsystem::new(
280 parachains_db.clone(),
281 availability_config,
282 Box::new(sync_service.clone()),
283 Metrics::register(registry)?,
284 ))
285 .bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
286 .bitfield_signing(BitfieldSigningSubsystem::new(
287 keystore.clone(),
288 Metrics::register(registry)?,
289 ))
290 .candidate_backing(CandidateBackingSubsystem::new(
291 keystore.clone(),
292 Metrics::register(registry)?,
293 ))
294 .candidate_validation(CandidateValidationSubsystem::with_config(
295 candidate_validation_config,
296 keystore.clone(),
297 Metrics::register(registry)?, Metrics::register(registry)?, ))
300 .pvf_checker(PvfCheckerSubsystem::new(keystore.clone(), Metrics::register(registry)?))
301 .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
302 .collation_generation(DummySubsystem)
303 .collator_protocol({
304 let side = match is_parachain_node {
305 IsParachainNode::Collator(_) | IsParachainNode::FullNode => {
306 return Err(Error::Overseer(SubsystemError::Context(
307 "build validator overseer for parachain node".to_owned(),
308 )))
309 },
310 IsParachainNode::No => {
311 if experimental_collator_protocol {
312 ProtocolSide::ValidatorExperimental {
313 keystore: keystore.clone(),
314 metrics: Metrics::register(registry)?,
315 db: parachains_db.clone(),
316 reputation_config,
317 clock: polkadot_node_clock::system_clock(),
318 }
319 } else {
320 ProtocolSide::Validator {
321 keystore: keystore.clone(),
322 eviction_policy: Default::default(),
323 metrics: Metrics::register(registry)?,
324 invulnerables: invulnerable_ah_collators,
325 collator_protocol_hold_off,
326 clock: polkadot_node_clock::system_clock(),
327 }
328 }
329 },
330 };
331 CollatorProtocolSubsystem::new(side)
332 })
333 .provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?))
334 .runtime_api(RuntimeApiSubsystem::new(
335 runtime_client.clone(),
336 Metrics::register(registry)?,
337 spawner.clone(),
338 ))
339 .statement_distribution(StatementDistributionSubsystem::new(
340 keystore.clone(),
341 candidate_req_v2_receiver,
342 Metrics::register(registry)?,
343 ))
344 .approval_distribution(DummySubsystem)
345 .approval_voting(DummySubsystem)
346 .approval_voting_parallel(ApprovalVotingParallelSubsystem::with_config(
347 approval_voting_config,
348 parachains_db.clone(),
349 keystore.clone(),
350 Box::new(sync_service.clone()),
351 approval_voting_parallel_metrics,
352 spawner.clone(),
353 overseer_message_channel_capacity_override,
354 ))
355 .gossip_support(GossipSupportSubsystem::new(
356 keystore.clone(),
357 authority_discovery_service.clone(),
358 Metrics::register(registry)?,
359 ))
360 .dispute_coordinator(DisputeCoordinatorSubsystem::new(
361 parachains_db.clone(),
362 dispute_coordinator_config,
363 keystore.clone(),
364 Metrics::register(registry)?,
365 ))
366 .dispute_distribution(DisputeDistributionSubsystem::new(
367 keystore.clone(),
368 dispute_req_receiver,
369 authority_discovery_service.clone(),
370 Metrics::register(registry)?,
371 ))
372 .chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
373 .prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
374 .activation_external_listeners(Default::default())
375 .active_leaves(Default::default())
376 .supports_parachains(runtime_client)
377 .metrics(metrics)
378 .spawner(spawner);
379
380 let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
381 builder.message_channel_capacity(capacity)
382 } else {
383 builder
384 };
385 Ok(builder)
386}
387
388pub fn collator_overseer_builder<Spawner, RuntimeClient>(
390 OverseerGenArgs {
391 runtime_client,
392 network_service,
393 sync_service,
394 authority_discovery_service,
395 collation_req_v1_receiver: _,
396 collation_req_v2_receiver,
397 available_data_req_receiver,
398 registry,
399 spawner,
400 is_parachain_node,
401 overseer_message_channel_capacity_override,
402 req_protocol_names,
403 peerset_protocol_names,
404 notification_services,
405 }: OverseerGenArgs<Spawner, RuntimeClient>,
406) -> Result<
407 InitializedOverseerBuilder<
408 SpawnGlue<Spawner>,
409 Arc<RuntimeClient>,
410 DummySubsystem,
411 DummySubsystem,
412 DummySubsystem,
413 DummySubsystem,
414 DummySubsystem,
415 AvailabilityRecoverySubsystem,
416 DummySubsystem,
417 DummySubsystem,
418 DummySubsystem,
419 RuntimeApiSubsystem<RuntimeClient>,
420 DummySubsystem,
421 NetworkBridgeRxSubsystem<
422 Arc<dyn sc_network::service::traits::NetworkService>,
423 AuthorityDiscoveryService,
424 >,
425 NetworkBridgeTxSubsystem<
426 Arc<dyn sc_network::service::traits::NetworkService>,
427 AuthorityDiscoveryService,
428 >,
429 ChainApiSubsystem<RuntimeClient>,
430 CollationGenerationSubsystem,
431 CollatorProtocolSubsystem,
432 DummySubsystem,
433 DummySubsystem,
434 DummySubsystem,
435 DummySubsystem,
436 DummySubsystem,
437 DummySubsystem,
438 DummySubsystem,
439 DummySubsystem,
440 >,
441 Error,
442>
443where
444 Spawner: 'static + SpawnNamed + Clone + Unpin,
445 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
446{
447 use polkadot_node_subsystem_util::metrics::Metrics;
448
449 let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
450
451 let spawner = SpawnGlue(spawner);
452
453 let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
454
455 let builder = Overseer::builder()
456 .network_bridge_tx(NetworkBridgeTxSubsystem::new(
457 network_service.clone(),
458 authority_discovery_service.clone(),
459 network_bridge_metrics.clone(),
460 req_protocol_names.clone(),
461 peerset_protocol_names.clone(),
462 notification_sinks.clone(),
463 ))
464 .network_bridge_rx(NetworkBridgeRxSubsystem::new(
465 network_service.clone(),
466 authority_discovery_service.clone(),
467 Box::new(sync_service.clone()),
468 network_bridge_metrics,
469 peerset_protocol_names,
470 notification_services,
471 notification_sinks,
472 ))
473 .availability_distribution(DummySubsystem)
474 .availability_recovery(AvailabilityRecoverySubsystem::for_collator(
475 None,
476 available_data_req_receiver,
477 &req_protocol_names,
478 Metrics::register(registry)?,
479 ))
480 .availability_store(DummySubsystem)
481 .bitfield_distribution(DummySubsystem)
482 .bitfield_signing(DummySubsystem)
483 .candidate_backing(DummySubsystem)
484 .candidate_validation(DummySubsystem)
485 .pvf_checker(DummySubsystem)
486 .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
487 .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
488 .collator_protocol({
489 let side = match is_parachain_node {
490 IsParachainNode::No => {
491 return Err(Error::Overseer(SubsystemError::Context(
492 "build parachain node overseer for validator".to_owned(),
493 )))
494 },
495 IsParachainNode::Collator(collator_pair) => ProtocolSide::Collator {
496 peer_id: network_service.local_peer_id(),
497 collator_pair,
498 request_receiver_v2: collation_req_v2_receiver,
499 metrics: Metrics::register(registry)?,
500 clock: polkadot_node_clock::system_clock(),
501 },
502 IsParachainNode::FullNode => ProtocolSide::None,
503 };
504 CollatorProtocolSubsystem::new(side)
505 })
506 .provisioner(DummySubsystem)
507 .runtime_api(RuntimeApiSubsystem::new(
508 runtime_client.clone(),
509 Metrics::register(registry)?,
510 spawner.clone(),
511 ))
512 .statement_distribution(DummySubsystem)
513 .approval_distribution(DummySubsystem)
514 .approval_voting(DummySubsystem)
515 .approval_voting_parallel(DummySubsystem)
516 .gossip_support(DummySubsystem)
517 .dispute_coordinator(DummySubsystem)
518 .dispute_distribution(DummySubsystem)
519 .chain_selection(DummySubsystem)
520 .prospective_parachains(DummySubsystem)
521 .activation_external_listeners(Default::default())
522 .active_leaves(Default::default())
523 .supports_parachains(runtime_client)
524 .metrics(Metrics::register(registry)?)
525 .spawner(spawner);
526
527 let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
528 builder.message_channel_capacity(capacity)
529 } else {
530 builder
531 };
532 Ok(builder)
533}
534
535pub trait OverseerGen {
537 fn generate<Spawner, RuntimeClient>(
539 &self,
540 connector: OverseerConnector,
541 args: OverseerGenArgs<Spawner, RuntimeClient>,
542 ext_args: Option<ExtendedOverseerGenArgs>,
543 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
544 where
545 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
546 Spawner: 'static + SpawnNamed + Clone + Unpin;
547
548 }
552
553pub struct ValidatorOverseerGen;
555
556impl OverseerGen for ValidatorOverseerGen {
557 fn generate<Spawner, RuntimeClient>(
558 &self,
559 connector: OverseerConnector,
560 args: OverseerGenArgs<Spawner, RuntimeClient>,
561 ext_args: Option<ExtendedOverseerGenArgs>,
562 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
563 where
564 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
565 Spawner: 'static + SpawnNamed + Clone + Unpin,
566 {
567 let ext_args = ext_args.ok_or(Error::Overseer(SubsystemError::Context(
568 "create validator overseer as mandatory extended arguments were not provided"
569 .to_owned(),
570 )))?;
571 validator_overseer_builder(args, ext_args)?
572 .build_with_connector(connector)
573 .map_err(|e| e.into())
574 }
575}
576
577pub struct CollatorOverseerGen;
579
580impl OverseerGen for CollatorOverseerGen {
581 fn generate<Spawner, RuntimeClient>(
582 &self,
583 connector: OverseerConnector,
584 args: OverseerGenArgs<Spawner, RuntimeClient>,
585 _ext_args: Option<ExtendedOverseerGenArgs>,
586 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
587 where
588 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
589 Spawner: 'static + SpawnNamed + Clone + Unpin,
590 {
591 collator_overseer_builder(args)?
592 .build_with_connector(connector)
593 .map_err(|e| e.into())
594 }
595}