1use quote::{format_ident, quote};
17use syn::{parse_quote, Path, PathSegment, Type, TypePath};
18
19use super::*;
20
/// Clones `x` into a fresh `Vec`, leaving out the element at position `idx`.
///
/// When `idx` is at or past the end of `x` nothing is left out and the result
/// is a plain copy of `x`.
fn recollect_without_idx<T: Clone>(x: &[T], idx: usize) -> Vec<T> {
    // Clamp the split point so an out-of-range index simply yields an empty tail.
    let (head, tail) = x.split_at(idx.min(x.len()));

    let mut kept = Vec::<T>::with_capacity(x.len().saturating_sub(1));
    kept.extend_from_slice(head);
    // The first element of `tail` (if any) is the one sitting at `idx`.
    kept.extend(tail.iter().skip(1).cloned());
    kept
}
27
28pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
33 let orchestra_name = info.orchestra_name.clone();
34
35 let builder = format_ident!("{}Builder", orchestra_name);
36 let handle = format_ident!("{}Handle", orchestra_name);
37 let connector = format_ident!("{}Connector", orchestra_name);
38 let subsystem_ctx_name = format_ident!("{}SubsystemContext", orchestra_name);
39 let feature_powerset_cfgs = &info.feature_gated_subsystem_sets();
40
41 let mut ts = proc_macro2::TokenStream::new();
42
43 for cfg_set in feature_powerset_cfgs.into_iter() {
45 let builder_items_for_config = impl_feature_gated_items(
46 info,
47 cfg_set,
48 &builder,
49 &subsystem_ctx_name,
50 &connector,
51 &orchestra_name,
52 &handle,
53 );
54 ts.extend(builder_items_for_config);
55 }
56
57 let event = &info.extern_event_ty;
58 let support_crate = info.support_crate_name();
59
60 ts.extend(quote! {
61 pub type #handle = #support_crate ::metered::MeteredSender< #event >;
63
64 pub struct #connector {
66 handle: #handle,
72 consumer: #support_crate ::metered::MeteredReceiver < #event >,
74 }
75
76 impl #connector {
77 pub fn as_handle_mut(&mut self) -> &mut #handle {
79 &mut self.handle
80 }
81 pub fn as_handle(&self) -> &#handle {
83 &self.handle
84 }
85 pub fn handle(&self) -> #handle {
87 self.handle.clone()
88 }
89
90 pub fn with_event_capacity(event_channel_capacity: usize) -> Self {
92 let (events_tx, events_rx) = #support_crate ::metered::channel::<
93 #event
94 >(event_channel_capacity);
95
96 Self {
97 handle: events_tx,
98 consumer: events_rx,
99 }
100 }
101 }
102
103 impl ::std::default::Default for #connector {
104 fn default() -> Self {
105 Self::with_event_capacity(SIGNAL_CHANNEL_CAPACITY)
106 }
107 }
108 });
109
110 let error_ty = &info.extern_error_ty;
111 ts.extend(quote! {
112 type SubsystemInitFn<T> = Box<dyn FnOnce(#handle) -> ::std::result::Result<T, #error_ty> + Send + 'static>;
114
115 pub enum Init<T> {
117 Fn(SubsystemInitFn<T>),
119 Value(T),
122 }
123 #[derive(Debug)]
128 pub struct Missing<T>(::core::marker::PhantomData<T>);
129
130 trait OrchestraFieldState<T> {}
132
133 impl<T> OrchestraFieldState<T> for Init<T> {}
134 impl<T> OrchestraFieldState<T> for Missing<T> {}
135
136 impl<T> ::std::default::Default for Missing<T> {
137 fn default() -> Self {
138 Missing::<T>(::core::marker::PhantomData)
139 }
140 }
141 });
142 ts.extend(impl_task_kind(info));
143 ts
144}
145
146pub(crate) fn impl_feature_gated_items(
147 info: &OrchestraInfo,
148 cfg_set: &SubsystemConfigSet,
149 builder: &Ident,
150 subsystem_ctx_name: &Ident,
151 connector: &Ident,
152 orchestra_name: &Ident,
153 handle: &Ident,
154) -> proc_macro2::TokenStream {
155 let mut ts = quote! {};
156
157 let cfg_guard = &cfg_set.feature_gate;
158 let subsystem_name = &cfg_set.subsystem_names_without_wip();
159 let subsystem_generics = &cfg_set.subsystem_generic_types();
160 let consumes = &cfg_set.consumes_without_wip();
161 let maybe_boxed_consumes = consumes
162 .iter()
163 .map(|consume| info.box_message_if_needed(consume, Span::call_site()))
164 .collect::<Vec<_>>();
165 let channel_name = &cfg_set.channel_names_without_wip(None);
166 let channel_name_unbounded = &cfg_set.channel_names_without_wip("_unbounded");
167 let message_channel_capacity =
168 &cfg_set.message_channel_capacities_without_wip(info.message_channel_capacity);
169 let signal_channel_capacity =
170 &cfg_set.signal_channel_capacities_without_wip(info.signal_channel_capacity);
171
172 let channel_name_tx = &cfg_set.channel_names_without_wip("_tx");
173 let channel_name_unbounded_tx = &cfg_set.channel_names_without_wip("_unbounded_tx");
174
175 let channel_name_rx = &cfg_set.channel_names_without_wip("_rx");
176 let channel_name_unbounded_rx = &info.channel_names_without_wip("_unbounded_rx");
177
178 let can_receive_priority_messages = &cfg_set.can_receive_priority_messages_without_wip();
179
180 let baggage_name = &info.baggage_names();
181 let baggage_generic_ty = &info.baggage_generic_types();
182
183 let baggage_passthrough_state_generics = baggage_name
185 .iter()
186 .enumerate()
187 .map(|(idx, _)| format_ident!("InitStateBaggage{}", idx))
188 .collect::<Vec<_>>();
189 let subsystem_passthrough_state_generics = subsystem_name
190 .iter()
191 .enumerate()
192 .map(|(idx, _)| format_ident!("InitStateSubsystem{}", idx))
193 .collect::<Vec<_>>();
194
195 let error_ty = &info.extern_error_ty;
196
197 let support_crate = info.support_crate_name();
198
199 let blocking = &cfg_set
200 .enabled_subsystems
201 .iter()
202 .map(|x| {
203 if x.blocking {
204 quote! { Blocking }
205 } else {
206 quote! { Regular }
207 }
208 })
209 .collect::<Vec<_>>();
210
211 let spawner_where_clause: syn::TypeParam = parse_quote! {
213 S: #support_crate ::Spawner
214 };
215
216 let field_name = subsystem_name.iter().chain(baggage_name.iter()).collect::<Vec<_>>();
218 let field_type = subsystem_generics
219 .iter()
220 .map(|ident| {
221 syn::Type::Path(TypePath {
222 qself: None,
223 path: Path::from(PathSegment::from(ident.clone())),
224 })
225 })
226 .chain(info.baggage().iter().map(|bag| bag.field_ty.clone()))
227 .collect::<Vec<_>>();
228
229 let subsystem_specific_setters =
235 cfg_set.enabled_subsystems.iter().filter(|ssf| !ssf.wip).enumerate().map(|(idx, ssf)| {
236 let field_name = &ssf.name;
237 let field_type = &ssf.generic;
238 let subsystem_consumes = &ssf.message_to_consume();
239 let impl_subsystem_state_generics = recollect_without_idx(&subsystem_passthrough_state_generics[..], idx);
242
243 let field_name_with = format_ident!("{}_with", field_name);
244 let field_name_replace = format_ident!("replace_{}", field_name);
245
246 let mut current_state_generics = subsystem_passthrough_state_generics
249 .iter()
250 .map(|subsystem_state_generic_ty| parse_quote!(#subsystem_state_generic_ty))
251 .collect::<Vec<syn::GenericArgument>>();
252 current_state_generics[idx] = parse_quote! { Missing<#field_type> };
253
254 let mut post_setter_state_generics = current_state_generics.clone();
256 post_setter_state_generics[idx] = parse_quote! { Init<#field_type> };
257
258 let mut post_replace_state_generics = current_state_generics.clone();
259 post_replace_state_generics[idx] = parse_quote! { Init<NEW> };
260
261 let to_keep_subsystem_name = recollect_without_idx(&subsystem_name[..], idx);
264
265 let subsystem_sender_trait = format_ident!("{}SenderTrait", field_type);
266 let _subsystem_ctx_trait = format_ident!("{}ContextTrait", field_type);
267
268 let builder_where_clause = quote!{
269 #field_type : #support_crate::Subsystem< #subsystem_ctx_name< #subsystem_consumes >, #error_ty>,
270 < #subsystem_ctx_name < #subsystem_consumes > as #support_crate :: SubsystemContext>::Sender:
271 #subsystem_sender_trait,
272 };
273
274 quote! {
276 #cfg_guard
277 impl <InitStateSpawner, #field_type, #( #impl_subsystem_state_generics, )* #( #baggage_passthrough_state_generics, )*>
278 #builder <InitStateSpawner, #( #current_state_generics, )* #( #baggage_passthrough_state_generics, )*>
279 where
280 #builder_where_clause
281 {
282 #[allow(clippy::type_complexity)]
284 pub fn #field_name (self, var: #field_type ) ->
285 #builder <InitStateSpawner, #( #post_setter_state_generics, )* #( #baggage_passthrough_state_generics, )*>
286 {
287 #builder {
288 #field_name: Init::< #field_type >::Value(var),
289 #(
290 #to_keep_subsystem_name: self. #to_keep_subsystem_name,
291 )*
292 #(
293 #baggage_name: self. #baggage_name,
294 )*
295 spawner: self.spawner,
296
297 channel_capacity: self.channel_capacity,
298 signal_capacity: self.signal_capacity,
299 }
300 }
301
302 #[allow(clippy::type_complexity)]
304 pub fn #field_name_with<F>(self, subsystem_init_fn: F ) ->
305 #builder <InitStateSpawner, #( #post_setter_state_generics, )* #( #baggage_passthrough_state_generics, )*>
306 where
307 F: 'static + Send + FnOnce(#handle) ->
308 ::std::result::Result<#field_type, #error_ty>,
309 {
310 let boxed_func = Init::<#field_type>::Fn(
311 Box::new(subsystem_init_fn) as SubsystemInitFn<#field_type>
312 );
313 #builder {
314 #field_name: boxed_func,
315 #(
316 #to_keep_subsystem_name: self. #to_keep_subsystem_name,
317 )*
318 #(
319 #baggage_name: self. #baggage_name,
320 )*
321 spawner: self.spawner,
322
323
324 channel_capacity: self.channel_capacity,
325 signal_capacity: self.signal_capacity,
326 }
327 }
328 }
329
330 #[allow(clippy::type_complexity)]
331 #cfg_guard
332 impl <InitStateSpawner, #field_type, #( #impl_subsystem_state_generics, )* #( #baggage_passthrough_state_generics, )*>
333 #builder <InitStateSpawner, #( #post_setter_state_generics, )* #( #baggage_passthrough_state_generics, )*>
334 where
335 #builder_where_clause
336 {
337 pub fn #field_name_replace<NEW, F>(self, gen_replacement_fn: F)
340 -> #builder <InitStateSpawner, #( #post_replace_state_generics, )* #( #baggage_passthrough_state_generics, )*>
341 where
342 #field_type: 'static,
343 F: 'static + Send + FnOnce(#field_type) -> NEW,
344 NEW: #support_crate ::Subsystem<#subsystem_ctx_name< #subsystem_consumes >, #error_ty>,
345 {
346 let replacement: Init<NEW> = match self.#field_name {
347 Init::Fn(fx) =>
348 Init::<NEW>::Fn(Box::new(move |handle: #handle| {
349 let orig = fx(handle)?;
350 Ok(gen_replacement_fn(orig))
351 })),
352 Init::Value(val) =>
353 Init::Value(gen_replacement_fn(val)),
354 };
355 #builder {
356 #field_name: replacement,
357 #(
358 #to_keep_subsystem_name: self. #to_keep_subsystem_name,
359 )*
360 #(
361 #baggage_name: self. #baggage_name,
362 )*
363 spawner: self.spawner,
364
365 channel_capacity: self.channel_capacity,
366 signal_capacity: self.signal_capacity,
367 }
368 }
369 }
370 }
371 });
372
373 let baggage_specific_setters = info.baggage().iter().enumerate().map(|(idx, bag_field)| {
375 let fname = &bag_field.field_name;
377 let field_type = &bag_field.field_ty;
378 let impl_baggage_state_generics = recollect_without_idx(&baggage_passthrough_state_generics[..], idx);
379 let to_keep_baggage_name = recollect_without_idx(&baggage_name[..], idx);
380
381 let mut pre_setter_generics = baggage_passthrough_state_generics
382 .iter()
383 .map(|gen_ty| parse_quote!(#gen_ty))
384 .collect::<Vec<syn::GenericArgument>>();
385 pre_setter_generics[idx] = parse_quote! { Missing<#field_type> };
386
387 let mut post_setter_generics = pre_setter_generics.clone();
388 post_setter_generics[idx] = parse_quote! { Init<#field_type> };
389
390 let preserved_baggage_generics = &bag_field.generic_types;
392 quote! {
393 #cfg_guard
394 impl <InitStateSpawner, #( #preserved_baggage_generics, )* #( #subsystem_passthrough_state_generics, )* #( #impl_baggage_state_generics, )* >
395 #builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #pre_setter_generics, )* >
396 {
397 #[allow(clippy::type_complexity)]
399 pub fn #fname (self, var: #field_type ) ->
400 #builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #post_setter_generics, )* >
401 {
402 #builder {
403 #fname: Init::<#field_type>::Value(var),
404 #(
405 #subsystem_name: self. #subsystem_name,
406 )*
407 #(
408 #to_keep_baggage_name: self. #to_keep_baggage_name,
409 )*
410 spawner: self.spawner,
411
412 channel_capacity: self.channel_capacity,
413 signal_capacity: self.signal_capacity,
414 }
415 }
416 }
417
418 #cfg_guard
419 impl <InitStateSpawner, #( #preserved_baggage_generics, )* #( #subsystem_passthrough_state_generics, )* #( #impl_baggage_state_generics, )* >
420 #builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #post_setter_generics, )* > {
421 #[allow(clippy::type_complexity)]
423 pub fn #fname (self, var: #field_type ) ->
424 #builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #post_setter_generics, )* >
425 {
426 #builder {
427 #fname: Init::<#field_type>::Value(var),
428 #(
429 #subsystem_name: self. #subsystem_name,
430 )*
431 #(
432 #to_keep_baggage_name: self. #to_keep_baggage_name,
433 )*
434 spawner: self.spawner,
435
436 channel_capacity: self.channel_capacity,
437 signal_capacity: self.signal_capacity,
438 }
439 }
440 }
441 }
442 });
443
444 let initialized_builder = format_ident!("Initialized{}", builder);
445 let initialized_builder_generics = quote! {
447 S, #( #baggage_generic_ty, )* #( #subsystem_generics, )*
448 };
449
450 let builder_where_clause = cfg_set
451 .enabled_subsystems
452 .iter()
453 .map(|ssf| {
454 let field_type = &ssf.generic;
455 let message_to_consume = &ssf.message_to_consume();
456 let subsystem_sender_trait = format_ident!("{}SenderTrait", ssf.generic);
457 let subsystem_ctx_trait = format_ident!("{}ContextTrait", ssf.generic);
458 quote! {
459 #field_type:
460 #support_crate::Subsystem< #subsystem_ctx_name < #message_to_consume>, #error_ty>,
461 <#subsystem_ctx_name< #message_to_consume > as #subsystem_ctx_trait>::Sender:
462 #subsystem_sender_trait,
463 #subsystem_ctx_name< #message_to_consume >:
464 #subsystem_ctx_trait,
465 }
466 })
467 .fold(TokenStream::new(), |mut ts, addendum| {
468 ts.extend(addendum);
469 ts
470 });
471
472 ts.extend(quote! {
473 #cfg_guard
474 impl<S #(, #baggage_generic_ty )*> #orchestra_name <S #(, #baggage_generic_ty)*>
475 where
476 #spawner_where_clause,
477 {
478 #[allow(clippy::type_complexity)]
480 pub fn builder< #( #subsystem_generics),* >() ->
481 #builder<Missing<S> #(, Missing< #field_type > )* >
482 where
483 #builder_where_clause
484 {
485 #builder :: new()
486 }
487 }
488 });
489
490 ts.extend(quote!{
491 #[allow(clippy::type_complexity)]
493 #cfg_guard
494 pub struct #builder <InitStateSpawner, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
495 {
496 #(
497 #subsystem_name: #subsystem_passthrough_state_generics,
498 )*
499 #(
500 #baggage_name: #baggage_passthrough_state_generics,
501 )*
502 spawner: InitStateSpawner,
503 channel_capacity: Option<usize>,
507 signal_capacity: Option<usize>,
508 }
509 });
510
511 ts.extend(quote! {
512 #[allow(clippy::type_complexity)]
513 #cfg_guard
514 impl<#initialized_builder_generics> #builder<Missing<S>, #( Missing<#field_type>, )*>
515 {
516 fn new() -> Self {
518 fn trait_from_must_be_implemented<E>()
520 where
521 E: ::std::error::Error + Send + Sync + 'static + From<#support_crate ::OrchestraError>
522 {}
523
524 trait_from_must_be_implemented::< #error_ty >();
525
526 Self {
527 #(
528 #field_name: Missing::<#field_type>::default(),
529 )*
530 spawner: Missing::<S>::default(),
531
532 channel_capacity: None,
533 signal_capacity: None,
534 }
535 }
536 }
537 });
538
539 ts.extend(quote!{
541 #[allow(clippy::type_complexity)]
542 #cfg_guard
543 impl<S, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
544 #builder<Missing<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
545 where
546 #spawner_where_clause,
547 {
548 pub fn spawner(self, spawner: S) -> #builder<
550 Init<S>,
551 #( #subsystem_passthrough_state_generics, )*
552 #( #baggage_passthrough_state_generics, )*
553 >
554 {
555 #builder {
556 #(
557 #field_name: self. #field_name,
558 )*
559 spawner: Init::<S>::Value(spawner),
560
561 channel_capacity: self.channel_capacity,
562 signal_capacity: self.signal_capacity,
563 }
564 }
565 }
566 });
567
568 ts.extend(quote! {
570 #[allow(clippy::type_complexity)]
571 #cfg_guard
572 impl<S, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
573 #builder<Init<S>, #( #subsystem_passthrough_state_generics, )* #( #baggage_passthrough_state_generics, )*>
574 where
575 #spawner_where_clause,
576 {
577 pub fn signal_channel_capacity(mut self, capacity: usize) -> Self
581 {
582 self.signal_capacity = Some(capacity);
583 self
584 }
585
586 pub fn message_channel_capacity(mut self, capacity: usize) -> Self
590 {
591 self.channel_capacity = Some(capacity);
592 self
593 }
594 }
595 });
596
597 let subsystem_name_str_literal = subsystem_name
599 .iter()
600 .map(|ident| proc_macro2::Literal::string(ident.to_string().replace("_", "-").as_str()))
601 .collect::<Vec<_>>();
602
603 ts.extend(quote! {
604 #cfg_guard
606 pub type #initialized_builder<#initialized_builder_generics> = #builder<Init<S>, #( Init<#field_type>, )*>;
607
608 #cfg_guard
610 impl<#initialized_builder_generics> #initialized_builder<#initialized_builder_generics>
611 where
612 #spawner_where_clause,
613 #builder_where_clause
614 {
615 pub fn build(self)
617 -> ::std::result::Result<(#orchestra_name<S, #( #baggage_generic_ty, )*>, #handle), #error_ty> {
618 let connector = #connector ::with_event_capacity(
619 self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
620 );
621 self.build_with_connector(connector)
622 }
623
624 pub fn build_with_connector(self, connector: #connector)
626 -> ::std::result::Result<(#orchestra_name<S, #( #baggage_generic_ty, )*>, #handle), #error_ty>
627 {
628 let #connector {
629 handle: events_tx,
630 consumer: events_rx,
631 } = connector;
632
633 let (to_orchestra_tx, to_orchestra_rx) = #support_crate ::metered::unbounded::<
634 ToOrchestra
635 >();
636
637 #(
638 let (#channel_name_tx, #channel_name_rx) = if #can_receive_priority_messages {
639 #support_crate ::metered::channel_with_priority::<
640 MessagePacket< #maybe_boxed_consumes >
641 >(
642 self.channel_capacity.unwrap_or(#message_channel_capacity),
643 self.channel_capacity.unwrap_or(#message_channel_capacity)
644 )
645 } else {
646 #support_crate ::metered::channel::<
647 MessagePacket< #maybe_boxed_consumes >
648 >(
649 self.channel_capacity.unwrap_or(#message_channel_capacity)
650 )
651 };
652 )*
653
654 #(
655 let (#channel_name_unbounded_tx, #channel_name_unbounded_rx) =
656 #support_crate ::metered::unbounded::<
657 MessagePacket< #maybe_boxed_consumes >
658 >();
659 )*
660
661 let channels_out =
662 ChannelsOut {
663 #(
664 #channel_name: #channel_name_tx .clone(),
665 )*
666 #(
667 #channel_name_unbounded: #channel_name_unbounded_tx,
668 )*
669 };
670
671 let mut spawner = match self.spawner {
672 Init::Value(value) => value,
673 _ => unreachable!("Only ever init spawner as value. qed"),
674 };
675
676 let mut running_subsystems = #support_crate ::FuturesUnordered::<
677 BoxFuture<'static, ::std::result::Result<(), #error_ty > >
678 >::new();
679
680 #(
681 let #subsystem_name = match self. #subsystem_name {
682 Init::Fn(func) => func(events_tx.clone())?,
683 Init::Value(val) => val,
684 };
685
686 let unbounded_meter = #channel_name_unbounded_rx.meter().clone();
687 let message_rx: SubsystemIncomingMessages< #maybe_boxed_consumes > = #support_crate ::select_with_strategy(
689 #channel_name_rx, #channel_name_unbounded_rx,
690 #support_crate ::select_message_channel_strategy
691 );
692 let (signal_tx, signal_rx) = #support_crate ::metered::channel(
693 self.signal_capacity.unwrap_or(#signal_channel_capacity)
694 );
695
696 let ctx = #subsystem_ctx_name::< #consumes >::new(
697 signal_rx,
698 message_rx,
699 channels_out.clone(),
700 to_orchestra_tx.clone(),
701 #subsystem_name_str_literal
702 );
703
704 let #subsystem_name: OrchestratedSubsystem< #consumes > = spawn::<_,_, #blocking, _, _, _>(
705 &mut spawner,
706 #channel_name_tx,
707 signal_tx,
708 unbounded_meter,
709 ctx,
710 #subsystem_name,
711 #subsystem_name_str_literal,
712 &mut running_subsystems,
713 )?;
714 )*
715
716 std::mem::drop(to_orchestra_tx);
718 std::mem::drop(channels_out);
719
720 use #support_crate ::StreamExt;
721
722 let to_orchestra_rx = to_orchestra_rx.fuse();
723 let orchestra = #orchestra_name {
724 #(
725 #subsystem_name,
726 )*
727
728 #(
729 #baggage_name: match self. #baggage_name {
730 Init::Value(val) => val,
731 _ => panic!("unexpected baggage initialization, must be value"),
732 },
733 )*
734
735 spawner,
736 running_subsystems,
737 events_rx,
738 to_orchestra_rx,
739 };
740
741 Ok((orchestra, events_tx))
742 }
743 }
744 });
745
746 ts.extend(baggage_specific_setters);
747 ts.extend(subsystem_specific_setters);
748 ts
749}
750
751pub(crate) fn impl_task_kind(info: &OrchestraInfo) -> proc_macro2::TokenStream {
752 let signal = &info.extern_signal_ty;
753 let error_ty = &info.extern_error_ty;
754 let support_crate = info.support_crate_name();
755 let maybe_boxed_message_generic: Type = if info.boxed_messages {
756 parse_quote! { ::std::boxed::Box<M> }
757 } else {
758 parse_quote! { M }
759 };
760
761 let ts = quote! {
762 pub trait TaskKind {
764 fn launch_task<S: Spawner>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>);
766 }
767
768 #[allow(missing_docs)]
769 struct Regular;
770 impl TaskKind for Regular {
771 fn launch_task<S: Spawner>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
772 spawner.spawn(task_name, Some(subsystem_name), future)
773 }
774 }
775
776 #[allow(missing_docs)]
777 struct Blocking;
778 impl TaskKind for Blocking {
779 fn launch_task<S: Spawner>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
780 spawner.spawn_blocking(task_name, Some(subsystem_name), future)
781 }
782 }
783
784 #[allow(clippy::too_many_arguments)]
786 pub fn spawn<S, M, TK, Ctx, E, SubSys>(
787 spawner: &mut S,
788 message_tx: #support_crate ::metered::MeteredSender<MessagePacket<#maybe_boxed_message_generic>>,
789 signal_tx: #support_crate ::metered::MeteredSender< #signal >,
790 unbounded_meter: #support_crate ::metered::Meter,
792 ctx: Ctx,
793 subsystem: SubSys,
794 subsystem_name: &'static str,
795 futures: &mut #support_crate ::FuturesUnordered<BoxFuture<'static, ::std::result::Result<(), #error_ty> >>,
796 ) -> ::std::result::Result<OrchestratedSubsystem<M>, #error_ty >
797 where
798 S: #support_crate ::Spawner,
799 M: std::fmt::Debug + Send + 'static,
800 TK: TaskKind,
801 Ctx: #support_crate ::SubsystemContext<Message=M>,
802 E: ::std::error::Error + Send + Sync + 'static + ::std::convert::From<#support_crate ::OrchestraError>,
803 SubSys: #support_crate ::Subsystem<Ctx, E>,
804 {
805 let #support_crate ::SpawnedSubsystem::<E> { future, name } = subsystem.start(ctx);
806
807 let (terminate_tx, terminate_rx) = #support_crate ::oneshot::channel();
808
809 let fut = Box::pin(async move {
810 #[allow(clippy::suspicious_else_formatting)]
811 if let Err(err) = future.await {
812 #support_crate ::tracing::error!(subsystem=subsystem_name, ?err, "subsystem exited with error");
813 let mut source: &(dyn std::error::Error + 'static) = &err as &_;
814 while let Some(err) = source.source() {
815 #support_crate ::tracing::debug!(subsystem=subsystem_name, ?err, "caused (subsystem)");
816 source = err;
817 }
818 } else {
819 #support_crate ::tracing::debug!(subsystem=subsystem_name, "subsystem exited, successfully");
820 }
821 let _ = terminate_tx.send(());
822 });
823
824 <TK as TaskKind>::launch_task(spawner, name, subsystem_name, fut);
825
826 futures.push(Box::pin(
827 terminate_rx.map(move |result| {
828 #support_crate ::tracing::warn!(subsystem=subsystem_name, "Terminating due to subsystem exit");
829 if let Err(err) = result {
830 #support_crate ::tracing::warn!(subsystem=subsystem_name, ?err, "termination error detected, dropping but terminating the execution");
831 }
832 Ok(())
833 })
834 ));
835
836 let instance = Some(SubsystemInstance {
837 meters: #support_crate ::SubsystemMeters {
838 unbounded: unbounded_meter,
839 bounded: message_tx.meter().clone(),
840 signals: signal_tx.meter().clone(),
841 },
842 tx_signal: signal_tx,
843 tx_bounded: message_tx,
844 signals_received: 0,
845 name,
846 });
847
848 Ok(OrchestratedSubsystem {
849 instance,
850 })
851 }
852 };
853
854 ts
855}