orchestra_proc_macro/
impl_builder.rs

1// Copyright (C) 2021 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: Apache-2.0
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use quote::{format_ident, quote};
17use syn::{parse_quote, Path, PathSegment, Type, TypePath};
18
19use super::*;
20
/// Clone every element of `x` into a new `Vec`, leaving out the one at position `idx`.
///
/// If `idx` is past the end of `x`, nothing is left out and the result is a plain copy.
fn recollect_without_idx<T: Clone>(x: &[T], idx: usize) -> Vec<T> {
	x.iter()
		.enumerate()
		.filter(|(position, _)| *position != idx)
		.map(|(_, item)| item.clone())
		.collect()
}
27
28/// Implement a builder pattern for the `Orchestra`-type,
29/// which acts as the gateway to constructing the orchestra.
30///
31/// Elements tagged with `wip` are not covered here.
32pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
33	let orchestra_name = info.orchestra_name.clone();
34
35	let builder = format_ident!("{}Builder", orchestra_name);
36	let handle = format_ident!("{}Handle", orchestra_name);
37	let connector = format_ident!("{}Connector", orchestra_name);
38	let subsystem_ctx_name = format_ident!("{}SubsystemContext", orchestra_name);
39	let feature_powerset_cfgs = &info.feature_gated_subsystem_sets();
40
41	let mut ts = proc_macro2::TokenStream::new();
42
43	// In this loop we generate items that are required for each config set separately
44	for cfg_set in feature_powerset_cfgs.into_iter() {
45		let builder_items_for_config = impl_feature_gated_items(
46			info,
47			cfg_set,
48			&builder,
49			&subsystem_ctx_name,
50			&connector,
51			&orchestra_name,
52			&handle,
53		);
54		ts.extend(builder_items_for_config);
55	}
56
57	let event = &info.extern_event_ty;
58	let support_crate = info.support_crate_name();
59
60	ts.extend(quote! {
61		/// Handle for an orchestra.
62		pub type #handle = #support_crate ::metered::MeteredSender< #event >;
63
64		/// External connector.
65		pub struct #connector {
66			/// Publicly accessible handle, to be used for setting up
67			/// components that are _not_ subsystems but access is needed
68			/// due to other limitations.
69			///
70			/// For subsystems, use the `_with` variants of the builder.
71			handle: #handle,
72			/// The side consumed by the `spawned` side of the orchestra pattern.
73			consumer: #support_crate ::metered::MeteredReceiver < #event >,
74		}
75
76		impl #connector {
77			/// Obtain access to the orchestra handle.
78			pub fn as_handle_mut(&mut self) -> &mut #handle {
79				&mut self.handle
80			}
81			/// Obtain access to the orchestra handle.
82			pub fn as_handle(&self) -> &#handle {
83				&self.handle
84			}
85			/// Obtain a clone of the handle.
86			pub fn handle(&self) -> #handle {
87				self.handle.clone()
88			}
89
90			/// Create a new connector with non-default event channel capacity.
91			pub fn with_event_capacity(event_channel_capacity: usize) -> Self {
92				let (events_tx, events_rx) = #support_crate ::metered::channel::<
93					#event
94					>(event_channel_capacity);
95
96				Self {
97					handle: events_tx,
98					consumer: events_rx,
99				}
100			}
101		}
102
103		impl ::std::default::Default for #connector {
104			fn default() -> Self {
105				Self::with_event_capacity(SIGNAL_CHANNEL_CAPACITY)
106			}
107		}
108	});
109
110	let error_ty = &info.extern_error_ty;
111	ts.extend(quote! {
112			/// Convenience alias.
113			type SubsystemInitFn<T> = Box<dyn FnOnce(#handle) -> ::std::result::Result<T, #error_ty> + Send + 'static>;
114
115			/// Type for the initialized field of the orchestra builder
116			pub enum Init<T> {
117				/// Defer initialization to a point where the `handle` is available.
118				Fn(SubsystemInitFn<T>),
119				/// Directly initialize the subsystem with the given subsystem type `T`.
120				/// Also used for baggage fields
121				Value(T),
122			}
123			/// Type marker for the uninitialized field of the orchestra builder.
124			/// `PhantomData` is used for type hinting when creating uninitialized
125			/// builder, e.g. to avoid specifying the generics when instantiating
126			/// the `FooBuilder` when calling `Foo::builder()`
127			#[derive(Debug)]
128			pub struct Missing<T>(::core::marker::PhantomData<T>);
129
130			/// Trait used to mark fields status in a builder
131			trait OrchestraFieldState<T> {}
132
133			impl<T> OrchestraFieldState<T> for Init<T> {}
134			impl<T> OrchestraFieldState<T> for Missing<T> {}
135
136			impl<T> ::std::default::Default for Missing<T> {
137				fn default() -> Self {
138					Missing::<T>(::core::marker::PhantomData)
139				}
140			}
141	});
142	ts.extend(impl_task_kind(info));
143	ts
144}
145
146pub(crate) fn impl_feature_gated_items(
147	info: &OrchestraInfo,
148	cfg_set: &SubsystemConfigSet,
149	builder: &Ident,
150	subsystem_ctx_name: &Ident,
151	connector: &Ident,
152	orchestra_name: &Ident,
153	handle: &Ident,
154) -> proc_macro2::TokenStream {
155	let mut ts = quote! {};
156
157	let cfg_guard = &cfg_set.feature_gate;
158	let subsystem_name = &cfg_set.subsystem_names_without_wip();
159	let subsystem_generics = &cfg_set.subsystem_generic_types();
160	let consumes = &cfg_set.consumes_without_wip();
161	let maybe_boxed_consumes = consumes
162		.iter()
163		.map(|consume| info.box_message_if_needed(consume, Span::call_site()))
164		.collect::<Vec<_>>();
165	let channel_name = &cfg_set.channel_names_without_wip(None);
166	let channel_name_unbounded = &cfg_set.channel_names_without_wip("_unbounded");
167	let message_channel_capacity =
168		&cfg_set.message_channel_capacities_without_wip(info.message_channel_capacity);
169	let signal_channel_capacity =
170		&cfg_set.signal_channel_capacities_without_wip(info.signal_channel_capacity);
171
172	let channel_name_tx = &cfg_set.channel_names_without_wip("_tx");
173	let channel_name_unbounded_tx = &cfg_set.channel_names_without_wip("_unbounded_tx");
174
175	let channel_name_rx = &cfg_set.channel_names_without_wip("_rx");
176	let channel_name_unbounded_rx = &info.channel_names_without_wip("_unbounded_rx");
177
178	let can_receive_priority_messages = &cfg_set.can_receive_priority_messages_without_wip();
179
180	let baggage_name = &info.baggage_names();
181	let baggage_generic_ty = &info.baggage_generic_types();
182
183	// State generics that are used to encode each field's status (Init/Missing)
184	let baggage_passthrough_state_generics = baggage_name
185		.iter()
186		.enumerate()
187		.map(|(idx, _)| format_ident!("InitStateBaggage{}", idx))
188		.collect::<Vec<_>>();
189	let subsystem_passthrough_state_generics = subsystem_name
190		.iter()
191		.enumerate()
192		.map(|(idx, _)| format_ident!("InitStateSubsystem{}", idx))
193		.collect::<Vec<_>>();
194
195	let error_ty = &info.extern_error_ty;
196
197	let support_crate = info.support_crate_name();
198
199	let blocking = &cfg_set
200		.enabled_subsystems
201		.iter()
202		.map(|x| {
203			if x.blocking {
204				quote! { Blocking }
205			} else {
206				quote! { Regular }
207			}
208		})
209		.collect::<Vec<_>>();
210
211	// Helpers to use within quote! macros
212	let spawner_where_clause: syn::TypeParam = parse_quote! {
213			S: #support_crate ::Spawner
214	};
215
216	// Field names and real types
217	let field_name = subsystem_name.iter().chain(baggage_name.iter()).collect::<Vec<_>>();
218	let field_type = subsystem_generics
219		.iter()
220		.map(|ident| {
221			syn::Type::Path(TypePath {
222				qself: None,
223				path: Path::from(PathSegment::from(ident.clone())),
224			})
225		})
226		.chain(info.baggage().iter().map(|bag| bag.field_ty.clone()))
227		.collect::<Vec<_>>();
228
229	// Setters logic
230
231	// For each setter we need to leave the remaining fields untouched and
232	// remove the field that we are fixing in this setter
233	// For subsystems `*_with` and `replace_*` setters are needed.
234	let subsystem_specific_setters =
235		cfg_set.enabled_subsystems.iter().filter(|ssf| !ssf.wip).enumerate().map(|(idx, ssf)| {
236			let field_name = &ssf.name;
237			let field_type = &ssf.generic;
238			let subsystem_consumes = &ssf.message_to_consume();
239			// Remove state generic for the item to be replaced. It sufficient to know `field_type` for
240			// that since we always move from `Init<#field_type>` to `Init<NEW>`.
241			let impl_subsystem_state_generics = recollect_without_idx(&subsystem_passthrough_state_generics[..], idx);
242
243			let field_name_with = format_ident!("{}_with", field_name);
244			let field_name_replace = format_ident!("replace_{}", field_name);
245
246			// In a setter we replace `Uninit<T>` with `Init<T>` leaving all other
247			// types as they are, as such they will be free generics.
248			let mut current_state_generics = subsystem_passthrough_state_generics
249				.iter()
250				.map(|subsystem_state_generic_ty| parse_quote!(#subsystem_state_generic_ty))
251				.collect::<Vec<syn::GenericArgument>>();
252			current_state_generics[idx] = parse_quote! { Missing<#field_type> };
253
254			// Generics that will be present after initializing a specific `Missing<_>` field.
255			let mut post_setter_state_generics = current_state_generics.clone();
256			post_setter_state_generics[idx] = parse_quote! { Init<#field_type> };
257
258			let mut post_replace_state_generics = current_state_generics.clone();
259			post_replace_state_generics[idx] = parse_quote! { Init<NEW> };
260
261			// All fields except the one we update with the new argument
262			// see the loop below.
263			let to_keep_subsystem_name = recollect_without_idx(&subsystem_name[..], idx);
264
265			let subsystem_sender_trait = format_ident!("{}SenderTrait", field_type);
266			let _subsystem_ctx_trait = format_ident!("{}ContextTrait", field_type);
267
268			let builder_where_clause = quote!{
269				#field_type : #support_crate::Subsystem< #subsystem_ctx_name< #subsystem_consumes >, #error_ty>,
270				< #subsystem_ctx_name < #subsystem_consumes > as #support_crate :: SubsystemContext>::Sender:
271					#subsystem_sender_trait,
272			};
273
274			// Create the field init `fn`
275			quote! {
276				#cfg_guard
277				impl <InitStateSpawner, #field_type, #( #impl_subsystem_state_generics, )* #( #baggage_passthrough_state_generics, )*>
278				#builder <InitStateSpawner, #( #current_state_generics, )* #( #baggage_passthrough_state_generics, )*>
279				where
280					#builder_where_clause
281				{
282					/// Specify the subsystem in the builder directly
283					#[allow(clippy::type_complexity)]
284					pub fn #field_name (self, var: #field_type ) ->
285						#builder <InitStateSpawner, #( #post_setter_state_generics, )* #( #baggage_passthrough_state_generics, )*>
286					{
287						#builder {
288							#field_name: Init::< #field_type >::Value(var),
289							#(
290								#to_keep_subsystem_name: self. #to_keep_subsystem_name,
291							)*
292							#(
293								#baggage_name: self. #baggage_name,
294							)*
295							spawner: self.spawner,
296
297							channel_capacity: self.channel_capacity,
298							signal_capacity: self.signal_capacity,
299						}
300					}
301
302					/// Specify the the initialization function for a subsystem
303					#[allow(clippy::type_complexity)]
304					pub fn #field_name_with<F>(self, subsystem_init_fn: F ) ->
305						#builder <InitStateSpawner, #( #post_setter_state_generics, )* #( #baggage_passthrough_state_generics, )*>
306					where
307						F: 'static + Send + FnOnce(#handle) ->
308							::std::result::Result<#field_type, #error_ty>,
309					{
310						let boxed_func = Init::<#field_type>::Fn(
311							Box::new(subsystem_init_fn) as SubsystemInitFn<#field_type>
312						);
313						#builder {
314							#field_name: boxed_func,
315							#(
316								#to_keep_subsystem_name: self. #to_keep_subsystem_name,
317							)*
318							#(
319								#baggage_name: self. #baggage_name,
320							)*
321							spawner: self.spawner,
322
323
324							channel_capacity: self.channel_capacity,
325							signal_capacity: self.signal_capacity,
326						}
327					}
328				}
329
330				#[allow(clippy::type_complexity)]
331				#cfg_guard
332				impl <InitStateSpawner, #field_type, #( #impl_subsystem_state_generics, )* #( #baggage_passthrough_state_generics, )*>
333				#builder <InitStateSpawner, #( #post_setter_state_generics, )* #( #baggage_passthrough_state_generics, )*>
334				where
335					#builder_where_clause
336				{
337					/// Replace a subsystem by another implementation for the
338					/// consumable message type.
339					pub fn #field_name_replace<NEW, F>(self, gen_replacement_fn: F)
340						-> #builder <InitStateSpawner, #( #post_replace_state_generics, )* #( #baggage_passthrough_state_generics, )*>
341					where
342						#field_type: 'static,
343						F: 'static + Send + FnOnce(#field_type) -> NEW,
344						NEW: #support_crate ::Subsystem<#subsystem_ctx_name< #subsystem_consumes >, #error_ty>,
345					{
346						let replacement: Init<NEW> = match self.#field_name {
347							Init::Fn(fx) =>
348								Init::<NEW>::Fn(Box::new(move |handle: #handle| {
349								let orig = fx(handle)?;
350								Ok(gen_replacement_fn(orig))
351							})),
352							Init::Value(val) =>
353								Init::Value(gen_replacement_fn(val)),
354						};
355						#builder {
356							#field_name: replacement,
357							#(
358								#to_keep_subsystem_name: self. #to_keep_subsystem_name,
359							)*
360							#(
361								#baggage_name: self. #baggage_name,
362							)*
363							spawner: self.spawner,
364
365							channel_capacity: self.channel_capacity,
366							signal_capacity: self.signal_capacity,
367						}
368					}
369				}
370			}
371		});
372
373	// Produce setters for all baggage fields as well
374	let baggage_specific_setters = info.baggage().iter().enumerate().map(|(idx, bag_field)| {
375		// Baggage fields follow subsystems
376		let fname = &bag_field.field_name;
377		let field_type = &bag_field.field_ty;
378		let impl_baggage_state_generics = recollect_without_idx(&baggage_passthrough_state_generics[..], idx);
379		let to_keep_baggage_name = recollect_without_idx(&baggage_name[..], idx);
380
381		let mut pre_setter_generics = baggage_passthrough_state_generics
382			.iter()
383			.map(|gen_ty| parse_quote!(#gen_ty))
384			.collect::<Vec<syn::GenericArgument>>();
385		pre_setter_generics[idx] = parse_quote! { Missing<#field_type> };
386
387		let mut post_setter_generics = pre_setter_generics.clone();
388		post_setter_generics[idx] = parse_quote! { Init<#field_type> };
389
390		// Baggage can also be generic, so we need to include that to a signature
391		let preserved_baggage_generics = &bag_field.generic_types;
392		quote! {
393			#cfg_guard
394			impl <InitStateSpawner, #( #preserved_baggage_generics, )* #( #subsystem_passthrough_state_generics, )* #( #impl_baggage_state_generics, )* >
395			#builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #pre_setter_generics, )* >
396			{
397				/// Specify the baggage in the builder when it was not initialized before
398				#[allow(clippy::type_complexity)]
399				pub fn #fname (self, var: #field_type ) ->
400					#builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #post_setter_generics, )* >
401				{
402					#builder {
403						#fname: Init::<#field_type>::Value(var),
404						#(
405							#subsystem_name: self. #subsystem_name,
406						)*
407						#(
408							#to_keep_baggage_name: self. #to_keep_baggage_name,
409						)*
410						spawner: self.spawner,
411
412						channel_capacity: self.channel_capacity,
413						signal_capacity: self.signal_capacity,
414					}
415				}
416			}
417
418			#cfg_guard
419			impl <InitStateSpawner, #( #preserved_baggage_generics, )* #( #subsystem_passthrough_state_generics, )* #( #impl_baggage_state_generics, )* >
420			#builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #post_setter_generics, )* > {
421				/// Specify the baggage in the builder when it has been previously initialized
422				#[allow(clippy::type_complexity)]
423				pub fn #fname (self, var: #field_type ) ->
424					#builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #post_setter_generics, )* >
425				{
426					#builder {
427						#fname: Init::<#field_type>::Value(var),
428						#(
429							#subsystem_name: self. #subsystem_name,
430						)*
431						#(
432							#to_keep_baggage_name: self. #to_keep_baggage_name,
433						)*
434						spawner: self.spawner,
435
436						channel_capacity: self.channel_capacity,
437						signal_capacity: self.signal_capacity,
438					}
439				}
440			}
441		}
442	});
443
444	let initialized_builder = format_ident!("Initialized{}", builder);
445	// The direct generics as expected by the `Orchestra<_,_,..>`, without states
446	let initialized_builder_generics = quote! {
447		S, #( #baggage_generic_ty, )* #( #subsystem_generics, )*
448	};
449
450	let builder_where_clause = cfg_set
451		.enabled_subsystems
452		.iter()
453		.map(|ssf| {
454			let field_type = &ssf.generic;
455			let message_to_consume = &ssf.message_to_consume();
456			let subsystem_sender_trait = format_ident!("{}SenderTrait", ssf.generic);
457			let subsystem_ctx_trait = format_ident!("{}ContextTrait", ssf.generic);
458			quote! {
459				#field_type:
460					#support_crate::Subsystem< #subsystem_ctx_name < #message_to_consume>, #error_ty>,
461				<#subsystem_ctx_name< #message_to_consume > as #subsystem_ctx_trait>::Sender:
462					#subsystem_sender_trait,
463				#subsystem_ctx_name< #message_to_consume >:
464					#subsystem_ctx_trait,
465			}
466		})
467		.fold(TokenStream::new(), |mut ts, addendum| {
468			ts.extend(addendum);
469			ts
470		});
471
472	ts.extend(quote! {
473		#cfg_guard
474		impl<S #(, #baggage_generic_ty )*> #orchestra_name <S #(, #baggage_generic_ty)*>
475		where
476			#spawner_where_clause,
477		{
478			/// Create a new orchestra utilizing the builder.
479			#[allow(clippy::type_complexity)]
480			pub fn builder< #( #subsystem_generics),* >() ->
481				#builder<Missing<S> #(, Missing< #field_type > )* >
482			where
483				#builder_where_clause
484			{
485				#builder :: new()
486			}
487		}
488	});
489
490	ts.extend(quote!{
491		/// Builder pattern to create compile time safe construction path.
492		#[allow(clippy::type_complexity)]
493		#cfg_guard
494		pub struct #builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
495		{
496			#(
497				#subsystem_name: #subsystem_passthrough_state_generics,
498			)*
499			#(
500				#baggage_name: #baggage_passthrough_state_generics,
501			)*
502			spawner: InitStateSpawner,
503			// user provided runtime overrides,
504			// if `None`, then a specific subsystem capacity is used `subsystem(message_capacity: 123,...)`
505			// otherwise `orchestra(message_capacity=123,..)` is used or the default value in that exact order.
506			channel_capacity: Option<usize>,
507			signal_capacity: Option<usize>,
508		}
509	});
510
511	ts.extend(quote! {
512		#[allow(clippy::type_complexity)]
513		#cfg_guard
514		impl<#initialized_builder_generics> #builder<Missing<S>, #( Missing<#field_type>, )*>
515		{
516			/// Create a new builder pattern, with all fields being uninitialized.
517			fn new() -> Self {
518				// explicitly assure the required traits are implemented
519				fn trait_from_must_be_implemented<E>()
520				where
521					E: ::std::error::Error + Send + Sync + 'static + From<#support_crate ::OrchestraError>
522				{}
523
524				trait_from_must_be_implemented::< #error_ty >();
525
526				Self {
527					#(
528						#field_name: Missing::<#field_type>::default(),
529					)*
530					spawner: Missing::<S>::default(),
531
532					channel_capacity: None,
533					signal_capacity: None,
534				}
535			}
536		}
537	});
538
539	// Spawner setter
540	ts.extend(quote!{
541		#[allow(clippy::type_complexity)]
542		#cfg_guard
543		impl<S, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
544			#builder<Missing<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
545		where
546			#spawner_where_clause,
547		{
548			/// The `spawner` to use for spawning tasks.
549			pub fn spawner(self, spawner: S) -> #builder<
550				Init<S>,
551				#( #subsystem_passthrough_state_generics, )*
552				#( #baggage_passthrough_state_generics, )*
553			>
554			{
555				#builder {
556					#(
557						#field_name: self. #field_name,
558					)*
559					spawner: Init::<S>::Value(spawner),
560
561					channel_capacity: self.channel_capacity,
562					signal_capacity: self.signal_capacity,
563				}
564			}
565		}
566	});
567
568	// message and signal channel capacity
569	ts.extend(quote! {
570		#[allow(clippy::type_complexity)]
571		#cfg_guard
572		impl<S, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
573			#builder<Init<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
574		where
575			#spawner_where_clause,
576		{
577			/// Set the interconnecting signal channel capacity.
578			/// This will override both static overseer default, e.g. `overseer(signal_capacity=123,...)`,
579			/// **and** subsystem specific capacities, e.g. `subsystem(signal_capacity: 123,...)`.
580			pub fn signal_channel_capacity(mut self, capacity: usize) -> Self
581			{
582				self.signal_capacity = Some(capacity);
583				self
584			}
585
586			/// Set the interconnecting message channel capacities.
587			/// This will override both static overseer default, e.g. `overseer(message_capacity=123,...)`,
588			/// **and** subsystem specific capacities, e.g. `subsystem(message_capacity: 123,...)`.
589			pub fn message_channel_capacity(mut self, capacity: usize) -> Self
590			{
591				self.channel_capacity = Some(capacity);
592				self
593			}
594		}
595	});
596
597	// Create the string literals for spawn.
598	let subsystem_name_str_literal = subsystem_name
599		.iter()
600		.map(|ident| proc_macro2::Literal::string(ident.to_string().replace("_", "-").as_str()))
601		.collect::<Vec<_>>();
602
603	ts.extend(quote! {
604		/// Type used to represent a builder where all fields are initialized and the orchestra could be constructed.
605		#cfg_guard
606		pub type #initialized_builder<#initialized_builder_generics> = #builder<Init<S>, #( Init<#field_type>, )*>;
607
608		// A builder specialization where all fields are set
609		#cfg_guard
610		impl<#initialized_builder_generics> #initialized_builder<#initialized_builder_generics>
611		where
612			#spawner_where_clause,
613			#builder_where_clause
614		{
615			/// Complete the construction and create the orchestra type.
616			pub fn build(self)
617				-> ::std::result::Result<(#orchestra_name<S, #( #baggage_generic_ty, )*>, #handle), #error_ty> {
618				let connector = #connector ::with_event_capacity(
619					self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
620				);
621				self.build_with_connector(connector)
622			}
623
624			/// Complete the construction and create the orchestra type based on an existing `connector`.
625			pub fn build_with_connector(self, connector: #connector)
626				-> ::std::result::Result<(#orchestra_name<S, #( #baggage_generic_ty, )*>, #handle), #error_ty>
627			{
628				let #connector {
629					handle: events_tx,
630					consumer: events_rx,
631				} = connector;
632
633				let (to_orchestra_tx, to_orchestra_rx) = #support_crate ::metered::unbounded::<
634					ToOrchestra
635				>();
636
637				#(
638					let (#channel_name_tx, #channel_name_rx) = if #can_receive_priority_messages {
639						#support_crate ::metered::channel_with_priority::<
640							MessagePacket< #maybe_boxed_consumes >
641						>(
642							self.channel_capacity.unwrap_or(#message_channel_capacity),
643							self.channel_capacity.unwrap_or(#message_channel_capacity)
644						)
645					} else {
646						#support_crate ::metered::channel::<
647							MessagePacket< #maybe_boxed_consumes >
648						>(
649							self.channel_capacity.unwrap_or(#message_channel_capacity)
650						)
651					};
652				)*
653
654				#(
655					let (#channel_name_unbounded_tx, #channel_name_unbounded_rx) =
656						#support_crate ::metered::unbounded::<
657							MessagePacket< #maybe_boxed_consumes >
658						>();
659				)*
660
661				let channels_out =
662					ChannelsOut {
663						#(
664							#channel_name: #channel_name_tx .clone(),
665						)*
666						#(
667							#channel_name_unbounded: #channel_name_unbounded_tx,
668						)*
669					};
670
671				let mut spawner = match self.spawner {
672					Init::Value(value) => value,
673					_ => unreachable!("Only ever init spawner as value. qed"),
674				};
675
676				let mut running_subsystems = #support_crate ::FuturesUnordered::<
677						BoxFuture<'static, ::std::result::Result<(), #error_ty > >
678					>::new();
679
680				#(
681					let #subsystem_name = match self. #subsystem_name {
682						Init::Fn(func) => func(events_tx.clone())?,
683						Init::Value(val) => val,
684					};
685
686					let unbounded_meter = #channel_name_unbounded_rx.meter().clone();
687					// Prefer unbounded channel when selecting
688					let message_rx: SubsystemIncomingMessages< #maybe_boxed_consumes > = #support_crate ::select_with_strategy(
689						#channel_name_rx, #channel_name_unbounded_rx,
690						#support_crate ::select_message_channel_strategy
691					);
692					let (signal_tx, signal_rx) = #support_crate ::metered::channel(
693						self.signal_capacity.unwrap_or(#signal_channel_capacity)
694					);
695
696					let ctx = #subsystem_ctx_name::< #consumes >::new(
697						signal_rx,
698						message_rx,
699						channels_out.clone(),
700						to_orchestra_tx.clone(),
701						#subsystem_name_str_literal
702					);
703
704					let #subsystem_name: OrchestratedSubsystem< #consumes > =	spawn::<_,_, #blocking, _, _, _>(
705							&mut spawner,
706							#channel_name_tx,
707							signal_tx,
708							unbounded_meter,
709							ctx,
710							#subsystem_name,
711							#subsystem_name_str_literal,
712							&mut running_subsystems,
713						)?;
714				)*
715
716				// silence a clippy warning for the last instantiation
717				std::mem::drop(to_orchestra_tx);
718				std::mem::drop(channels_out);
719
720				use #support_crate ::StreamExt;
721
722				let to_orchestra_rx = to_orchestra_rx.fuse();
723				let orchestra = #orchestra_name {
724					#(
725						#subsystem_name,
726					)*
727
728					#(
729						#baggage_name: match self. #baggage_name {
730							Init::Value(val) => val,
731							_ => panic!("unexpected baggage initialization, must be value"),
732						},
733					)*
734
735					spawner,
736					running_subsystems,
737					events_rx,
738					to_orchestra_rx,
739				};
740
741				Ok((orchestra, events_tx))
742			}
743		}
744	});
745
746	ts.extend(baggage_specific_setters);
747	ts.extend(subsystem_specific_setters);
748	ts
749}
750
751pub(crate) fn impl_task_kind(info: &OrchestraInfo) -> proc_macro2::TokenStream {
752	let signal = &info.extern_signal_ty;
753	let error_ty = &info.extern_error_ty;
754	let support_crate = info.support_crate_name();
755	let maybe_boxed_message_generic: Type = if info.boxed_messages {
756		parse_quote! { ::std::boxed::Box<M> }
757	} else {
758		parse_quote! { M }
759	};
760
761	let ts = quote! {
762		/// Task kind to launch.
763		pub trait TaskKind {
764			/// Spawn a task, it depends on the implementer if this is blocking or not.
765			fn launch_task<S: Spawner>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>);
766		}
767
768		#[allow(missing_docs)]
769		struct Regular;
770		impl TaskKind for Regular {
771			fn launch_task<S: Spawner>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
772				spawner.spawn(task_name, Some(subsystem_name), future)
773			}
774		}
775
776		#[allow(missing_docs)]
777		struct Blocking;
778		impl TaskKind for Blocking {
779			fn launch_task<S: Spawner>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
780				spawner.spawn_blocking(task_name, Some(subsystem_name), future)
781			}
782		}
783
784		/// Spawn task of kind `self` using spawner `S`.
785		#[allow(clippy::too_many_arguments)]
786		pub fn spawn<S, M, TK, Ctx, E, SubSys>(
787			spawner: &mut S,
788			message_tx: #support_crate ::metered::MeteredSender<MessagePacket<#maybe_boxed_message_generic>>,
789			signal_tx: #support_crate ::metered::MeteredSender< #signal >,
790			// meter for the unbounded channel
791			unbounded_meter: #support_crate ::metered::Meter,
792			ctx: Ctx,
793			subsystem: SubSys,
794			subsystem_name: &'static str,
795			futures: &mut #support_crate ::FuturesUnordered<BoxFuture<'static, ::std::result::Result<(), #error_ty> >>,
796		) -> ::std::result::Result<OrchestratedSubsystem<M>, #error_ty >
797		where
798			S: #support_crate ::Spawner,
799			M: std::fmt::Debug + Send + 'static,
800			TK: TaskKind,
801			Ctx: #support_crate ::SubsystemContext<Message=M>,
802			E: ::std::error::Error + Send + Sync + 'static + ::std::convert::From<#support_crate ::OrchestraError>,
803			SubSys: #support_crate ::Subsystem<Ctx, E>,
804		{
805			let #support_crate ::SpawnedSubsystem::<E> { future, name } = subsystem.start(ctx);
806
807			let (terminate_tx, terminate_rx) = #support_crate ::oneshot::channel();
808
809			let fut = Box::pin(async move {
810				#[allow(clippy::suspicious_else_formatting)]
811				if let Err(err) = future.await {
812					#support_crate ::tracing::error!(subsystem=subsystem_name, ?err, "subsystem exited with error");
813					let mut source: &(dyn std::error::Error + 'static) = &err as &_;
814					while let Some(err) = source.source() {
815						#support_crate ::tracing::debug!(subsystem=subsystem_name, ?err, "caused (subsystem)");
816						source = err;
817					}
818				} else {
819					#support_crate ::tracing::debug!(subsystem=subsystem_name, "subsystem exited, successfully");
820				}
821				let _ = terminate_tx.send(());
822			});
823
824			<TK as TaskKind>::launch_task(spawner, name, subsystem_name, fut);
825
826			futures.push(Box::pin(
827				terminate_rx.map(move |result| {
828					#support_crate ::tracing::warn!(subsystem=subsystem_name, "Terminating due to subsystem exit");
829					if let Err(err) = result {
830						#support_crate ::tracing::warn!(subsystem=subsystem_name, ?err, "termination error detected, dropping but terminating the execution");
831					}
832					Ok(())
833				})
834			));
835
836			let instance = Some(SubsystemInstance {
837				meters: #support_crate ::SubsystemMeters {
838					unbounded: unbounded_meter,
839					bounded: message_tx.meter().clone(),
840					signals: signal_tx.meter().clone(),
841				},
842				tx_signal: signal_tx,
843				tx_bounded: message_tx,
844				signals_received: 0,
845				name,
846			});
847
848			Ok(OrchestratedSubsystem {
849				instance,
850			})
851		}
852	};
853
854	ts
855}