referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_approval_voting_parallel/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Approval Voting Parallel Subsystem.
18//!
19//! This subsystem is responsible for orchestrating the work done by
20//! approval-voting and approval-distribution subsystem, so they can
21//! do their work in parallel, rather than serially, when they are run
22//! as independent subsystems.
23use itertools::Itertools;
24use metrics::{Meters, MetricsWatcher};
25use polkadot_node_core_approval_voting::{Config, RealAssignmentCriteria};
26use polkadot_node_metrics::metered::{
27	self, channel, unbounded, MeteredReceiver, MeteredSender, UnboundedMeteredReceiver,
28	UnboundedMeteredSender,
29};
30
31use polkadot_node_primitives::{
32	approval::time::{Clock, SystemClock},
33	DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36	messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage},
37	overseer, FromOrchestra, SpawnedSubsystem, SubsystemError, SubsystemResult,
38};
39use polkadot_node_subsystem_util::{
40	self,
41	database::Database,
42	runtime::{Config as RuntimeInfoConfig, RuntimeInfo},
43};
44use polkadot_overseer::{OverseerSignal, Priority, SubsystemSender, TimeoutExt};
45use polkadot_primitives::{CandidateIndex, Hash, ValidatorIndex, ValidatorSignature};
46use rand::SeedableRng;
47
48use sc_keystore::LocalKeystore;
49use sp_consensus::SyncOracle;
50
51use futures::{channel::oneshot, prelude::*, StreamExt};
52pub use metrics::Metrics;
53use polkadot_node_core_approval_voting::{
54	approval_db::common::Config as DatabaseConfig, ApprovalVotingWorkProvider,
55};
56use std::{
57	collections::{HashMap, HashSet},
58	fmt::Debug,
59	sync::Arc,
60	time::Duration,
61};
62use stream::{select_with_strategy, PollNext, SelectWithStrategy};
63pub mod metrics;
64
65#[cfg(test)]
66mod tests;
67
68pub(crate) const LOG_TARGET: &str = "parachain::approval-voting-parallel";
69// Value rather arbitrarily: Should not be hit in practice, it exists to more easily diagnose dead
70// lock issues for example.
71const WAIT_FOR_SIGS_GATHER_TIMEOUT: Duration = Duration::from_millis(2000);
72
73/// The number of workers used for running the approval-distribution logic.
74pub const APPROVAL_DISTRIBUTION_WORKER_COUNT: usize = 4;
75
76/// The default channel size for the workers, can be overridden by the user through
77/// `overseer_channel_capacity_override`
78pub const DEFAULT_WORKERS_CHANNEL_SIZE: usize = 64000 / APPROVAL_DISTRIBUTION_WORKER_COUNT;
79
80fn prio_right<'a>(_val: &'a mut ()) -> PollNext {
81	PollNext::Right
82}
83
84/// The approval voting parallel subsystem.
85pub struct ApprovalVotingParallelSubsystem {
86	/// `LocalKeystore` is needed for assignment keys, but not necessarily approval keys.
87	///
88	/// We do a lot of VRF signing and need the keys to have low latency.
89	keystore: Arc<LocalKeystore>,
90	db_config: DatabaseConfig,
91	slot_duration_millis: u64,
92	db: Arc<dyn Database>,
93	sync_oracle: Box<dyn SyncOracle + Send>,
94	metrics: Metrics,
95	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
96	clock: Arc<dyn Clock + Send + Sync>,
97	overseer_message_channel_capacity_override: Option<usize>,
98}
99
100impl ApprovalVotingParallelSubsystem {
101	/// Create a new approval voting subsystem with the given keystore, config, and database.
102	pub fn with_config(
103		config: Config,
104		db: Arc<dyn Database>,
105		keystore: Arc<LocalKeystore>,
106		sync_oracle: Box<dyn SyncOracle + Send>,
107		metrics: Metrics,
108		spawner: impl overseer::gen::Spawner + 'static + Clone,
109		overseer_message_channel_capacity_override: Option<usize>,
110	) -> Self {
111		ApprovalVotingParallelSubsystem::with_config_and_clock(
112			config,
113			db,
114			keystore,
115			sync_oracle,
116			metrics,
117			Arc::new(SystemClock {}),
118			spawner,
119			overseer_message_channel_capacity_override,
120		)
121	}
122
123	/// Create a new approval voting subsystem with the given keystore, config, clock, and database.
124	pub fn with_config_and_clock(
125		config: Config,
126		db: Arc<dyn Database>,
127		keystore: Arc<LocalKeystore>,
128		sync_oracle: Box<dyn SyncOracle + Send>,
129		metrics: Metrics,
130		clock: Arc<dyn Clock + Send + Sync>,
131		spawner: impl overseer::gen::Spawner + 'static,
132		overseer_message_channel_capacity_override: Option<usize>,
133	) -> Self {
134		ApprovalVotingParallelSubsystem {
135			keystore,
136			slot_duration_millis: config.slot_duration_millis,
137			db,
138			db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
139			sync_oracle,
140			metrics,
141			spawner: Arc::new(spawner),
142			clock,
143			overseer_message_channel_capacity_override,
144		}
145	}
146
147	/// The size of the channel used for the workers.
148	fn workers_channel_size(&self) -> usize {
149		self.overseer_message_channel_capacity_override
150			.unwrap_or(DEFAULT_WORKERS_CHANNEL_SIZE)
151	}
152}
153
154#[overseer::subsystem(ApprovalVotingParallel, error = SubsystemError, prefix = self::overseer)]
155impl<Context: Send> ApprovalVotingParallelSubsystem {
156	fn start(self, ctx: Context) -> SpawnedSubsystem {
157		let future = run::<Context>(ctx, self)
158			.map_err(|e| SubsystemError::with_origin("approval-voting-parallel", e))
159			.boxed();
160
161		SpawnedSubsystem { name: "approval-voting-parallel-subsystem", future }
162	}
163}
164
165// It starts worker for the approval voting subsystem and the `APPROVAL_DISTRIBUTION_WORKER_COUNT`
166// workers for the approval distribution subsystem.
167//
168// It returns handles that can be used to send messages to the workers.
169#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
170async fn start_workers<Context>(
171	ctx: &mut Context,
172	subsystem: ApprovalVotingParallelSubsystem,
173	metrics_watcher: &mut MetricsWatcher,
174) -> SubsystemResult<(ToWorker<ApprovalVotingMessage>, Vec<ToWorker<ApprovalDistributionMessage>>)>
175where
176{
177	gum::info!(target: LOG_TARGET, "Starting approval distribution workers");
178
179	// Build approval voting handles.
180	let (to_approval_voting_worker, approval_voting_work_provider) = build_worker_handles(
181		"approval-voting-parallel-db".into(),
182		subsystem.workers_channel_size(),
183		metrics_watcher,
184		prio_right,
185	);
186	let mut to_approval_distribution_workers = Vec::new();
187	let slot_duration_millis = subsystem.slot_duration_millis;
188
189	for i in 0..APPROVAL_DISTRIBUTION_WORKER_COUNT {
190		let mut network_sender = ctx.sender().clone();
191		let mut runtime_api_sender = ctx.sender().clone();
192		let mut approval_distribution_to_approval_voting = to_approval_voting_worker.clone();
193
194		let approval_distr_instance =
195			polkadot_approval_distribution::ApprovalDistribution::new_with_clock(
196				subsystem.metrics.approval_distribution_metrics(),
197				subsystem.slot_duration_millis,
198				subsystem.clock.clone(),
199				Arc::new(RealAssignmentCriteria {}),
200			);
201		let task_name = format!("approval-voting-parallel-{}", i);
202		let (to_approval_distribution_worker, mut approval_distribution_work_provider) =
203			build_worker_handles(
204				task_name.clone(),
205				subsystem.workers_channel_size(),
206				metrics_watcher,
207				prio_right,
208			);
209
210		metrics_watcher.watch(task_name.clone(), to_approval_distribution_worker.meter());
211
212		subsystem.spawner.spawn_blocking(
213			task_name.leak(),
214			Some("approval-voting-parallel"),
215			Box::pin(async move {
216				let mut state =
217					polkadot_approval_distribution::State::with_config(slot_duration_millis);
218				let mut rng = rand::rngs::StdRng::from_entropy();
219				let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
220					keystore: None,
221					session_cache_lru_size: DISPUTE_WINDOW.get(),
222				});
223
224				loop {
225					let message = match approval_distribution_work_provider.next().await {
226						Some(message) => message,
227						None => {
228							gum::info!(
229								target: LOG_TARGET,
230								"Approval distribution stream finished, most likely shutting down",
231							);
232							break;
233						},
234					};
235					if approval_distr_instance
236						.handle_from_orchestra(
237							message,
238							&mut approval_distribution_to_approval_voting,
239							&mut network_sender,
240							&mut runtime_api_sender,
241							&mut state,
242							&mut rng,
243							&mut session_info_provider,
244						)
245						.await
246					{
247						gum::info!(
248							target: LOG_TARGET,
249							"Approval distribution worker {}, exiting because of shutdown", i
250						);
251					};
252				}
253			}),
254		);
255		to_approval_distribution_workers.push(to_approval_distribution_worker);
256	}
257
258	gum::info!(target: LOG_TARGET, "Starting approval voting workers");
259
260	let sender = ctx.sender().clone();
261	let to_approval_distribution = ApprovalVotingToApprovalDistribution(sender.clone());
262	polkadot_node_core_approval_voting::start_approval_worker(
263		approval_voting_work_provider,
264		sender.clone(),
265		to_approval_distribution,
266		polkadot_node_core_approval_voting::Config {
267			slot_duration_millis: subsystem.slot_duration_millis,
268			col_approval_data: subsystem.db_config.col_approval_data,
269		},
270		subsystem.db.clone(),
271		subsystem.keystore.clone(),
272		subsystem.sync_oracle,
273		subsystem.metrics.approval_voting_metrics(),
274		subsystem.spawner.clone(),
275		"approval-voting-parallel-db",
276		"approval-voting-parallel",
277		subsystem.clock.clone(),
278	)
279	.await?;
280
281	Ok((to_approval_voting_worker, to_approval_distribution_workers))
282}
283
284// The main run function of the approval parallel voting subsystem.
285#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
286async fn run<Context>(
287	mut ctx: Context,
288	subsystem: ApprovalVotingParallelSubsystem,
289) -> SubsystemResult<()> {
290	let mut metrics_watcher = MetricsWatcher::new(subsystem.metrics.clone());
291	gum::info!(
292		target: LOG_TARGET,
293		"Starting workers"
294	);
295
296	let (to_approval_voting_worker, to_approval_distribution_workers) =
297		start_workers(&mut ctx, subsystem, &mut metrics_watcher).await?;
298
299	gum::info!(
300		target: LOG_TARGET,
301		"Starting main subsystem loop"
302	);
303
304	run_main_loop(ctx, to_approval_voting_worker, to_approval_distribution_workers, metrics_watcher)
305		.await
306}
307
308// Main loop of the subsystem, it shouldn't include any logic just dispatching of messages to
309// the workers.
310//
311// It listens for messages from the overseer and dispatches them to the workers.
312#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
313async fn run_main_loop<Context>(
314	mut ctx: Context,
315	mut to_approval_voting_worker: ToWorker<ApprovalVotingMessage>,
316	mut to_approval_distribution_workers: Vec<ToWorker<ApprovalDistributionMessage>>,
317	metrics_watcher: MetricsWatcher,
318) -> SubsystemResult<()> {
319	loop {
320		futures::select! {
321			next_msg = ctx.recv().fuse() => {
322				let next_msg = match next_msg {
323					Ok(msg) => msg,
324					Err(err) => {
325						gum::info!(target: LOG_TARGET, ?err, "Approval voting parallel subsystem received an error");
326						return Err(err);
327					}
328				};
329
330				match next_msg {
331					FromOrchestra::Signal(msg) => {
332						if matches!(msg, OverseerSignal::ActiveLeaves(_)) {
333							metrics_watcher.collect_metrics();
334						}
335
336						for worker in to_approval_distribution_workers.iter_mut() {
337							worker
338								.send_signal(msg.clone()).await?;
339						}
340
341						to_approval_voting_worker.send_signal(msg.clone()).await?;
342						if matches!(msg, OverseerSignal::Conclude) {
343							break;
344						}
345					},
346					FromOrchestra::Communication { msg } => match msg {
347						// The message the approval voting subsystem would've handled.
348						ApprovalVotingParallelMessage::ApprovedAncestor(_, _,_) |
349						ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(_, _)  => {
350							to_approval_voting_worker.send_message_with_priority::<overseer::HighPriority>(
351								msg.try_into().expect(
352									"Message is one of ApprovedAncestor, GetApprovalSignaturesForCandidate
353									 and that can be safely converted to ApprovalVotingMessage; qed"
354								)
355							).await;
356						},
357						// Now the message the approval distribution subsystem would've handled and need to
358						// be forwarded to the workers.
359						ApprovalVotingParallelMessage::NewBlocks(msg) => {
360							for worker in to_approval_distribution_workers.iter_mut() {
361								worker
362									.send_message(
363										ApprovalDistributionMessage::NewBlocks(msg.clone()),
364									)
365									.await;
366							}
367						},
368						ApprovalVotingParallelMessage::DistributeAssignment(assignment, claimed) => {
369							let worker = assigned_worker_for_validator(assignment.validator, &mut to_approval_distribution_workers);
370							worker
371								.send_message(
372									ApprovalDistributionMessage::DistributeAssignment(assignment, claimed)
373								)
374								.await;
375
376						},
377						ApprovalVotingParallelMessage::DistributeApproval(vote) => {
378							let worker = assigned_worker_for_validator(vote.validator, &mut to_approval_distribution_workers);
379							worker
380								.send_message(
381									ApprovalDistributionMessage::DistributeApproval(vote)
382								).await;
383
384						},
385						ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg) => {
386							if let polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage(
387								peer_id,
388								msg,
389							) = msg
390							{
391								let (all_msgs_from_same_validator, messages_split_by_validator) = validator_index_for_msg(msg);
392
393								for (validator_index, msg) in all_msgs_from_same_validator.into_iter().chain(messages_split_by_validator.into_iter().flatten()) {
394									let worker = assigned_worker_for_validator(validator_index, &mut to_approval_distribution_workers);
395
396									worker
397										.send_message(
398											ApprovalDistributionMessage::NetworkBridgeUpdate(
399												polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage(
400													peer_id, msg,
401												),
402											),
403										).await;
404								}
405							} else {
406								for worker in to_approval_distribution_workers.iter_mut() {
407									worker
408										.send_message_with_priority::<overseer::HighPriority>(
409											ApprovalDistributionMessage::NetworkBridgeUpdate(msg.clone()),
410										).await;
411								}
412							}
413						},
414						ApprovalVotingParallelMessage::GetApprovalSignatures(indices, tx) => {
415							handle_get_approval_signatures(&mut ctx, &mut to_approval_distribution_workers, indices, tx).await;
416						},
417						ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag) => {
418							for worker in to_approval_distribution_workers.iter_mut() {
419								worker
420									.send_message(
421										ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag)
422									).await;
423							}
424						},
425					},
426				};
427
428			},
429		};
430	}
431	Ok(())
432}
433
434// It sends a message to all approval workers to get the approval signatures for the requested
435// candidates and then merges them all together and sends them back to the requester.
436#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
437async fn handle_get_approval_signatures<Context>(
438	ctx: &mut Context,
439	to_approval_distribution_workers: &mut Vec<ToWorker<ApprovalDistributionMessage>>,
440	requested_candidates: HashSet<(Hash, CandidateIndex)>,
441	result_channel: oneshot::Sender<
442		HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)>,
443	>,
444) {
445	let mut sigs = HashMap::new();
446	let mut signatures_channels = Vec::new();
447	for worker in to_approval_distribution_workers.iter_mut() {
448		let (tx, rx) = oneshot::channel();
449		worker.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
450			requested_candidates.clone(),
451			tx,
452		));
453		signatures_channels.push(rx);
454	}
455
456	let gather_signatures = async move {
457		let Some(results) = futures::future::join_all(signatures_channels)
458			.timeout(WAIT_FOR_SIGS_GATHER_TIMEOUT)
459			.await
460		else {
461			gum::warn!(
462				target: LOG_TARGET,
463				"Waiting for approval signatures timed out - dead lock?"
464			);
465			return;
466		};
467
468		for result in results {
469			let worker_sigs = match result {
470				Ok(sigs) => sigs,
471				Err(_) => {
472					gum::error!(
473						target: LOG_TARGET,
474						"Getting approval signatures failed, oneshot got closed"
475					);
476					continue;
477				},
478			};
479			sigs.extend(worker_sigs);
480		}
481
482		if let Err(_) = result_channel.send(sigs) {
483			gum::debug!(
484					target: LOG_TARGET,
485					"Sending back approval signatures failed, oneshot got closed"
486			);
487		}
488	};
489
490	if let Err(err) = ctx.spawn("approval-voting-gather-signatures", Box::pin(gather_signatures)) {
491		gum::warn!(target: LOG_TARGET, "Failed to spawn gather signatures task: {:?}", err);
492	}
493}
494
495// Returns the worker that should receive the message for the given validator.
496fn assigned_worker_for_validator(
497	validator: ValidatorIndex,
498	to_approval_distribution_workers: &mut Vec<ToWorker<ApprovalDistributionMessage>>,
499) -> &mut ToWorker<ApprovalDistributionMessage> {
500	let worker_index = validator.0 as usize % to_approval_distribution_workers.len();
501	to_approval_distribution_workers
502		.get_mut(worker_index)
503		.expect("Worker index is obtained modulo len; qed")
504}
505
506// Returns the validators that initially created this assignments/votes, the validator index
507// is later used to decide which approval-distribution worker should receive the message.
508//
509// Because this is on the hot path and we don't want to be unnecessarily slow, it contains two logic
510// paths. The ultra fast path where all messages have the same validator index and we don't do
511// any cloning or allocation and the path where we need to split the messages into multiple
512// messages, because they have different validator indices, where we do need to clone and allocate.
513// In practice most of the message will fall on the ultra fast path.
514fn validator_index_for_msg(
515	msg: polkadot_node_network_protocol::ApprovalDistributionMessage,
516) -> (
517	Option<(ValidatorIndex, polkadot_node_network_protocol::ApprovalDistributionMessage)>,
518	Option<Vec<(ValidatorIndex, polkadot_node_network_protocol::ApprovalDistributionMessage)>>,
519) {
520	match msg {
521		polkadot_node_network_protocol::ValidationProtocols::V3(ref message) => match message {
522			polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(msgs) =>
523				if let Ok(validator) = msgs.iter().map(|(msg, _)| msg.validator).all_equal_value() {
524					(Some((validator, msg)), None)
525				} else {
526					let split = msgs
527						.iter()
528						.map(|(msg, claimed_candidates)| {
529							(
530								msg.validator,
531								polkadot_node_network_protocol::ValidationProtocols::V3(
532									polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(
533										vec![(msg.clone(), claimed_candidates.clone())]
534									),
535								),
536							)
537						})
538						.collect_vec();
539					(None, Some(split))
540				},
541			polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(msgs) =>
542				if let Ok(validator) = msgs.iter().map(|msg| msg.validator).all_equal_value() {
543					(Some((validator, msg)), None)
544				} else {
545					let split = msgs
546						.iter()
547						.map(|vote| {
548							(
549								vote.validator,
550								polkadot_node_network_protocol::ValidationProtocols::V3(
551									polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(
552										vec![vote.clone()]
553									),
554								),
555							)
556						})
557						.collect_vec();
558					(None, Some(split))
559				},
560		},
561	}
562}
563
564/// A handler object that both type of workers use for receiving work.
565///
566/// In practive this is just a wrapper over two channels Receiver, that is injected into
567/// approval-voting worker and approval-distribution workers.
568type WorkProvider<M, Clos, State> = WorkProviderImpl<
569	SelectWithStrategy<
570		MeteredReceiver<FromOrchestra<M>>,
571		UnboundedMeteredReceiver<FromOrchestra<M>>,
572		Clos,
573		State,
574	>,
575>;
576
577pub struct WorkProviderImpl<T>(T);
578
579impl<T, M> Stream for WorkProviderImpl<T>
580where
581	T: Stream<Item = FromOrchestra<M>> + Unpin + Send,
582{
583	type Item = FromOrchestra<M>;
584
585	fn poll_next(
586		mut self: std::pin::Pin<&mut Self>,
587		cx: &mut std::task::Context<'_>,
588	) -> std::task::Poll<Option<Self::Item>> {
589		self.0.poll_next_unpin(cx)
590	}
591}
592
593#[async_trait::async_trait]
594impl<T> ApprovalVotingWorkProvider for WorkProviderImpl<T>
595where
596	T: Stream<Item = FromOrchestra<ApprovalVotingMessage>> + Unpin + Send,
597{
598	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
599		self.0.next().await.ok_or(SubsystemError::Context(
600			"ApprovalVotingWorkProviderImpl: Channel closed".to_string(),
601		))
602	}
603}
604
605impl<M, Clos, State> WorkProvider<M, Clos, State>
606where
607	M: Send + Sync + 'static,
608	Clos: FnMut(&mut State) -> PollNext,
609	State: Default,
610{
611	// Constructs a work providers from the channels handles.
612	fn from_rx_worker(rx: RxWorker<M>, prio: Clos) -> Self {
613		let prioritised = select_with_strategy(rx.0, rx.1, prio);
614		WorkProviderImpl(prioritised)
615	}
616}
617
618/// Just a wrapper for implementing `overseer::SubsystemSender<ApprovalVotingMessage>` and
619/// `overseer::SubsystemSender<ApprovalDistributionMessage>`.
620///
621/// The instance of this struct can be injected into the workers, so they can talk
622/// directly with each other without intermediating in this subsystem loop.
623pub struct ToWorker<T: Send + Sync + 'static>(
624	MeteredSender<FromOrchestra<T>>,
625	UnboundedMeteredSender<FromOrchestra<T>>,
626);
627
628impl<T: Send + Sync + 'static> Clone for ToWorker<T> {
629	fn clone(&self) -> Self {
630		Self(self.0.clone(), self.1.clone())
631	}
632}
633
634impl<T: Send + Sync + 'static> ToWorker<T> {
635	async fn send_signal(&mut self, signal: OverseerSignal) -> Result<(), SubsystemError> {
636		self.1
637			.unbounded_send(FromOrchestra::Signal(signal))
638			.map_err(|err| SubsystemError::QueueError(err.into_send_error()))
639	}
640
641	fn meter(&self) -> Meters {
642		Meters::new(self.0.meter(), self.1.meter())
643	}
644}
645
646impl<T: Send + Sync + 'static + Debug> overseer::SubsystemSender<T> for ToWorker<T> {
647	fn send_message<'life0, 'async_trait>(
648		&'life0 mut self,
649		msg: T,
650	) -> ::core::pin::Pin<
651		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
652	>
653	where
654		'life0: 'async_trait,
655		Self: 'async_trait,
656	{
657		async {
658			if let Err(err) =
659				self.0.send(polkadot_overseer::FromOrchestra::Communication { msg }).await
660			{
661				gum::error!(
662					target: LOG_TARGET,
663					"Failed to send message to approval voting worker: {:?}, subsystem is probably shutting down.",
664					err
665				);
666			}
667		}
668		.boxed()
669	}
670
671	fn try_send_message(&mut self, msg: T) -> Result<(), metered::TrySendError<T>> {
672		self.0
673			.try_send(polkadot_overseer::FromOrchestra::Communication { msg })
674			.map_err(|result| {
675				let is_full = result.is_full();
676				let msg = match result.into_inner() {
677					polkadot_overseer::FromOrchestra::Signal(_) =>
678						panic!("Cannot happen variant is never built"),
679					polkadot_overseer::FromOrchestra::Communication { msg } => msg,
680				};
681				if is_full {
682					metered::TrySendError::Full(msg)
683				} else {
684					metered::TrySendError::Closed(msg)
685				}
686			})
687	}
688
689	fn send_messages<'life0, 'async_trait, I>(
690		&'life0 mut self,
691		msgs: I,
692	) -> ::core::pin::Pin<
693		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
694	>
695	where
696		I: IntoIterator<Item = T> + Send,
697		I::IntoIter: Send,
698		I: 'async_trait,
699		'life0: 'async_trait,
700		Self: 'async_trait,
701	{
702		async {
703			for msg in msgs {
704				self.send_message(msg).await;
705			}
706		}
707		.boxed()
708	}
709
710	fn send_unbounded_message(&mut self, msg: T) {
711		if let Err(err) =
712			self.1.unbounded_send(polkadot_overseer::FromOrchestra::Communication { msg })
713		{
714			gum::error!(
715				target: LOG_TARGET,
716				"Failed to send unbounded message to approval voting worker: {:?}, subsystem is probably shutting down.",
717				err
718			);
719		}
720	}
721
722	fn send_message_with_priority<'life0, 'async_trait, P>(
723		&'life0 mut self,
724		msg: T,
725	) -> ::core::pin::Pin<
726		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
727	>
728	where
729		P: 'async_trait + Priority,
730		'life0: 'async_trait,
731		Self: 'async_trait,
732	{
733		match P::priority() {
734			polkadot_overseer::PriorityLevel::Normal => self.send_message(msg),
735			polkadot_overseer::PriorityLevel::High =>
736				async { self.send_unbounded_message(msg) }.boxed(),
737		}
738	}
739
740	fn try_send_message_with_priority<P: Priority>(
741		&mut self,
742		msg: T,
743	) -> Result<(), metered::TrySendError<T>> {
744		match P::priority() {
745			polkadot_overseer::PriorityLevel::Normal => self.try_send_message(msg),
746			polkadot_overseer::PriorityLevel::High => Ok(self.send_unbounded_message(msg)),
747		}
748	}
749}
750
751/// Handles that are used by an worker to receive work.
752pub struct RxWorker<T: Send + Sync + 'static>(
753	MeteredReceiver<FromOrchestra<T>>,
754	UnboundedMeteredReceiver<FromOrchestra<T>>,
755);
756
757// Build all the necessary channels for sending messages to an worker
758// and for the worker to receive them.
759fn build_channels<T: Send + Sync + 'static>(
760	channel_name: String,
761	channel_size: usize,
762	metrics_watcher: &mut MetricsWatcher,
763) -> (ToWorker<T>, RxWorker<T>) {
764	let (tx_work, rx_work) = channel::<FromOrchestra<T>>(channel_size);
765	let (tx_work_unbounded, rx_work_unbounded) = unbounded::<FromOrchestra<T>>();
766	let to_worker = ToWorker(tx_work, tx_work_unbounded);
767
768	metrics_watcher.watch(channel_name, to_worker.meter());
769
770	(to_worker, RxWorker(rx_work, rx_work_unbounded))
771}
772
773/// Build the worker handles used for interacting with the workers.
774///
775/// `ToWorker` is used for sending messages to the workers.
776/// `WorkProvider` is used by the workers for receiving the messages.
777fn build_worker_handles<M, Clos, State>(
778	channel_name: String,
779	channel_size: usize,
780	metrics_watcher: &mut MetricsWatcher,
781	prio_right: Clos,
782) -> (ToWorker<M>, WorkProvider<M, Clos, State>)
783where
784	M: Send + Sync + 'static,
785	Clos: FnMut(&mut State) -> PollNext,
786	State: Default,
787{
788	let (to_worker, rx_worker) = build_channels(channel_name, channel_size, metrics_watcher);
789	(to_worker, WorkProviderImpl::from_rx_worker(rx_worker, prio_right))
790}
791
792/// Just a wrapper for implementing `overseer::SubsystemSender<ApprovalDistributionMessage>`, so
793/// that we can inject into the approval voting subsystem.
794#[derive(Clone)]
795pub struct ApprovalVotingToApprovalDistribution<S: SubsystemSender<ApprovalVotingParallelMessage>>(
796	S,
797);
798
799impl<S: SubsystemSender<ApprovalVotingParallelMessage>>
800	overseer::SubsystemSender<ApprovalDistributionMessage>
801	for ApprovalVotingToApprovalDistribution<S>
802{
803	#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
804	fn send_message<'life0, 'async_trait>(
805		&'life0 mut self,
806		msg: ApprovalDistributionMessage,
807	) -> ::core::pin::Pin<
808		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
809	>
810	where
811		'life0: 'async_trait,
812		Self: 'async_trait,
813	{
814		self.0.send_message(msg.into())
815	}
816
817	fn try_send_message(
818		&mut self,
819		msg: ApprovalDistributionMessage,
820	) -> Result<(), metered::TrySendError<ApprovalDistributionMessage>> {
821		self.0.try_send_message(msg.into()).map_err(|err| match err {
822			// Safe to unwrap because it was built from the same type.
823			metered::TrySendError::Closed(msg) =>
824				metered::TrySendError::Closed(msg.try_into().unwrap()),
825			metered::TrySendError::Full(msg) =>
826				metered::TrySendError::Full(msg.try_into().unwrap()),
827		})
828	}
829
830	#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
831	fn send_messages<'life0, 'async_trait, I>(
832		&'life0 mut self,
833		msgs: I,
834	) -> ::core::pin::Pin<
835		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
836	>
837	where
838		I: IntoIterator<Item = ApprovalDistributionMessage> + Send,
839		I::IntoIter: Send,
840		I: 'async_trait,
841		'life0: 'async_trait,
842		Self: 'async_trait,
843	{
844		self.0.send_messages(msgs.into_iter().map(|msg| msg.into()))
845	}
846
847	fn send_unbounded_message(&mut self, msg: ApprovalDistributionMessage) {
848		self.0.send_unbounded_message(msg.into())
849	}
850
851	fn send_message_with_priority<'life0, 'async_trait, P>(
852		&'life0 mut self,
853		msg: ApprovalDistributionMessage,
854	) -> ::core::pin::Pin<
855		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
856	>
857	where
858		P: 'async_trait + Priority,
859		'life0: 'async_trait,
860		Self: 'async_trait,
861	{
862		self.0.send_message_with_priority::<P>(msg.into())
863	}
864
865	fn try_send_message_with_priority<P: Priority>(
866		&mut self,
867		msg: ApprovalDistributionMessage,
868	) -> Result<(), metered::TrySendError<ApprovalDistributionMessage>> {
869		self.0.try_send_message_with_priority::<P>(msg.into()).map_err(|err| match err {
870			// Safe to unwrap because it was built from the same type.
871			metered::TrySendError::Closed(msg) =>
872				metered::TrySendError::Closed(msg.try_into().unwrap()),
873			metered::TrySendError::Full(msg) =>
874				metered::TrySendError::Full(msg.try_into().unwrap()),
875		})
876	}
877}