orchestra_proc_macro/
impl_channels_out.rs1use quote::quote;
17use syn::Result;
18
19use super::*;
20
21pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macro2::TokenStream> {
23 let message_wrapper = info.message_wrapper.clone();
24
25 let channel_name = &info.channel_names_without_wip(None);
26 let channel_name_unbounded = &info.channel_names_without_wip("_unbounded");
27
28 let maybe_boxed_consumes = info
29 .consumes_without_wip()
30 .iter()
31 .map(|consume| info.box_message_if_needed(consume, Span::call_site()))
32 .collect::<Vec<_>>();
33
34 let maybe_boxed_send = if info.boxed_messages {
35 quote! { ::std::boxed::Box::new(inner) }
36 } else {
37 quote! { inner }
38 };
39 let maybe_unbox_error = if info.boxed_messages {
40 quote! { *err_inner.message }
41 } else {
42 quote! { err_inner.message }
43 };
44
45 let consumes_variant = &info.variant_names_without_wip();
46 let unconsumes_variant = &info.variant_names_only_wip();
47
48 let feature_gates = info.feature_gates();
49 let support_crate = info.support_crate_name();
50
51 let ts = quote! {
52 #[derive(Debug, Clone)]
56 pub struct ChannelsOut {
57 #(
58 #feature_gates
60 pub #channel_name:
61 #support_crate ::metered::MeteredSender<
62 MessagePacket< #maybe_boxed_consumes >
63 >,
64 )*
65
66 #(
67 #feature_gates
69 pub #channel_name_unbounded:
70 #support_crate ::metered::UnboundedMeteredSender<
71 MessagePacket< #maybe_boxed_consumes >
72 >,
73 )*
74 }
75
76 #[allow(unreachable_code)]
77 impl ChannelsOut {
79 pub async fn send_and_log_error<P: #support_crate ::Priority>(
81 &mut self,
82 signals_received: usize,
83 message: #message_wrapper
84 ) {
85 let res: ::std::result::Result<_, _> = match message {
86 #(
87 #feature_gates
88 #message_wrapper :: #consumes_variant ( inner ) => {
89 match P::priority() {
90 #support_crate ::PriorityLevel::Normal => {
91 self. #channel_name .send(
92 #support_crate ::make_packet(signals_received, #maybe_boxed_send)
93 ).await
94 },
95 #support_crate ::PriorityLevel::High => {
96 self. #channel_name .priority_send(
97 #support_crate ::make_packet(signals_received, #maybe_boxed_send)
98 ).await
99 },
100 }.map_err(|_| stringify!( #channel_name ))
101 }
102 )*
103 #(
105 #message_wrapper :: #unconsumes_variant ( _ ) => Ok(()),
106 )*
107 #message_wrapper :: Empty => Ok(()),
109
110 #[allow(unreachable_patterns)]
111 unused_msg => {
113 #support_crate :: tracing :: warn!("Nothing consumes {:?}", unused_msg);
114 Ok(())
115 }
116 };
117
118 if let Err(subsystem_name) = res {
119 #support_crate ::tracing::debug!(
120 target: LOG_TARGET,
121 "Failed to send (bounded) a message to {} subsystem",
122 subsystem_name
123 );
124 }
125 }
126
127 pub fn try_send<P: #support_crate ::Priority>(
129 &mut self,
130 signals_received: usize,
131 message: #message_wrapper,
132 ) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<#message_wrapper>> {
133 let res: ::std::result::Result<_, _> = match message {
134 #(
135 #feature_gates
136 #message_wrapper :: #consumes_variant ( inner ) => {
137 match P::priority() {
138 #support_crate ::PriorityLevel::Normal => {
139 self. #channel_name .try_send(
140 #support_crate ::make_packet(signals_received, #maybe_boxed_send)
141 )
142 },
143 #support_crate ::PriorityLevel::High => {
144 self. #channel_name .try_priority_send(
145 #support_crate ::make_packet(signals_received, #maybe_boxed_send)
146 )
147 },
148 }.map_err(|err| match err {
149 #support_crate ::metered::TrySendError::Full(err_inner) => #support_crate ::metered::TrySendError::Full(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
150 #support_crate ::metered::TrySendError::Closed(err_inner) => #support_crate ::metered::TrySendError::Closed(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
151 })
152 }
153 )*
154 #(
156 #message_wrapper :: #unconsumes_variant ( _ ) => Ok(()),
157 )*
158 #message_wrapper :: Empty => Ok(()),
160
161 #[allow(unreachable_patterns)]
162 unused_msg => {
164 #support_crate :: tracing :: warn!("Nothing consumes {:?}", unused_msg);
165 Ok(())
166 }
167 };
168
169 res
170 }
171
172 pub fn send_unbounded_and_log_error(
174 &self,
175 signals_received: usize,
176 message: #message_wrapper,
177 ) {
178 let res: ::std::result::Result<_, _> = match message {
179 #(
180 #feature_gates
181 #message_wrapper :: #consumes_variant (inner) => {
182 self. #channel_name_unbounded .unbounded_send(
183 #support_crate ::make_packet(signals_received, #maybe_boxed_send)
184 )
185 .map_err(|_| stringify!( #channel_name ))
186 },
187 )*
188 #(
190 #message_wrapper :: #unconsumes_variant ( _ ) => Ok(()),
191 )*
192 #message_wrapper :: Empty => Ok(()),
194
195 #[allow(unreachable_patterns)]
197 unused_msg => {
198 #support_crate :: tracing :: warn!("Nothing consumes {:?}", unused_msg);
199 Ok(())
200 }
201 };
202
203 if let Err(subsystem_name) = res {
204 #support_crate ::tracing::debug!(
205 target: LOG_TARGET,
206 "Failed to send_unbounded a message to {} subsystem",
207 subsystem_name
208 );
209 }
210 }
211 }
212
213 };
214 Ok(ts)
215}