// orchestra_proc_macro/impl_orchestra.rs

use quote::quote;
use syn::Type;

use super::*;

21pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenStream {
22 let message_wrapper = &info.message_wrapper.clone();
23 let orchestra_name = info.orchestra_name.clone();
24 let subsystem_name = &info.subsystem_names_without_wip();
25 let feature_gates = &info.feature_gates();
26 let support_crate = info.support_crate_name();
27
28 let baggage_decl = &info.baggage_decl();
29
30 let baggage_generic_ty = &info.baggage_generic_types();
31
32 let generics = quote! {
33 < S, #( #baggage_generic_ty, )* >
34 };
35
36 let where_clause = quote! {
37 where
38 S: #support_crate ::Spawner,
39 };
40 let consumes = &info.consumes_without_wip();
44 let consumes_variant = &info.variant_names_without_wip();
45 let unconsumes_variant = &info.variant_names_only_wip();
46
47 let signal_ty = &info.extern_signal_ty;
48
49 let error_ty = &info.extern_error_ty;
50
51 let event_ty = &info.extern_event_ty;
52
53 let signal_channel_capacity = info.signal_channel_capacity;
54
55 let log_target =
56 syn::LitStr::new(orchestra_name.to_string().to_lowercase().as_str(), orchestra_name.span());
57
58 let ts = quote! {
59 const SIGNAL_CHANNEL_CAPACITY: usize = #signal_channel_capacity;
64 const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10);
67
68 const LOG_TARGET: &str = #log_target;
70
71 pub struct #orchestra_name #generics {
73
74 #(
75 #feature_gates
77 #subsystem_name: OrchestratedSubsystem< #consumes >,
78 )*
79
80 #(
81 #baggage_decl ,
83 )*
84
85 spawner: S,
87
88 running_subsystems: #support_crate ::FuturesUnordered<
90 BoxFuture<'static, ::std::result::Result<(), #error_ty>>
91 >,
92
93 to_orchestra_rx: #support_crate ::stream::Fuse<
95 #support_crate ::metered::UnboundedMeteredReceiver< #support_crate ::ToOrchestra >
96 >,
97
98 events_rx: #support_crate ::metered::MeteredReceiver< #event_ty >,
100 }
101
102 impl #generics #orchestra_name #generics #where_clause {
103 pub async fn wait_terminate(&mut self, signal: #signal_ty, timeout: ::std::time::Duration) -> ::std::result::Result<(), #error_ty > {
109 #(
110 #feature_gates
111 ::std::mem::drop(self. #subsystem_name .send_signal(signal.clone()).await);
112 )*
113 let _ = signal;
114
115 let mut timeout_fut = #support_crate ::Delay::new(
116 timeout
117 ).fuse();
118
119 loop {
120 #support_crate ::futures::select! {
121 _ = self.running_subsystems.next() =>
122 if self.running_subsystems.is_empty() {
123 break;
124 },
125 _ = timeout_fut => break,
126 complete => break,
127 }
128 }
129
130 Ok(())
131 }
132
133 pub async fn broadcast_signal<'a>(&'a mut self, signal: #signal_ty) -> ::std::result::Result<(), #error_ty > {
135 let mut delayed_signals : #support_crate ::futures::stream::FuturesUnordered< #support_crate ::futures::future::BoxFuture<'a, ::std::result::Result<(), #error_ty>>>
136 = #support_crate ::futures::stream::FuturesUnordered::new();
137 #(
138 #feature_gates
140 if let Err(e) = self. #subsystem_name .try_send_signal(signal.clone()) {
141 match e {
142 #support_crate::TrySendError::Full(sig) => {
143 let instance = self. #subsystem_name .instance.as_mut().expect("checked in try_send_signal");
144 delayed_signals.push(::std::boxed::Box::pin(async move {
145 match instance.tx_signal.send(sig).timeout(SIGNAL_TIMEOUT).await {
146 None => {
147 Err(#error_ty :: from(
148 #support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal_ty>())
149 ))
150 }
151 Some(res) => {
152 let res = res.map_err(|_| #error_ty :: from(
153 #support_crate ::OrchestraError::QueueError
154 ));
155 if res.is_ok() {
156 instance.signals_received += 1;
157 }
158 res
159 }
160 }
161 }));
162 },
163 _ => return Err(#error_ty :: from(#support_crate ::OrchestraError::QueueError))
164 }
165 }
166 )*
167 let _ = signal;
168
169 while let Some(res) = delayed_signals.next().await {
171 res?;
172 }
173
174 Ok(())
175 }
176
177 pub async fn route_message(&mut self, message: #message_wrapper, origin: &'static str) -> ::std::result::Result<(), #error_ty > {
179 match message {
180 #(
181 #feature_gates
182 #message_wrapper :: #consumes_variant ( inner ) =>
183 OrchestratedSubsystem::< #consumes >::send_message2(&mut self. #subsystem_name, inner, origin ).await?,
184 )*
185 #(
187 #feature_gates
188 #message_wrapper :: #unconsumes_variant ( _ ) => {}
189 )*
190 #message_wrapper :: Empty => {}
191
192 #[allow(unreachable_patterns)]
194 unused_msg => {
195 #support_crate :: tracing :: warn!("Nothing consumes {:?}", unused_msg);
196 }
197 }
198 Ok(())
199 }
200
201 pub fn map_subsystems<'a, Mapper, Output>(&'a self, mapper: Mapper)
203 -> Vec<Output>
204 where
205 #(
206 Mapper: MapSubsystem<&'a OrchestratedSubsystem< #consumes >, Output=Output>,
207 )*
208 {
209 vec![
210 #(
211 #feature_gates
212 mapper.map_subsystem( & self. #subsystem_name ),
213 )*
214 ]
215 }
216
217 pub fn spawner (&mut self) -> &mut S {
219 &mut self.spawner
220 }
221 }
222
223 };
224
225 ts
226}
227
228pub(crate) fn impl_orchestrated_subsystem(info: &OrchestraInfo) -> proc_macro2::TokenStream {
229 let signal = &info.extern_signal_ty;
230 let error_ty = &info.extern_error_ty;
231 let support_crate = info.support_crate_name();
232
233 let maybe_boxed_message_generic: Type = if info.boxed_messages {
234 parse_quote! { ::std::boxed::Box<M> }
235 } else {
236 parse_quote! { M }
237 };
238
239 let maybe_boxed_message = if info.boxed_messages {
240 quote! { ::std::boxed::Box::new(message) }
241 } else {
242 quote! { message }
243 };
244
245 let ts = quote::quote! {
246 pub struct OrchestratedSubsystem<M> {
254 pub instance: std::option::Option<
256 #support_crate ::SubsystemInstance<#maybe_boxed_message_generic, #signal>
257 >,
258 }
259
260 impl<M> OrchestratedSubsystem<M> {
261 pub async fn send_message2(&mut self, message: M, origin: &'static str) -> ::std::result::Result<(), #error_ty > {
265 const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);
266
267 if let Some(ref mut instance) = self.instance {
268 match instance.tx_bounded.send(MessagePacket {
269 signals_received: instance.signals_received,
270 message: #maybe_boxed_message,
271 }).timeout(MESSAGE_TIMEOUT).await
272 {
273 None => {
274 #support_crate ::tracing::error!(
275 target: LOG_TARGET,
276 %origin,
277 "Subsystem {} appears unresponsive when sending a message of type {}.",
278 instance.name,
279 ::std::any::type_name::<M>(),
280 );
281 Err(#error_ty :: from(
282 #support_crate ::OrchestraError::SubsystemStalled(instance.name, "message", ::std::any::type_name::<M>())
283 ))
284 }
285 Some(res) => res.map_err(|_| #error_ty :: from(
286 #support_crate ::OrchestraError::QueueError
287 )),
288 }
289 } else {
290 Ok(())
291 }
292 }
293
294 pub fn try_send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #support_crate :: TrySendError<#signal> > {
296 if let Some(ref mut instance) = self.instance {
297 instance.tx_signal.try_send(signal)?;
298 instance.signals_received += 1;
299 Ok(())
300 } else {
301 Ok(())
302 }
303 }
304
305 pub async fn send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #error_ty > {
309 if let Some(ref mut instance) = self.instance {
310 match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await {
311 None => {
312 Err(#error_ty :: from(
313 #support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal>())
314 ))
315 }
316 Some(res) => {
317 let res = res.map_err(|_| #error_ty :: from(
318 #support_crate ::OrchestraError::QueueError
319 ));
320 if res.is_ok() {
321 instance.signals_received += 1;
322 }
323 res
324 }
325 }
326 } else {
327 Ok(())
328 }
329 }
330 }
331 };
332 ts
333}