orchestra_proc_macro/
impl_orchestra.rs

1// Copyright (C) 2021 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: Apache-2.0
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use quote::quote;
17use syn::Type;
18
19use super::*;
20
21pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenStream {
22	let message_wrapper = &info.message_wrapper.clone();
23	let orchestra_name = info.orchestra_name.clone();
24	let subsystem_name = &info.subsystem_names_without_wip();
25	let feature_gates = &info.feature_gates();
26	let support_crate = info.support_crate_name();
27
28	let baggage_decl = &info.baggage_decl();
29
30	let baggage_generic_ty = &info.baggage_generic_types();
31
32	let generics = quote! {
33		< S, #( #baggage_generic_ty, )* >
34	};
35
36	let where_clause = quote! {
37		where
38			S: #support_crate ::Spawner,
39	};
40	// TODO add `where ..` clauses for baggage types
41	// TODO <https://github.com/paritytech/polkadot/issues/3427>
42
43	let consumes = &info.consumes_without_wip();
44	let consumes_variant = &info.variant_names_without_wip();
45	let unconsumes_variant = &info.variant_names_only_wip();
46
47	let signal_ty = &info.extern_signal_ty;
48
49	let error_ty = &info.extern_error_ty;
50
51	let event_ty = &info.extern_event_ty;
52
53	let signal_channel_capacity = info.signal_channel_capacity;
54
55	let log_target =
56		syn::LitStr::new(orchestra_name.to_string().to_lowercase().as_str(), orchestra_name.span());
57
58	let ts = quote! {
59		// without `cargo fmt`, there will be some weirdness around else brackets
60		// that does not originate from how we create it
61
62		/// Capacity of a signal channel between a subsystem and the orchestra.
63		const SIGNAL_CHANNEL_CAPACITY: usize = #signal_channel_capacity;
64		/// Timeout to wait for a signal to be processed by the target subsystem. If this timeout is exceeded, the
65		/// orchestra terminates with an error.
66		const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10);
67
68		/// The log target tag.
69		const LOG_TARGET: &str = #log_target;
70
71		/// The orchestra.
72		pub struct #orchestra_name #generics {
73
74			#(
75				/// A subsystem instance.
76				#feature_gates
77				#subsystem_name: OrchestratedSubsystem< #consumes >,
78			)*
79
80			#(
81				/// A user specified addendum field.
82				#baggage_decl ,
83			)*
84
85			/// Responsible for driving the subsystem futures.
86			spawner: S,
87
88			/// The set of running subsystems.
89			running_subsystems: #support_crate ::FuturesUnordered<
90				BoxFuture<'static, ::std::result::Result<(), #error_ty>>
91			>,
92
93			/// Gather running subsystems' outbound streams into one.
94			to_orchestra_rx: #support_crate ::stream::Fuse<
95				#support_crate ::metered::UnboundedMeteredReceiver< #support_crate ::ToOrchestra >
96			>,
97
98			/// Events that are sent to the orchestra from the outside world.
99			events_rx: #support_crate ::metered::MeteredReceiver< #event_ty >,
100		}
101
102		impl #generics #orchestra_name #generics #where_clause {
103			/// Send the given signal, a termination signal, to all subsystems
104			/// and wait for all subsystems to go down.
105			///
106			/// The definition of a termination signal is up to the user and
107			/// implementation specific.
108			pub async fn wait_terminate(&mut self, signal: #signal_ty, timeout: ::std::time::Duration) -> ::std::result::Result<(), #error_ty > {
109				#(
110					#feature_gates
111					::std::mem::drop(self. #subsystem_name .send_signal(signal.clone()).await);
112				)*
113				let _ = signal;
114
115				let mut timeout_fut = #support_crate ::Delay::new(
116						timeout
117					).fuse();
118
119				loop {
120					#support_crate ::futures::select! {
121						_ = self.running_subsystems.next() =>
122						if self.running_subsystems.is_empty() {
123							break;
124						},
125						_ = timeout_fut => break,
126						complete => break,
127					}
128				}
129
130				Ok(())
131			}
132
133			/// Broadcast a signal to all subsystems.
134			pub async fn broadcast_signal<'a>(&'a mut self, signal: #signal_ty) -> ::std::result::Result<(), #error_ty > {
135				let mut delayed_signals : #support_crate ::futures::stream::FuturesUnordered< #support_crate ::futures::future::BoxFuture<'a, ::std::result::Result<(), #error_ty>>>
136					= #support_crate ::futures::stream::FuturesUnordered::new();
137				#(
138					// Use fast path if possible.
139					#feature_gates
140					if let Err(e) = self. #subsystem_name .try_send_signal(signal.clone()) {
141							match e {
142								#support_crate::TrySendError::Full(sig) => {
143									let instance = self. #subsystem_name .instance.as_mut().expect("checked in try_send_signal");
144									delayed_signals.push(::std::boxed::Box::pin(async move {
145										match instance.tx_signal.send(sig).timeout(SIGNAL_TIMEOUT).await {
146											None => {
147												Err(#error_ty :: from(
148													#support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal_ty>())
149												))
150											}
151											Some(res) => {
152												let res = res.map_err(|_| #error_ty :: from(
153													#support_crate ::OrchestraError::QueueError
154												));
155												if res.is_ok() {
156													instance.signals_received += 1;
157												}
158												res
159											}
160										}
161									}));
162								},
163								_ => return Err(#error_ty :: from(#support_crate ::OrchestraError::QueueError))
164							}
165					}
166				)*
167				let _ = signal;
168
169				// If fast path failed, wait for all delayed signals with no specific order
170				while let Some(res) = delayed_signals.next().await {
171					res?;
172				}
173
174				Ok(())
175			}
176
177			/// Route a particular message to a subsystem that consumes the message.
178			pub async fn route_message(&mut self, message: #message_wrapper, origin: &'static str) -> ::std::result::Result<(), #error_ty > {
179				match message {
180					#(
181						#feature_gates
182						#message_wrapper :: #consumes_variant ( inner ) =>
183							OrchestratedSubsystem::< #consumes >::send_message2(&mut self. #subsystem_name, inner, origin ).await?,
184					)*
185					// subsystems that are still work in progress
186					#(
187						#feature_gates
188						#message_wrapper :: #unconsumes_variant ( _ ) => {}
189					)*
190					#message_wrapper :: Empty => {}
191
192					// And everything that's not WIP but no subsystem consumes it
193					#[allow(unreachable_patterns)]
194					unused_msg => {
195						#support_crate :: tracing :: warn!("Nothing consumes {:?}", unused_msg);
196					}
197				}
198				Ok(())
199			}
200
201			/// Extract information from each subsystem.
202			pub fn map_subsystems<'a, Mapper, Output>(&'a self, mapper: Mapper)
203			-> Vec<Output>
204				where
205				#(
206					Mapper: MapSubsystem<&'a OrchestratedSubsystem< #consumes >, Output=Output>,
207				)*
208			{
209				vec![
210				#(
211					#feature_gates
212					mapper.map_subsystem( & self. #subsystem_name ),
213				)*
214				]
215			}
216
217			/// Get access to internal task spawner.
218			pub fn spawner (&mut self) -> &mut S {
219				&mut self.spawner
220			}
221		}
222
223	};
224
225	ts
226}
227
228pub(crate) fn impl_orchestrated_subsystem(info: &OrchestraInfo) -> proc_macro2::TokenStream {
229	let signal = &info.extern_signal_ty;
230	let error_ty = &info.extern_error_ty;
231	let support_crate = info.support_crate_name();
232
233	let maybe_boxed_message_generic: Type = if info.boxed_messages {
234		parse_quote! { ::std::boxed::Box<M> }
235	} else {
236		parse_quote! { M }
237	};
238
239	let maybe_boxed_message = if info.boxed_messages {
240		quote! { ::std::boxed::Box::new(message) }
241	} else {
242		quote! { message }
243	};
244
245	let ts = quote::quote! {
246		/// A subsystem that the orchestrator orchestrates.
247		///
248		/// Ties together the [`Subsystem`] itself and it's running instance
249		/// (which may be missing if the [`Subsystem`] is not running at the moment
250		/// for whatever reason).
251		///
252		/// [`Subsystem`]: trait.Subsystem.html
253		pub struct OrchestratedSubsystem<M> {
254			/// The instance.
255			pub instance: std::option::Option<
256				#support_crate ::SubsystemInstance<#maybe_boxed_message_generic, #signal>
257			>,
258		}
259
260		impl<M> OrchestratedSubsystem<M> {
261			/// Send a message to the wrapped subsystem.
262			///
263			/// If the inner `instance` is `None`, nothing is happening.
264			pub async fn send_message2(&mut self, message: M, origin: &'static str) -> ::std::result::Result<(), #error_ty > {
265				const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);
266
267				if let Some(ref mut instance) = self.instance {
268					match instance.tx_bounded.send(MessagePacket {
269						signals_received: instance.signals_received,
270						message: #maybe_boxed_message,
271					}).timeout(MESSAGE_TIMEOUT).await
272					{
273						None => {
274							#support_crate ::tracing::error!(
275								target: LOG_TARGET,
276								%origin,
277								"Subsystem {} appears unresponsive when sending a message of type {}.",
278								instance.name,
279								::std::any::type_name::<M>(),
280							);
281							Err(#error_ty :: from(
282								#support_crate ::OrchestraError::SubsystemStalled(instance.name, "message", ::std::any::type_name::<M>())
283							))
284						}
285						Some(res) => res.map_err(|_| #error_ty :: from(
286								#support_crate ::OrchestraError::QueueError
287							)),
288					}
289				} else {
290					Ok(())
291				}
292			}
293
294			/// Tries to send a signal to the wrapped subsystem without waiting.
295			pub fn try_send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #support_crate :: TrySendError<#signal> > {
296				if let Some(ref mut instance) = self.instance {
297					instance.tx_signal.try_send(signal)?;
298					instance.signals_received += 1;
299					Ok(())
300				} else {
301					Ok(())
302				}
303			}
304
305			/// Send a signal to the wrapped subsystem.
306			///
307			/// If the inner `instance` is `None`, nothing is happening.
308			pub async fn send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #error_ty > {
309				if let Some(ref mut instance) = self.instance {
310					match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await {
311						None => {
312							Err(#error_ty :: from(
313								#support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal>())
314							))
315						}
316						Some(res) => {
317							let res = res.map_err(|_| #error_ty :: from(
318								#support_crate ::OrchestraError::QueueError
319							));
320							if res.is_ok() {
321								instance.signals_received += 1;
322							}
323							res
324						}
325					}
326				} else {
327					Ok(())
328				}
329			}
330		}
331	};
332	ts
333}