use quote::quote;
use syn::Type;
use super::*;
pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenStream {
let message_wrapper = &info.message_wrapper.clone();
let orchestra_name = info.orchestra_name.clone();
let subsystem_name = &info.subsystem_names_without_wip();
let feature_gates = &info.feature_gates();
let support_crate = info.support_crate_name();
let baggage_decl = &info.baggage_decl();
let baggage_generic_ty = &info.baggage_generic_types();
let generics = quote! {
< S, #( #baggage_generic_ty, )* >
};
let where_clause = quote! {
where
S: #support_crate ::Spawner,
};
let consumes = &info.consumes_without_wip();
let consumes_variant = &info.variant_names_without_wip();
let unconsumes_variant = &info.variant_names_only_wip();
let signal_ty = &info.extern_signal_ty;
let error_ty = &info.extern_error_ty;
let event_ty = &info.extern_event_ty;
let signal_channel_capacity = info.signal_channel_capacity;
let log_target =
syn::LitStr::new(orchestra_name.to_string().to_lowercase().as_str(), orchestra_name.span());
let ts = quote! {
const SIGNAL_CHANNEL_CAPACITY: usize = #signal_channel_capacity;
const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10);
const LOG_TARGET: &str = #log_target;
pub struct #orchestra_name #generics {
#(
#feature_gates
#subsystem_name: OrchestratedSubsystem< #consumes >,
)*
#(
#baggage_decl ,
)*
spawner: S,
running_subsystems: #support_crate ::FuturesUnordered<
BoxFuture<'static, ::std::result::Result<(), #error_ty>>
>,
to_orchestra_rx: #support_crate ::stream::Fuse<
#support_crate ::metered::UnboundedMeteredReceiver< #support_crate ::ToOrchestra >
>,
events_rx: #support_crate ::metered::MeteredReceiver< #event_ty >,
}
impl #generics #orchestra_name #generics #where_clause {
pub async fn wait_terminate(&mut self, signal: #signal_ty, timeout: ::std::time::Duration) -> ::std::result::Result<(), #error_ty > {
#(
#feature_gates
::std::mem::drop(self. #subsystem_name .send_signal(signal.clone()).await);
)*
let _ = signal;
let mut timeout_fut = #support_crate ::Delay::new(
timeout
).fuse();
loop {
#support_crate ::futures::select! {
_ = self.running_subsystems.next() =>
if self.running_subsystems.is_empty() {
break;
},
_ = timeout_fut => break,
complete => break,
}
}
Ok(())
}
pub async fn broadcast_signal<'a>(&'a mut self, signal: #signal_ty) -> ::std::result::Result<(), #error_ty > {
let mut delayed_signals : #support_crate ::futures::stream::FuturesUnordered< #support_crate ::futures::future::BoxFuture<'a, ::std::result::Result<(), #error_ty>>>
= #support_crate ::futures::stream::FuturesUnordered::new();
#(
#feature_gates
if let Err(e) = self. #subsystem_name .try_send_signal(signal.clone()) {
match e {
#support_crate::TrySendError::Full(sig) => {
let instance = self. #subsystem_name .instance.as_mut().expect("checked in try_send_signal");
delayed_signals.push(::std::boxed::Box::pin(async move {
match instance.tx_signal.send(sig).timeout(SIGNAL_TIMEOUT).await {
None => {
Err(#error_ty :: from(
#support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal_ty>())
))
}
Some(res) => {
let res = res.map_err(|_| #error_ty :: from(
#support_crate ::OrchestraError::QueueError
));
if res.is_ok() {
instance.signals_received += 1;
}
res
}
}
}));
},
_ => return Err(#error_ty :: from(#support_crate ::OrchestraError::QueueError))
}
}
)*
let _ = signal;
while let Some(res) = delayed_signals.next().await {
res?;
}
Ok(())
}
pub async fn route_message(&mut self, message: #message_wrapper, origin: &'static str) -> ::std::result::Result<(), #error_ty > {
match message {
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) =>
OrchestratedSubsystem::< #consumes >::send_message2(&mut self. #subsystem_name, inner, origin ).await?,
)*
#(
#feature_gates
#message_wrapper :: #unconsumes_variant ( _ ) => {}
)*
#message_wrapper :: Empty => {}
#[allow(unreachable_patterns)]
unused_msg => {
#support_crate :: tracing :: warn!("Nothing consumes {:?}", unused_msg);
}
}
Ok(())
}
pub fn map_subsystems<'a, Mapper, Output>(&'a self, mapper: Mapper)
-> Vec<Output>
where
#(
Mapper: MapSubsystem<&'a OrchestratedSubsystem< #consumes >, Output=Output>,
)*
{
vec![
#(
#feature_gates
mapper.map_subsystem( & self. #subsystem_name ),
)*
]
}
pub fn spawner (&mut self) -> &mut S {
&mut self.spawner
}
}
};
ts
}
pub(crate) fn impl_orchestrated_subsystem(info: &OrchestraInfo) -> proc_macro2::TokenStream {
let signal = &info.extern_signal_ty;
let error_ty = &info.extern_error_ty;
let support_crate = info.support_crate_name();
let maybe_boxed_message_generic: Type = if info.boxed_messages {
parse_quote! { ::std::boxed::Box<M> }
} else {
parse_quote! { M }
};
let maybe_boxed_message = if info.boxed_messages {
quote! { ::std::boxed::Box::new(message) }
} else {
quote! { message }
};
let ts = quote::quote! {
pub struct OrchestratedSubsystem<M> {
pub instance: std::option::Option<
#support_crate ::SubsystemInstance<#maybe_boxed_message_generic, #signal>
>,
}
impl<M> OrchestratedSubsystem<M> {
pub async fn send_message2(&mut self, message: M, origin: &'static str) -> ::std::result::Result<(), #error_ty > {
const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);
if let Some(ref mut instance) = self.instance {
match instance.tx_bounded.send(MessagePacket {
signals_received: instance.signals_received,
message: #maybe_boxed_message,
}).timeout(MESSAGE_TIMEOUT).await
{
None => {
#support_crate ::tracing::error!(
target: LOG_TARGET,
%origin,
"Subsystem {} appears unresponsive when sending a message of type {}.",
instance.name,
::std::any::type_name::<M>(),
);
Err(#error_ty :: from(
#support_crate ::OrchestraError::SubsystemStalled(instance.name, "message", ::std::any::type_name::<M>())
))
}
Some(res) => res.map_err(|_| #error_ty :: from(
#support_crate ::OrchestraError::QueueError
)),
}
} else {
Ok(())
}
}
pub fn try_send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #support_crate :: TrySendError<#signal> > {
if let Some(ref mut instance) = self.instance {
instance.tx_signal.try_send(signal)?;
instance.signals_received += 1;
Ok(())
} else {
Ok(())
}
}
pub async fn send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #error_ty > {
if let Some(ref mut instance) = self.instance {
match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await {
None => {
Err(#error_ty :: from(
#support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal>())
))
}
Some(res) => {
let res = res.map_err(|_| #error_ty :: from(
#support_crate ::OrchestraError::QueueError
));
if res.is_ok() {
instance.signals_received += 1;
}
res
}
}
} else {
Ok(())
}
}
}
};
ts
}