//! The Approval Voting Parallel Subsystem.
//!
//! This subsystem runs the approval-voting and approval-distribution logic on a
//! set of dedicated workers, so the two can make progress in parallel instead of
//! serially, as they would when run as independent subsystems.

use itertools::Itertools;
use metrics::{Meters, MetricsWatcher};
use polkadot_node_core_approval_voting::{Config, RealAssignmentCriteria};
use polkadot_node_metrics::metered::{
	self, channel, unbounded, MeteredReceiver, MeteredSender, UnboundedMeteredReceiver,
	UnboundedMeteredSender,
};

use polkadot_node_primitives::{
	approval::time::{Clock, SystemClock},
	DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
	messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage},
	overseer, FromOrchestra, SpawnedSubsystem, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
	self,
	database::Database,
	runtime::{Config as RuntimeInfoConfig, RuntimeInfo},
};
use polkadot_overseer::{OverseerSignal, Priority, SubsystemSender, TimeoutExt};
use polkadot_primitives::{CandidateIndex, Hash, ValidatorIndex, ValidatorSignature};
use rand::SeedableRng;

use sc_keystore::LocalKeystore;
use sp_consensus::SyncOracle;

use futures::{channel::oneshot, prelude::*, StreamExt};
pub use metrics::Metrics;
use polkadot_node_core_approval_voting::{
	approval_db::common::Config as DatabaseConfig, ApprovalVotingWorkProvider,
};
use std::{
	collections::{HashMap, HashSet},
	fmt::Debug,
	sync::Arc,
	time::Duration,
};
use stream::{select_with_strategy, PollNext, SelectWithStrategy};
pub mod metrics;

#[cfg(test)]
mod tests;

pub(crate) const LOG_TARGET: &str = "parachain::approval-voting-parallel";

/// How long to wait for the approval-distribution workers to answer a signature
/// query before giving up.
const WAIT_FOR_SIGS_GATHER_TIMEOUT: Duration = Duration::from_millis(2000);

/// The number of workers used for running the approval-distribution logic.
pub const APPROVAL_DISTRIBUTION_WORKER_COUNT: usize = 4;

/// The default channel size for the workers, splitting the overall budget evenly
/// across them.
pub const DEFAULT_WORKERS_CHANNEL_SIZE: usize = 64000 / APPROVAL_DISTRIBUTION_WORKER_COUNT;

/// Polling strategy that always prefers the right-hand stream.
fn prio_right(_val: &mut ()) -> PollNext {
	PollNext::Right
}
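
// With `prio_right`, `select_with_strategy` always polls the right-hand
// (unbounded) receiver first, so overseer signals and high-priority messages are
// drained before regular bounded-channel traffic.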

/// The approval voting parallel subsystem.
pub struct ApprovalVotingParallelSubsystem {
	/// `LocalKeystore` is needed for assignment keys, but not necessarily approval
	/// keys.
	///
	/// We do a lot of VRF signing and need the keys to have low latency.
	keystore: Arc<LocalKeystore>,
	db_config: DatabaseConfig,
	slot_duration_millis: u64,
	db: Arc<dyn Database>,
	sync_oracle: Box<dyn SyncOracle + Send>,
	metrics: Metrics,
	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
	clock: Arc<dyn Clock + Send + Sync>,
	overseer_message_channel_capacity_override: Option<usize>,
}

impl ApprovalVotingParallelSubsystem {
	/// Create a new `ApprovalVotingParallelSubsystem`, driven by the system clock.
	pub fn with_config(
		config: Config,
		db: Arc<dyn Database>,
		keystore: Arc<LocalKeystore>,
		sync_oracle: Box<dyn SyncOracle + Send>,
		metrics: Metrics,
		spawner: impl overseer::gen::Spawner + 'static + Clone,
		overseer_message_channel_capacity_override: Option<usize>,
	) -> Self {
		ApprovalVotingParallelSubsystem::with_config_and_clock(
			config,
			db,
			keystore,
			sync_oracle,
			metrics,
			Arc::new(SystemClock {}),
			spawner,
			overseer_message_channel_capacity_override,
		)
	}

	/// Create a new `ApprovalVotingParallelSubsystem` with an injected clock, so
	/// that tests can control time.
	pub fn with_config_and_clock(
		config: Config,
		db: Arc<dyn Database>,
		keystore: Arc<LocalKeystore>,
		sync_oracle: Box<dyn SyncOracle + Send>,
		metrics: Metrics,
		clock: Arc<dyn Clock + Send + Sync>,
		spawner: impl overseer::gen::Spawner + 'static,
		overseer_message_channel_capacity_override: Option<usize>,
	) -> Self {
		ApprovalVotingParallelSubsystem {
			keystore,
			slot_duration_millis: config.slot_duration_millis,
			db,
			db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
			sync_oracle,
			metrics,
			spawner: Arc::new(spawner),
			clock,
			overseer_message_channel_capacity_override,
		}
	}

	/// The size of the channel used by each worker: either the overseer override
	/// or `DEFAULT_WORKERS_CHANNEL_SIZE`.
	fn workers_channel_size(&self) -> usize {
		self.overseer_message_channel_capacity_override
			.unwrap_or(DEFAULT_WORKERS_CHANNEL_SIZE)
	}
}
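
// A minimal construction sketch, assuming `config`, `db`, `keystore`,
// `sync_oracle`, `metrics` and `spawner` bindings of the appropriate types are in
// scope (the names are illustrative):
//
//     let subsystem = ApprovalVotingParallelSubsystem::with_config(
//         config, db, keystore, sync_oracle, metrics, spawner, None,
//     );
//
// Passing `None` for the capacity override means every worker uses
// `DEFAULT_WORKERS_CHANNEL_SIZE`.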

#[overseer::subsystem(ApprovalVotingParallel, error = SubsystemError, prefix = self::overseer)]
impl<Context: Send> ApprovalVotingParallelSubsystem {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = run::<Context>(ctx, self)
			.map_err(|e| SubsystemError::with_origin("approval-voting-parallel", e))
			.boxed();

		SpawnedSubsystem { name: "approval-voting-parallel-subsystem", future }
	}
}

#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
/// Starts the single approval-voting worker and the
/// `APPROVAL_DISTRIBUTION_WORKER_COUNT` approval-distribution workers, and
/// returns the handles used for sending them messages.
async fn start_workers<Context>(
	ctx: &mut Context,
	subsystem: ApprovalVotingParallelSubsystem,
	metrics_watcher: &mut MetricsWatcher,
) -> SubsystemResult<(ToWorker<ApprovalVotingMessage>, Vec<ToWorker<ApprovalDistributionMessage>>)>
where
{
	gum::info!(target: LOG_TARGET, "Starting approval distribution workers");

	// Build the handle for the approval-voting worker first: each distribution
	// worker needs a clone of it to forward imported assignments and votes.
	let (to_approval_voting_worker, approval_voting_work_provider) = build_worker_handles(
		"approval-voting-parallel-db".into(),
		subsystem.workers_channel_size(),
		metrics_watcher,
		prio_right,
	);
	let mut to_approval_distribution_workers = Vec::new();
	let slot_duration_millis = subsystem.slot_duration_millis;

	for i in 0..APPROVAL_DISTRIBUTION_WORKER_COUNT {
		let mut network_sender = ctx.sender().clone();
		let mut runtime_api_sender = ctx.sender().clone();
		let mut approval_distribution_to_approval_voting = to_approval_voting_worker.clone();

		let approval_distr_instance =
			polkadot_approval_distribution::ApprovalDistribution::new_with_clock(
				subsystem.metrics.approval_distribution_metrics(),
				subsystem.slot_duration_millis,
				subsystem.clock.clone(),
				Arc::new(RealAssignmentCriteria {}),
			);
		let task_name = format!("approval-voting-parallel-{}", i);
		let (to_approval_distribution_worker, mut approval_distribution_work_provider) =
			build_worker_handles(
				task_name.clone(),
				subsystem.workers_channel_size(),
				metrics_watcher,
				prio_right,
			);

		metrics_watcher.watch(task_name.clone(), to_approval_distribution_worker.meter());

		subsystem.spawner.spawn_blocking(
			task_name.leak(),
			Some("approval-voting-parallel"),
			Box::pin(async move {
				let mut state =
					polkadot_approval_distribution::State::with_config(slot_duration_millis);
				let mut rng = rand::rngs::StdRng::from_entropy();
				let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
					keystore: None,
					session_cache_lru_size: DISPUTE_WINDOW.get(),
				});

				loop {
					let message = match approval_distribution_work_provider.next().await {
						Some(message) => message,
						None => {
							gum::info!(
								target: LOG_TARGET,
								"Approval distribution stream finished, most likely shutting down",
							);
							break;
						},
					};
					// `handle_from_orchestra` returns true once it has seen the
					// conclude signal, so stop the worker loop in that case.
					if approval_distr_instance
						.handle_from_orchestra(
							message,
							&mut approval_distribution_to_approval_voting,
							&mut network_sender,
							&mut runtime_api_sender,
							&mut state,
							&mut rng,
							&mut session_info_provider,
						)
						.await
					{
						gum::info!(
							target: LOG_TARGET,
							"Approval distribution worker {}, exiting because of shutdown", i
						);
						break;
					}
				}
			}),
		);
		to_approval_distribution_workers.push(to_approval_distribution_worker);
	}

	gum::info!(target: LOG_TARGET, "Starting approval voting workers");

	let sender = ctx.sender().clone();
	let to_approval_distribution = ApprovalVotingToApprovalDistribution(sender.clone());
	polkadot_node_core_approval_voting::start_approval_worker(
		approval_voting_work_provider,
		sender.clone(),
		to_approval_distribution,
		polkadot_node_core_approval_voting::Config {
			slot_duration_millis: subsystem.slot_duration_millis,
			col_approval_data: subsystem.db_config.col_approval_data,
		},
		subsystem.db.clone(),
		subsystem.keystore.clone(),
		subsystem.sync_oracle,
		subsystem.metrics.approval_voting_metrics(),
		subsystem.spawner.clone(),
		"approval-voting-parallel-db",
		"approval-voting-parallel",
		subsystem.clock.clone(),
	)
	.await?;

	Ok((to_approval_voting_worker, to_approval_distribution_workers))
}
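
// Message flow, as wired above: the main loop in `run_main_loop` fans overseer
// messages out to the workers; each approval-distribution worker forwards the
// assignments and votes it imports to the single approval-voting worker through
// its cloned `ToWorker` handle; and the approval-voting worker reaches the
// distribution workers again via `ApprovalVotingToApprovalDistribution`, which
// routes through the overseer and back into this subsystem.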

#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
/// The entry point of the subsystem: starts the workers, then runs the main loop.
async fn run<Context>(
	mut ctx: Context,
	subsystem: ApprovalVotingParallelSubsystem,
) -> SubsystemResult<()> {
	let mut metrics_watcher = MetricsWatcher::new(subsystem.metrics.clone());
	gum::info!(
		target: LOG_TARGET,
		"Starting workers"
	);

	let (to_approval_voting_worker, to_approval_distribution_workers) =
		start_workers(&mut ctx, subsystem, &mut metrics_watcher).await?;

	gum::info!(
		target: LOG_TARGET,
		"Starting main subsystem loop"
	);

	run_main_loop(ctx, to_approval_voting_worker, to_approval_distribution_workers, metrics_watcher)
		.await
}

#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
/// The main loop of the subsystem: receives messages from the overseer and
/// dispatches them to the workers.
///
/// Signals are broadcast to all workers, while messages are routed either to the
/// approval-voting worker or to the approval-distribution worker assigned to the
/// message's validator.
async fn run_main_loop<Context>(
	mut ctx: Context,
	mut to_approval_voting_worker: ToWorker<ApprovalVotingMessage>,
	mut to_approval_distribution_workers: Vec<ToWorker<ApprovalDistributionMessage>>,
	metrics_watcher: MetricsWatcher,
) -> SubsystemResult<()> {
	loop {
		futures::select! {
			next_msg = ctx.recv().fuse() => {
				let next_msg = match next_msg {
					Ok(msg) => msg,
					Err(err) => {
						gum::info!(target: LOG_TARGET, ?err, "Approval voting parallel subsystem received an error");
						return Err(err);
					}
				};

				match next_msg {
					FromOrchestra::Signal(msg) => {
						if matches!(msg, OverseerSignal::ActiveLeaves(_)) {
							metrics_watcher.collect_metrics();
						}

						for worker in to_approval_distribution_workers.iter_mut() {
							worker.send_signal(msg.clone()).await?;
						}

						to_approval_voting_worker.send_signal(msg.clone()).await?;
						if matches!(msg, OverseerSignal::Conclude) {
							break;
						}
					},
					FromOrchestra::Communication { msg } => match msg {
						ApprovalVotingParallelMessage::ApprovedAncestor(_, _, _) |
						ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(_, _) => {
							to_approval_voting_worker.send_message_with_priority::<overseer::HighPriority>(
								msg.try_into().expect(
									"Message is one of ApprovedAncestor or GetApprovalSignaturesForCandidate, \
									which can be safely converted to ApprovalVotingMessage; qed"
								)
							).await;
						},
						ApprovalVotingParallelMessage::NewBlocks(msg) => {
							// Broadcast to all workers: each approval-distribution
							// worker has to know about new blocks before it can
							// process any assignment or approval concerning them.
							for worker in to_approval_distribution_workers.iter_mut() {
								worker
									.send_message(ApprovalDistributionMessage::NewBlocks(msg.clone()))
									.await;
							}
						},
						ApprovalVotingParallelMessage::DistributeAssignment(assignment, claimed) => {
							let worker = assigned_worker_for_validator(assignment.validator, &mut to_approval_distribution_workers);
							worker
								.send_message(ApprovalDistributionMessage::DistributeAssignment(
									assignment, claimed,
								))
								.await;
						},
						ApprovalVotingParallelMessage::DistributeApproval(vote) => {
							let worker = assigned_worker_for_validator(vote.validator, &mut to_approval_distribution_workers);
							worker
								.send_message(ApprovalDistributionMessage::DistributeApproval(vote))
								.await;
						},
						ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg) => {
							if let polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage(
								peer_id,
								msg,
							) = msg
							{
								// Peer messages are sharded by the validator that
								// originated them, so all state for a validator
								// lives on a single worker.
								let (all_msgs_from_same_validator, messages_split_by_validator) = validator_index_for_msg(msg);

								for (validator_index, msg) in all_msgs_from_same_validator.into_iter().chain(messages_split_by_validator.into_iter().flatten()) {
									let worker = assigned_worker_for_validator(validator_index, &mut to_approval_distribution_workers);

									worker
										.send_message(
											ApprovalDistributionMessage::NetworkBridgeUpdate(
												polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage(
													peer_id, msg,
												),
											),
										)
										.await;
								}
							} else {
								// Other network events are relevant to every worker
								// and are sent at high priority, so they are not
								// delayed behind the regular message queue.
								for worker in to_approval_distribution_workers.iter_mut() {
									worker
										.send_message_with_priority::<overseer::HighPriority>(
											ApprovalDistributionMessage::NetworkBridgeUpdate(msg.clone()),
										)
										.await;
								}
							}
						},
						ApprovalVotingParallelMessage::GetApprovalSignatures(indices, tx) => {
							handle_get_approval_signatures(&mut ctx, &mut to_approval_distribution_workers, indices, tx).await;
						},
						ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag) => {
							for worker in to_approval_distribution_workers.iter_mut() {
								worker
									.send_message(ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag))
									.await;
							}
						},
					},
				};

			},
		};
	}
	Ok(())
}
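
// Routing policy implemented by the loop above: overseer signals and
// non-`PeerMessage` network events are broadcast to every worker; assignments,
// approvals and peer messages are sharded per validator via
// `assigned_worker_for_validator`; candidate-level queries go straight to the
// approval-voting worker at high priority.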

#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
/// Queries all approval-distribution workers for the signatures they hold for the
/// requested candidates and sends the merged result on `result_channel`.
async fn handle_get_approval_signatures<Context>(
	ctx: &mut Context,
	to_approval_distribution_workers: &mut Vec<ToWorker<ApprovalDistributionMessage>>,
	requested_candidates: HashSet<(Hash, CandidateIndex)>,
	result_channel: oneshot::Sender<
		HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)>,
	>,
) {
	let mut sigs = HashMap::new();
	let mut signatures_channels = Vec::new();
	for worker in to_approval_distribution_workers.iter_mut() {
		let (tx, rx) = oneshot::channel();
		worker.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
			requested_candidates.clone(),
			tx,
		));
		signatures_channels.push(rx);
	}

	let gather_signatures = async move {
		let Some(results) = futures::future::join_all(signatures_channels)
			.timeout(WAIT_FOR_SIGS_GATHER_TIMEOUT)
			.await
		else {
			gum::warn!(
				target: LOG_TARGET,
				"Waiting for approval signatures timed out - dead lock?"
			);
			return;
		};

		for result in results {
			let worker_sigs = match result {
				Ok(sigs) => sigs,
				Err(_) => {
					gum::error!(
						target: LOG_TARGET,
						"Getting approval signatures failed, oneshot got closed"
					);
					continue;
				},
			};
			sigs.extend(worker_sigs);
		}

		if result_channel.send(sigs).is_err() {
			gum::debug!(
				target: LOG_TARGET,
				"Sending back approval signatures failed, oneshot got closed"
			);
		}
	};

	if let Err(err) = ctx.spawn("approval-voting-gather-signatures", Box::pin(gather_signatures)) {
		gum::warn!(target: LOG_TARGET, "Failed to spawn gather signatures task: {:?}", err);
	}
}

/// Returns the approval-distribution worker assigned to the given validator.
fn assigned_worker_for_validator(
	validator: ValidatorIndex,
	to_approval_distribution_workers: &mut Vec<ToWorker<ApprovalDistributionMessage>>,
) -> &mut ToWorker<ApprovalDistributionMessage> {
	let worker_index = validator.0 as usize % to_approval_distribution_workers.len();
	to_approval_distribution_workers
		.get_mut(worker_index)
		.expect("Worker index is obtained modulo len; qed")
}
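
// E.g. with `APPROVAL_DISTRIBUTION_WORKER_COUNT` = 4, `ValidatorIndex(5)` maps to
// worker 5 % 4 = 1, as do indices 1, 9, 13, ..., so all state concerning a given
// validator stays on a single worker and never has to be synchronised across
// workers.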

/// Splits a network message by the validator that originated it.
///
/// Returns, in the first position, the validator index if all the assignments or
/// approvals in the message come from a single validator, or, in the second
/// position, the message split into one single-element message per validator.
/// Exactly one of the two is `Some`.
fn validator_index_for_msg(
	msg: polkadot_node_network_protocol::ApprovalDistributionMessage,
) -> (
	Option<(ValidatorIndex, polkadot_node_network_protocol::ApprovalDistributionMessage)>,
	Option<Vec<(ValidatorIndex, polkadot_node_network_protocol::ApprovalDistributionMessage)>>,
) {
	match msg {
		polkadot_node_network_protocol::ValidationProtocols::V3(ref message) => match message {
			polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(msgs) =>
				if let Ok(validator) = msgs.iter().map(|(msg, _)| msg.validator).all_equal_value() {
					(Some((validator, msg)), None)
				} else {
					let split = msgs
						.iter()
						.map(|(msg, claimed_candidates)| {
							(
								msg.validator,
								polkadot_node_network_protocol::ValidationProtocols::V3(
									polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(
										vec![(msg.clone(), claimed_candidates.clone())]
									),
								),
							)
						})
						.collect_vec();
					(None, Some(split))
				},
			polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(msgs) =>
				if let Ok(validator) = msgs.iter().map(|msg| msg.validator).all_equal_value() {
					(Some((validator, msg)), None)
				} else {
					let split = msgs
						.iter()
						.map(|vote| {
							(
								vote.validator,
								polkadot_node_network_protocol::ValidationProtocols::V3(
									polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(
										vec![vote.clone()]
									),
								),
							)
						})
						.collect_vec();
					(None, Some(split))
				},
		},
	}
}
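
// `all_equal_value()` (from itertools) yields `Ok(v)` only when every element of
// the iterator equals `v`. A batch that comes entirely from one validator is
// therefore forwarded whole, while a mixed batch is split into single-element
// messages so each piece can be routed to the worker owning its validator.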

/// The stream of work a worker consumes: the bounded and unbounded receivers of a
/// `RxWorker`, merged with a configurable prioritization strategy.
type WorkProvider<M, Clos, State> = WorkProviderImpl<
	SelectWithStrategy<
		MeteredReceiver<FromOrchestra<M>>,
		UnboundedMeteredReceiver<FromOrchestra<M>>,
		Clos,
		State,
	>,
>;

/// Wrapper over an arbitrary stream of `FromOrchestra<M>` work items.
pub struct WorkProviderImpl<T>(T);

impl<T, M> Stream for WorkProviderImpl<T>
where
	T: Stream<Item = FromOrchestra<M>> + Unpin + Send,
{
	type Item = FromOrchestra<M>;

	fn poll_next(
		mut self: std::pin::Pin<&mut Self>,
		cx: &mut std::task::Context<'_>,
	) -> std::task::Poll<Option<Self::Item>> {
		self.0.poll_next_unpin(cx)
	}
}

#[async_trait::async_trait]
impl<T> ApprovalVotingWorkProvider for WorkProviderImpl<T>
where
	T: Stream<Item = FromOrchestra<ApprovalVotingMessage>> + Unpin + Send,
{
	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
		self.0.next().await.ok_or(SubsystemError::Context(
			"ApprovalVotingWorkProviderImpl: Channel closed".to_string(),
		))
	}
}

impl<M, Clos, State> WorkProvider<M, Clos, State>
where
	M: Send + Sync + 'static,
	Clos: FnMut(&mut State) -> PollNext,
	State: Default,
{
	/// Builds a work provider from the receiving halves of the worker channels,
	/// merged with the given prioritization strategy.
	fn from_rx_worker(rx: RxWorker<M>, prio: Clos) -> Self {
		let prioritised = select_with_strategy(rx.0, rx.1, prio);
		WorkProviderImpl(prioritised)
	}
}

/// The handle used for sending messages to a worker. It bundles a bounded channel
/// for regular traffic and an unbounded channel for signals and high-priority
/// messages.
pub struct ToWorker<T: Send + Sync + 'static>(
	MeteredSender<FromOrchestra<T>>,
	UnboundedMeteredSender<FromOrchestra<T>>,
);

impl<T: Send + Sync + 'static> Clone for ToWorker<T> {
	fn clone(&self) -> Self {
		Self(self.0.clone(), self.1.clone())
	}
}

impl<T: Send + Sync + 'static> ToWorker<T> {
	/// Sends an overseer signal to the worker, on the unbounded channel so that it
	/// can never be dropped or delayed behind regular messages.
	async fn send_signal(&mut self, signal: OverseerSignal) -> Result<(), SubsystemError> {
		self.1
			.unbounded_send(FromOrchestra::Signal(signal))
			.map_err(|err| SubsystemError::QueueError(err.into_send_error()))
	}

	/// The meters for both channels, used for watching the queue sizes.
	fn meter(&self) -> Meters {
		Meters::new(self.0.meter(), self.1.meter())
	}
}

impl<T: Send + Sync + 'static + Debug> overseer::SubsystemSender<T> for ToWorker<T> {
	fn send_message<'life0, 'async_trait>(
		&'life0 mut self,
		msg: T,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		async {
			if let Err(err) =
				self.0.send(polkadot_overseer::FromOrchestra::Communication { msg }).await
			{
				gum::error!(
					target: LOG_TARGET,
					"Failed to send message to approval voting worker: {:?}, subsystem is probably shutting down.",
					err
				);
			}
		}
		.boxed()
	}

	fn try_send_message(&mut self, msg: T) -> Result<(), metered::TrySendError<T>> {
		self.0
			.try_send(polkadot_overseer::FromOrchestra::Communication { msg })
			.map_err(|result| {
				let is_full = result.is_full();
				let msg = match result.into_inner() {
					polkadot_overseer::FromOrchestra::Signal(_) =>
						panic!("Cannot happen, this variant is never built here"),
					polkadot_overseer::FromOrchestra::Communication { msg } => msg,
				};
				if is_full {
					metered::TrySendError::Full(msg)
				} else {
					metered::TrySendError::Closed(msg)
				}
			})
	}

	fn send_messages<'life0, 'async_trait, I>(
		&'life0 mut self,
		msgs: I,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		I: IntoIterator<Item = T> + Send,
		I::IntoIter: Send,
		I: 'async_trait,
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		async {
			for msg in msgs {
				self.send_message(msg).await;
			}
		}
		.boxed()
	}

	fn send_unbounded_message(&mut self, msg: T) {
		if let Err(err) =
			self.1.unbounded_send(polkadot_overseer::FromOrchestra::Communication { msg })
		{
			gum::error!(
				target: LOG_TARGET,
				"Failed to send unbounded message to approval voting worker: {:?}, subsystem is probably shutting down.",
				err
			);
		}
	}

	fn send_message_with_priority<'life0, 'async_trait, P>(
		&'life0 mut self,
		msg: T,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		P: 'async_trait + Priority,
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		// Normal-priority messages go through the bounded (backpressured) channel,
		// high-priority ones through the unbounded channel.
		match P::priority() {
			polkadot_overseer::PriorityLevel::Normal => self.send_message(msg),
			polkadot_overseer::PriorityLevel::High =>
				async { self.send_unbounded_message(msg) }.boxed(),
		}
	}

	fn try_send_message_with_priority<P: Priority>(
		&mut self,
		msg: T,
	) -> Result<(), metered::TrySendError<T>> {
		match P::priority() {
			polkadot_overseer::PriorityLevel::Normal => self.try_send_message(msg),
			polkadot_overseer::PriorityLevel::High => Ok(self.send_unbounded_message(msg)),
		}
	}
}
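
// Note the asymmetry: high-priority sends go over the unbounded channel, so they
// can neither block nor fail with `Full`; only normal-priority sends are subject
// to the bounded channel's backpressure.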

/// The receiving halves of a worker's channels.
pub struct RxWorker<T: Send + Sync + 'static>(
	MeteredReceiver<FromOrchestra<T>>,
	UnboundedMeteredReceiver<FromOrchestra<T>>,
);

/// Builds the bounded and unbounded channels for a single worker and registers
/// their meters with the `MetricsWatcher`.
fn build_channels<T: Send + Sync + 'static>(
	channel_name: String,
	channel_size: usize,
	metrics_watcher: &mut MetricsWatcher,
) -> (ToWorker<T>, RxWorker<T>) {
	let (tx_work, rx_work) = channel::<FromOrchestra<T>>(channel_size);
	let (tx_work_unbounded, rx_work_unbounded) = unbounded::<FromOrchestra<T>>();
	let to_worker = ToWorker(tx_work, tx_work_unbounded);

	metrics_watcher.watch(channel_name, to_worker.meter());

	(to_worker, RxWorker(rx_work, rx_work_unbounded))
}

/// Builds both sides of a worker's plumbing: a `ToWorker` handle for producers and
/// the `WorkProvider` the worker consumes, with the two channels merged according
/// to the given prioritization closure.
fn build_worker_handles<M, Clos, State>(
	channel_name: String,
	channel_size: usize,
	metrics_watcher: &mut MetricsWatcher,
	prio_right: Clos,
) -> (ToWorker<M>, WorkProvider<M, Clos, State>)
where
	M: Send + Sync + 'static,
	Clos: FnMut(&mut State) -> PollNext,
	State: Default,
{
	let (to_worker, rx_worker) = build_channels(channel_name, channel_size, metrics_watcher);
	(to_worker, WorkProviderImpl::from_rx_worker(rx_worker, prio_right))
}

/// Wrapper implementing `overseer::SubsystemSender<ApprovalDistributionMessage>`,
/// so the approval-voting worker can reach the approval-distribution workers by
/// sending through the overseer.
#[derive(Clone)]
pub struct ApprovalVotingToApprovalDistribution<S: SubsystemSender<ApprovalVotingParallelMessage>>(
	S,
);

impl<S: SubsystemSender<ApprovalVotingParallelMessage>>
	overseer::SubsystemSender<ApprovalDistributionMessage>
	for ApprovalVotingToApprovalDistribution<S>
{
	#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
	fn send_message<'life0, 'async_trait>(
		&'life0 mut self,
		msg: ApprovalDistributionMessage,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		self.0.send_message(msg.into())
	}

	fn try_send_message(
		&mut self,
		msg: ApprovalDistributionMessage,
	) -> Result<(), metered::TrySendError<ApprovalDistributionMessage>> {
		self.0.try_send_message(msg.into()).map_err(|err| match err {
			// The conversion back cannot fail, since the message was built from an
			// `ApprovalDistributionMessage` just above.
			metered::TrySendError::Closed(msg) =>
				metered::TrySendError::Closed(msg.try_into().unwrap()),
			metered::TrySendError::Full(msg) =>
				metered::TrySendError::Full(msg.try_into().unwrap()),
		})
	}

	#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
	fn send_messages<'life0, 'async_trait, I>(
		&'life0 mut self,
		msgs: I,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		I: IntoIterator<Item = ApprovalDistributionMessage> + Send,
		I::IntoIter: Send,
		I: 'async_trait,
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		self.0.send_messages(msgs.into_iter().map(|msg| msg.into()))
	}

	fn send_unbounded_message(&mut self, msg: ApprovalDistributionMessage) {
		self.0.send_unbounded_message(msg.into())
	}

	fn send_message_with_priority<'life0, 'async_trait, P>(
		&'life0 mut self,
		msg: ApprovalDistributionMessage,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		P: 'async_trait + Priority,
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		self.0.send_message_with_priority::<P>(msg.into())
	}

	fn try_send_message_with_priority<P: Priority>(
		&mut self,
		msg: ApprovalDistributionMessage,
	) -> Result<(), metered::TrySendError<ApprovalDistributionMessage>> {
		self.0.try_send_message_with_priority::<P>(msg.into()).map_err(|err| match err {
			// As above, the round-trip conversion cannot fail.
			metered::TrySendError::Closed(msg) =>
				metered::TrySendError::Closed(msg.try_into().unwrap()),
			metered::TrySendError::Full(msg) =>
				metered::TrySendError::Full(msg.try_into().unwrap()),
		})
	}
}