1use super::{Error, IsParachainNode, Registry};
18use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
19use polkadot_overseer::{DummySubsystem, InitializedOverseerBuilder, SubsystemError};
20use sp_core::traits::SpawnNamed;
21
22use polkadot_availability_distribution::IncomingRequestReceivers;
23use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
24use polkadot_node_core_av_store::Config as AvailabilityConfig;
25use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
26use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
27use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
28use polkadot_node_network_protocol::{
29 peer_set::{PeerSet, PeerSetProtocolNames},
30 request_response::{
31 v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, ReqProtocolNames,
32 },
33};
34#[cfg(any(feature = "malus", test))]
35pub use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
36use polkadot_overseer::{
37 metrics::Metrics as OverseerMetrics, MetricsTrait, Overseer, OverseerConnector, OverseerHandle,
38 SpawnGlue,
39};
40
41use parking_lot::Mutex;
42use sc_authority_discovery::Service as AuthorityDiscoveryService;
43use sc_client_api::AuxStore;
44use sc_keystore::LocalKeystore;
45use sc_network::{NetworkStateInfo, NotificationService};
46use std::{collections::HashMap, sync::Arc};
47
48pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem;
49pub use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem;
50pub use polkadot_availability_distribution::AvailabilityDistributionSubsystem;
51pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
52pub use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
53pub use polkadot_dispute_distribution::DisputeDistributionSubsystem;
54pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem;
55pub use polkadot_network_bridge::{
56 Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem,
57 NetworkBridgeTx as NetworkBridgeTxSubsystem,
58};
59pub use polkadot_node_collation_generation::CollationGenerationSubsystem;
60pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
61pub use polkadot_node_core_approval_voting_parallel::{
62 ApprovalVotingParallelSubsystem, Metrics as ApprovalVotingParallelMetrics,
63};
64pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
65pub use polkadot_node_core_backing::CandidateBackingSubsystem;
66pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
67pub use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
68pub use polkadot_node_core_chain_api::ChainApiSubsystem;
69pub use polkadot_node_core_chain_selection::ChainSelectionSubsystem;
70pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem;
71pub use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
72pub use polkadot_node_core_provisioner::ProvisionerSubsystem;
73pub use polkadot_node_core_pvf_checker::PvfCheckerSubsystem;
74pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
75pub use polkadot_statement_distribution::StatementDistributionSubsystem;
76
/// Arguments shared by every overseer builder in this module.
///
/// Both [`validator_overseer_builder`] and [`collator_overseer_builder`] destructure this
/// struct; the per-field notes below describe how those two builders consume each value.
pub struct OverseerGenArgs<'a, Spawner, RuntimeClient>
where
    Spawner: 'static + SpawnNamed + Clone + Unpin,
{
    /// Shared runtime client. Handed to the chain-API and runtime-API subsystems and used as
    /// the overseer's `supports_parachains` implementation.
    pub runtime_client: Arc<RuntimeClient>,
    /// Network service handle given to both network-bridge subsystems. The collator builder
    /// additionally reads the local peer id from it.
    pub network_service: Arc<dyn sc_network::service::traits::NetworkService>,
    /// Sync oracle. Cloned and boxed for the network bridge (rx) and, on validators, for the
    /// availability store and the parallel approval-voting subsystem.
    pub sync_service: Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
    /// Authority discovery service. Passed to the network bridges and, on validators, to gossip
    /// support and dispute distribution.
    pub authority_discovery_service: AuthorityDiscoveryService,
    /// Receiver for v1 collation fetching requests. Neither builder in this file uses it
    /// (both bind it to `_`).
    pub collation_req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
    /// Receiver for v2 collation fetching requests. Used by the collator builder as the
    /// `request_receiver_v2` of `ProtocolSide::Collator`; ignored by the validator builder.
    pub collation_req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
    /// Receiver for available-data fetching requests; passed to the availability-recovery
    /// subsystem by both builders.
    pub available_data_req_receiver:
        IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
    /// Optional metrics registry. Every `Metrics::register` call in the builders receives it.
    pub registry: Option<&'a Registry>,
    /// Task spawner. Wrapped in `SpawnGlue` and installed as the overseer's spawner; clones are
    /// also given to the runtime-API subsystem and (validators) parallel approval voting.
    pub spawner: Spawner,
    /// Node role. Selects the collator-protocol side: the validator builder accepts only
    /// `IsParachainNode::No`, the collator builder rejects it.
    pub is_parachain_node: IsParachainNode,
    /// When `Some`, applied to the builder via `message_channel_capacity`. The validator
    /// builder also forwards it to the parallel approval-voting subsystem.
    pub overseer_message_channel_capacity_override: Option<usize>,
    /// Request/response protocol names. Passed to the network bridge (tx), availability
    /// recovery and (validators) availability distribution.
    pub req_protocol_names: ReqProtocolNames,
    /// Peer-set protocol names; passed to both network-bridge subsystems.
    pub peerset_protocol_names: PeerSetProtocolNames,
    /// Notification services keyed by peer set; moved into the network bridge (rx).
    pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
}
112
/// Additional arguments consumed only by [`validator_overseer_builder`].
///
/// The collator builder in this file does not take this struct; the per-field notes describe
/// where the validator builder sends each value.
pub struct ExtendedOverseerGenArgs {
    /// Local keystore. Cloned into every validator subsystem that is constructed with a
    /// keystore (availability distribution, bitfield signing, backing, candidate validation,
    /// PVF checker, collator protocol, statement distribution, approval voting, gossip support,
    /// dispute coordinator and dispute distribution).
    pub keystore: Arc<LocalKeystore>,
    /// Parachains database handle. Shared by the availability store, parallel approval voting,
    /// dispute coordinator and chain selection subsystems.
    pub parachains_db: Arc<dyn polkadot_node_subsystem_util::database::Database>,
    /// Optional configuration passed to `CandidateValidationSubsystem::with_config`.
    pub candidate_validation_config: Option<CandidateValidationConfig>,
    /// Configuration for the availability store subsystem.
    pub availability_config: AvailabilityConfig,
    /// Receiver for PoV fetching requests; part of the availability-distribution
    /// `IncomingRequestReceivers`.
    pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
    /// Receiver for v1 chunk fetching requests; part of the availability-distribution
    /// `IncomingRequestReceivers`.
    pub chunk_req_v1_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
    /// Receiver for v2 chunk fetching requests; part of the availability-distribution
    /// `IncomingRequestReceivers`.
    pub chunk_req_v2_receiver: IncomingRequestReceiver<request_v2::ChunkFetchingRequest>,
    /// Receiver for v2 attested-candidate requests; passed to statement distribution.
    pub candidate_req_v2_receiver: IncomingRequestReceiver<request_v2::AttestedCandidateRequest>,
    /// Configuration passed to `ApprovalVotingParallelSubsystem::with_config`.
    pub approval_voting_config: ApprovalVotingConfig,
    /// Receiver for dispute requests; passed to dispute distribution.
    pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
    /// Configuration for the dispute coordinator subsystem.
    pub dispute_coordinator_config: DisputeCoordinatorConfig,
    /// Configuration for the chain selection subsystem.
    pub chain_selection_config: ChainSelectionConfig,
    /// Optional threshold forwarded as the first argument of
    /// `AvailabilityRecoverySubsystem::for_validator`.
    // NOTE(review): exact semantics (units, meaning of `None`) are defined by the
    // availability-recovery crate — confirm there.
    pub fetch_chunks_threshold: Option<usize>,
}
143
144pub fn validator_overseer_builder<Spawner, RuntimeClient>(
146 OverseerGenArgs {
147 runtime_client,
148 network_service,
149 sync_service,
150 authority_discovery_service,
151 collation_req_v1_receiver: _,
152 collation_req_v2_receiver: _,
153 available_data_req_receiver,
154 registry,
155 spawner,
156 is_parachain_node,
157 overseer_message_channel_capacity_override,
158 req_protocol_names,
159 peerset_protocol_names,
160 notification_services,
161 }: OverseerGenArgs<Spawner, RuntimeClient>,
162 ExtendedOverseerGenArgs {
163 keystore,
164 parachains_db,
165 candidate_validation_config,
166 availability_config,
167 pov_req_receiver,
168 chunk_req_v1_receiver,
169 chunk_req_v2_receiver,
170 candidate_req_v2_receiver,
171 approval_voting_config,
172 dispute_req_receiver,
173 dispute_coordinator_config,
174 chain_selection_config,
175 fetch_chunks_threshold,
176 }: ExtendedOverseerGenArgs,
177) -> Result<
178 InitializedOverseerBuilder<
179 SpawnGlue<Spawner>,
180 Arc<RuntimeClient>,
181 CandidateValidationSubsystem,
182 PvfCheckerSubsystem,
183 CandidateBackingSubsystem,
184 StatementDistributionSubsystem,
185 AvailabilityDistributionSubsystem,
186 AvailabilityRecoverySubsystem,
187 BitfieldSigningSubsystem,
188 BitfieldDistributionSubsystem,
189 ProvisionerSubsystem,
190 RuntimeApiSubsystem<RuntimeClient>,
191 AvailabilityStoreSubsystem,
192 NetworkBridgeRxSubsystem<
193 Arc<dyn sc_network::service::traits::NetworkService>,
194 AuthorityDiscoveryService,
195 >,
196 NetworkBridgeTxSubsystem<
197 Arc<dyn sc_network::service::traits::NetworkService>,
198 AuthorityDiscoveryService,
199 >,
200 ChainApiSubsystem<RuntimeClient>,
201 DummySubsystem,
202 CollatorProtocolSubsystem,
203 DummySubsystem,
204 DummySubsystem,
205 ApprovalVotingParallelSubsystem,
206 GossipSupportSubsystem<AuthorityDiscoveryService>,
207 DisputeCoordinatorSubsystem,
208 DisputeDistributionSubsystem<AuthorityDiscoveryService>,
209 ChainSelectionSubsystem,
210 ProspectiveParachainsSubsystem,
211 >,
212 Error,
213>
214where
215 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
216 Spawner: 'static + SpawnNamed + Clone + Unpin,
217{
218 use polkadot_node_subsystem_util::metrics::Metrics;
219
220 let metrics = <OverseerMetrics as MetricsTrait>::register(registry)?;
221 let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
222
223 let spawner = SpawnGlue(spawner);
224
225 let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
226 let approval_voting_parallel_metrics: ApprovalVotingParallelMetrics =
227 Metrics::register(registry)?;
228 let builder = Overseer::builder()
229 .network_bridge_tx(NetworkBridgeTxSubsystem::new(
230 network_service.clone(),
231 authority_discovery_service.clone(),
232 network_bridge_metrics.clone(),
233 req_protocol_names.clone(),
234 peerset_protocol_names.clone(),
235 notification_sinks.clone(),
236 ))
237 .network_bridge_rx(NetworkBridgeRxSubsystem::new(
238 network_service.clone(),
239 authority_discovery_service.clone(),
240 Box::new(sync_service.clone()),
241 network_bridge_metrics,
242 peerset_protocol_names,
243 notification_services,
244 notification_sinks,
245 ))
246 .availability_distribution(AvailabilityDistributionSubsystem::new(
247 keystore.clone(),
248 IncomingRequestReceivers {
249 pov_req_receiver,
250 chunk_req_v1_receiver,
251 chunk_req_v2_receiver,
252 },
253 req_protocol_names.clone(),
254 Metrics::register(registry)?,
255 ))
256 .availability_recovery(AvailabilityRecoverySubsystem::for_validator(
257 fetch_chunks_threshold,
258 available_data_req_receiver,
259 &req_protocol_names,
260 Metrics::register(registry)?,
261 ))
262 .availability_store(AvailabilityStoreSubsystem::new(
263 parachains_db.clone(),
264 availability_config,
265 Box::new(sync_service.clone()),
266 Metrics::register(registry)?,
267 ))
268 .bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
269 .bitfield_signing(BitfieldSigningSubsystem::new(
270 keystore.clone(),
271 Metrics::register(registry)?,
272 ))
273 .candidate_backing(CandidateBackingSubsystem::new(
274 keystore.clone(),
275 Metrics::register(registry)?,
276 ))
277 .candidate_validation(CandidateValidationSubsystem::with_config(
278 candidate_validation_config,
279 keystore.clone(),
280 Metrics::register(registry)?, Metrics::register(registry)?, ))
283 .pvf_checker(PvfCheckerSubsystem::new(keystore.clone(), Metrics::register(registry)?))
284 .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
285 .collation_generation(DummySubsystem)
286 .collator_protocol({
287 let side = match is_parachain_node {
288 IsParachainNode::Collator(_) | IsParachainNode::FullNode =>
289 return Err(Error::Overseer(SubsystemError::Context(
290 "build validator overseer for parachain node".to_owned(),
291 ))),
292 IsParachainNode::No => ProtocolSide::Validator {
293 keystore: keystore.clone(),
294 eviction_policy: Default::default(),
295 metrics: Metrics::register(registry)?,
296 },
297 };
298 CollatorProtocolSubsystem::new(side)
299 })
300 .provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?))
301 .runtime_api(RuntimeApiSubsystem::new(
302 runtime_client.clone(),
303 Metrics::register(registry)?,
304 spawner.clone(),
305 ))
306 .statement_distribution(StatementDistributionSubsystem::new(
307 keystore.clone(),
308 candidate_req_v2_receiver,
309 Metrics::register(registry)?,
310 ))
311 .approval_distribution(DummySubsystem)
312 .approval_voting(DummySubsystem)
313 .approval_voting_parallel(ApprovalVotingParallelSubsystem::with_config(
314 approval_voting_config,
315 parachains_db.clone(),
316 keystore.clone(),
317 Box::new(sync_service.clone()),
318 approval_voting_parallel_metrics,
319 spawner.clone(),
320 overseer_message_channel_capacity_override,
321 ))
322 .gossip_support(GossipSupportSubsystem::new(
323 keystore.clone(),
324 authority_discovery_service.clone(),
325 Metrics::register(registry)?,
326 ))
327 .dispute_coordinator(DisputeCoordinatorSubsystem::new(
328 parachains_db.clone(),
329 dispute_coordinator_config,
330 keystore.clone(),
331 Metrics::register(registry)?,
332 ))
333 .dispute_distribution(DisputeDistributionSubsystem::new(
334 keystore.clone(),
335 dispute_req_receiver,
336 authority_discovery_service.clone(),
337 Metrics::register(registry)?,
338 ))
339 .chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
340 .prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
341 .activation_external_listeners(Default::default())
342 .active_leaves(Default::default())
343 .supports_parachains(runtime_client)
344 .metrics(metrics)
345 .spawner(spawner);
346
347 let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
348 builder.message_channel_capacity(capacity)
349 } else {
350 builder
351 };
352 Ok(builder)
353}
354
355pub fn collator_overseer_builder<Spawner, RuntimeClient>(
357 OverseerGenArgs {
358 runtime_client,
359 network_service,
360 sync_service,
361 authority_discovery_service,
362 collation_req_v1_receiver: _,
363 collation_req_v2_receiver,
364 available_data_req_receiver,
365 registry,
366 spawner,
367 is_parachain_node,
368 overseer_message_channel_capacity_override,
369 req_protocol_names,
370 peerset_protocol_names,
371 notification_services,
372 }: OverseerGenArgs<Spawner, RuntimeClient>,
373) -> Result<
374 InitializedOverseerBuilder<
375 SpawnGlue<Spawner>,
376 Arc<RuntimeClient>,
377 DummySubsystem,
378 DummySubsystem,
379 DummySubsystem,
380 DummySubsystem,
381 DummySubsystem,
382 AvailabilityRecoverySubsystem,
383 DummySubsystem,
384 DummySubsystem,
385 DummySubsystem,
386 RuntimeApiSubsystem<RuntimeClient>,
387 DummySubsystem,
388 NetworkBridgeRxSubsystem<
389 Arc<dyn sc_network::service::traits::NetworkService>,
390 AuthorityDiscoveryService,
391 >,
392 NetworkBridgeTxSubsystem<
393 Arc<dyn sc_network::service::traits::NetworkService>,
394 AuthorityDiscoveryService,
395 >,
396 ChainApiSubsystem<RuntimeClient>,
397 CollationGenerationSubsystem,
398 CollatorProtocolSubsystem,
399 DummySubsystem,
400 DummySubsystem,
401 DummySubsystem,
402 DummySubsystem,
403 DummySubsystem,
404 DummySubsystem,
405 DummySubsystem,
406 DummySubsystem,
407 >,
408 Error,
409>
410where
411 Spawner: 'static + SpawnNamed + Clone + Unpin,
412 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
413{
414 use polkadot_node_subsystem_util::metrics::Metrics;
415
416 let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
417
418 let spawner = SpawnGlue(spawner);
419
420 let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
421
422 let builder = Overseer::builder()
423 .network_bridge_tx(NetworkBridgeTxSubsystem::new(
424 network_service.clone(),
425 authority_discovery_service.clone(),
426 network_bridge_metrics.clone(),
427 req_protocol_names.clone(),
428 peerset_protocol_names.clone(),
429 notification_sinks.clone(),
430 ))
431 .network_bridge_rx(NetworkBridgeRxSubsystem::new(
432 network_service.clone(),
433 authority_discovery_service.clone(),
434 Box::new(sync_service.clone()),
435 network_bridge_metrics,
436 peerset_protocol_names,
437 notification_services,
438 notification_sinks,
439 ))
440 .availability_distribution(DummySubsystem)
441 .availability_recovery(AvailabilityRecoverySubsystem::for_collator(
442 None,
443 available_data_req_receiver,
444 &req_protocol_names,
445 Metrics::register(registry)?,
446 ))
447 .availability_store(DummySubsystem)
448 .bitfield_distribution(DummySubsystem)
449 .bitfield_signing(DummySubsystem)
450 .candidate_backing(DummySubsystem)
451 .candidate_validation(DummySubsystem)
452 .pvf_checker(DummySubsystem)
453 .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
454 .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
455 .collator_protocol({
456 let side = match is_parachain_node {
457 IsParachainNode::No =>
458 return Err(Error::Overseer(SubsystemError::Context(
459 "build parachain node overseer for validator".to_owned(),
460 ))),
461 IsParachainNode::Collator(collator_pair) => ProtocolSide::Collator {
462 peer_id: network_service.local_peer_id(),
463 collator_pair,
464 request_receiver_v2: collation_req_v2_receiver,
465 metrics: Metrics::register(registry)?,
466 },
467 IsParachainNode::FullNode => ProtocolSide::None,
468 };
469 CollatorProtocolSubsystem::new(side)
470 })
471 .provisioner(DummySubsystem)
472 .runtime_api(RuntimeApiSubsystem::new(
473 runtime_client.clone(),
474 Metrics::register(registry)?,
475 spawner.clone(),
476 ))
477 .statement_distribution(DummySubsystem)
478 .approval_distribution(DummySubsystem)
479 .approval_voting(DummySubsystem)
480 .approval_voting_parallel(DummySubsystem)
481 .gossip_support(DummySubsystem)
482 .dispute_coordinator(DummySubsystem)
483 .dispute_distribution(DummySubsystem)
484 .chain_selection(DummySubsystem)
485 .prospective_parachains(DummySubsystem)
486 .activation_external_listeners(Default::default())
487 .active_leaves(Default::default())
488 .supports_parachains(runtime_client)
489 .metrics(Metrics::register(registry)?)
490 .spawner(spawner);
491
492 let builder = if let Some(capacity) = overseer_message_channel_capacity_override {
493 builder.message_channel_capacity(capacity)
494 } else {
495 builder
496 };
497 Ok(builder)
498}
499
/// Strategy for constructing an [`Overseer`] together with its [`OverseerHandle`].
///
/// This file provides two implementations: [`ValidatorOverseerGen`] and
/// [`CollatorOverseerGen`].
pub trait OverseerGen {
    /// Build an overseer and its handle, attaching the given `connector`.
    ///
    /// * `connector` - passed to the builder's `build_with_connector` by the implementations
    ///   in this file.
    /// * `args` - arguments common to all overseer flavours.
    /// * `ext_args` - additional arguments; whether they are required is up to the
    ///   implementation (`ValidatorOverseerGen` requires them, `CollatorOverseerGen` ignores
    ///   them).
    ///
    /// Returns the constructed overseer and handle, or an [`Error`] if construction fails.
    fn generate<Spawner, RuntimeClient>(
        &self,
        connector: OverseerConnector,
        args: OverseerGenArgs<Spawner, RuntimeClient>,
        ext_args: Option<ExtendedOverseerGenArgs>,
    ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
    where
        RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
        Spawner: 'static + SpawnNamed + Clone + Unpin;
}
517
518pub struct ValidatorOverseerGen;
520
521impl OverseerGen for ValidatorOverseerGen {
522 fn generate<Spawner, RuntimeClient>(
523 &self,
524 connector: OverseerConnector,
525 args: OverseerGenArgs<Spawner, RuntimeClient>,
526 ext_args: Option<ExtendedOverseerGenArgs>,
527 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
528 where
529 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
530 Spawner: 'static + SpawnNamed + Clone + Unpin,
531 {
532 let ext_args = ext_args.ok_or(Error::Overseer(SubsystemError::Context(
533 "create validator overseer as mandatory extended arguments were not provided"
534 .to_owned(),
535 )))?;
536 validator_overseer_builder(args, ext_args)?
537 .build_with_connector(connector)
538 .map_err(|e| e.into())
539 }
540}
541
542pub struct CollatorOverseerGen;
544
545impl OverseerGen for CollatorOverseerGen {
546 fn generate<Spawner, RuntimeClient>(
547 &self,
548 connector: OverseerConnector,
549 args: OverseerGenArgs<Spawner, RuntimeClient>,
550 _ext_args: Option<ExtendedOverseerGenArgs>,
551 ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
552 where
553 RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
554 Spawner: 'static + SpawnNamed + Clone + Unpin,
555 {
556 collator_overseer_builder(args)?
557 .build_with_connector(connector)
558 .map_err(|e| e.into())
559 }
560}