1use polkadot_node_subsystem::*;
24pub use polkadot_node_subsystem::{messages::*, overseer, FromOrchestra};
25use std::{collections::VecDeque, future::Future, pin::Pin};
26
27pub trait MessageInterceptor<Sender>: Send + Sync + Clone + 'static
29where
30 Sender: overseer::SubsystemSender<<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages>
31 + Clone
32 + 'static,
33{
34 type Message: overseer::AssociateOutgoing + Send + 'static;
36
37 fn intercept_incoming(
43 &self,
44 _sender: &mut Sender,
45 msg: FromOrchestra<Self::Message>,
46 ) -> Option<FromOrchestra<Self::Message>> {
47 Some(msg)
48 }
49
50 fn need_intercept_outgoing(
53 &self,
54 _msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
55 ) -> bool {
56 false
57 }
58 fn intercept_outgoing(
60 &self,
61 _msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
62 ) -> Option<<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages> {
63 None
64 }
65}
66
/// A sender paired with the filter that vets every message before it reaches that sender.
#[derive(Clone)]
pub struct InterceptedSender<Sender, Fil> {
    /// The wrapped sender that actually delivers messages.
    inner: Sender,
    /// Filter consulted for each outgoing message.
    message_filter: Fil,
}
73
74#[async_trait::async_trait]
75impl<OutgoingMessage, Sender, Fil> overseer::SubsystemSender<OutgoingMessage> for InterceptedSender<Sender, Fil>
76where
77 OutgoingMessage: overseer::AssociateOutgoing + Send + 'static + TryFrom<overseer::AllMessages>,
78 Sender: overseer::SubsystemSender<OutgoingMessage>
79 + overseer::SubsystemSender<
80 <
81 <Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
82 >::OutgoingMessages
83 >,
84 Fil: MessageInterceptor<Sender>,
85 <Fil as MessageInterceptor<Sender>>::Message: overseer::AssociateOutgoing,
86 <
87 <Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
88 >::OutgoingMessages:
89 From<OutgoingMessage> + Send + Sync,
90 <OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
91{
92 async fn send_message(&mut self, msg: OutgoingMessage) {
93 self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
94 }
95
96 async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) {
97 let msg = <
98 <<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
99 >::OutgoingMessages as From<OutgoingMessage>>::from(msg);
100 if self.message_filter.need_intercept_outgoing(&msg) {
101 if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
102 self.inner.send_message(msg).await;
103 }
104 }
105 else {
106 self.inner.send_message(msg).await;
107 }
108 }
109
110 fn try_send_message(
111 &mut self,
112 msg: OutgoingMessage,
113 ) -> Result<(), polkadot_node_subsystem_util::metered::TrySendError<OutgoingMessage>> {
114 self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
115 }
116
117 fn try_send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
118 let msg = <
119 <<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
120 >::OutgoingMessages as From<OutgoingMessage>>::from(msg);
121 if self.message_filter.need_intercept_outgoing(&msg) {
122 if let Some(real_msg) = self.message_filter.intercept_outgoing(&msg) {
123 let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
124 self.inner.try_send_message(real_msg).map_err(|e| {
125 match e {
126 TrySendError::Full(_) => TrySendError::Full(orig_msg),
127 TrySendError::Closed(_) => TrySendError::Closed(orig_msg),
128 }
129 })
130 }
131 else {
132 Ok(())
134 }
135 }
136 else {
137 let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
138 self.inner.try_send_message(orig_msg)
139 }
140 }
141
142 async fn send_messages<T>(&mut self, msgs: T)
143 where
144 T: IntoIterator<Item = OutgoingMessage> + Send,
145 T::IntoIter: Send,
146 {
147 for msg in msgs {
148 self.send_message(msg).await;
149 }
150 }
151
152 fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
153 let msg = <
154 <<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
155 >::OutgoingMessages as From<OutgoingMessage>>::from(msg);
156 if self.message_filter.need_intercept_outgoing(&msg) {
157 if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
158 self.inner.send_unbounded_message(msg);
159 }
160 }
161 else {
162 self.inner.send_unbounded_message(msg);
163 }
164 }
165}
166
167pub struct InterceptedContext<Context, Fil>
169where
170 Context: overseer::SubsystemContext<Error=SubsystemError, Signal=OverseerSignal>,
171 Fil: MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>,
172 <Context as overseer::SubsystemContext>::Sender:
173 overseer::SubsystemSender<
174 <
175 <
176 Fil as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>
177 >::Message as overseer::AssociateOutgoing
178 >::OutgoingMessages,
179 >,
180{
181 inner: Context,
182 message_filter: Fil,
183 sender: InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>,
184 message_buffer: VecDeque<FromOrchestra<<Context as overseer::SubsystemContext>::Message>>,
185}
186
187impl<Context, Fil> InterceptedContext<Context, Fil>
188where
189 Context: overseer::SubsystemContext<Error=SubsystemError,Signal=OverseerSignal>,
190 Fil: MessageInterceptor<
191 <Context as overseer::SubsystemContext>::Sender,
192 Message = <Context as overseer::SubsystemContext>::Message,
193 >,
194 <Context as overseer::SubsystemContext>::Message: overseer::AssociateOutgoing,
195 <Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
196 <<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing>::OutgoingMessages
197 >
198{
199 pub fn new(mut inner: Context, message_filter: Fil) -> Self {
200 let sender = InterceptedSender::<<Context as overseer::SubsystemContext>::Sender, Fil> {
201 inner: inner.sender().clone(),
202 message_filter: message_filter.clone(),
203 };
204 Self { inner, message_filter, sender, message_buffer: VecDeque::new() }
205 }
206}
207
208#[async_trait::async_trait]
209impl<Context, Fil> overseer::SubsystemContext for InterceptedContext<Context, Fil>
210where
211 Context: overseer::SubsystemContext<Error=SubsystemError,Signal=OverseerSignal>,
212 <Context as overseer::SubsystemContext>::Message:
213 overseer::AssociateOutgoing,
214 <Context as overseer::SubsystemContext>::Sender:
215 overseer::SubsystemSender<
216 <<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing>::OutgoingMessages
217 >,
218 InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>:
219 overseer::SubsystemSender<
220 <<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing>::OutgoingMessages
221 >,
222 Fil: MessageInterceptor<
223 <Context as overseer::SubsystemContext>::Sender,
224 Message = <Context as overseer::SubsystemContext>::Message,
225 >,
226{
227 type Message = <Context as overseer::SubsystemContext>::Message;
228 type Sender = InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>;
229 type Error = SubsystemError;
230 type OutgoingMessages = <<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing>::OutgoingMessages;
231 type Signal = OverseerSignal;
232
233 async fn try_recv(&mut self) -> Result<Option<FromOrchestra<Self::Message>>, ()> {
234 loop {
235 match self.inner.try_recv().await? {
236 None => return Ok(None),
237 Some(msg) =>
238 if let Some(msg) =
239 self.message_filter.intercept_incoming(self.inner.sender(), msg)
240 {
241 return Ok(Some(msg))
242 },
243 }
244 }
245 }
246
247 async fn recv(&mut self) -> SubsystemResult<FromOrchestra<Self::Message>> {
248 if let Some(msg) = self.message_buffer.pop_front() {
249 return Ok(msg)
250 }
251 loop {
252 let msg = self.inner.recv().await?;
253 if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
254 return Ok(msg)
255 }
256 }
257 }
258
259 async fn recv_signal(&mut self) -> SubsystemResult<Self::Signal> {
260 loop {
261 let msg = self.inner.recv().await?;
262 if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
263 if let FromOrchestra::Signal(sig) = msg {
264 return Ok(sig)
265 } else {
266 self.message_buffer.push_back(msg)
267 }
268 }
269 }
270 }
271
272 fn spawn(
273 &mut self,
274 name: &'static str,
275 s: Pin<Box<dyn Future<Output = ()> + Send>>,
276 ) -> SubsystemResult<()> {
277 self.inner.spawn(name, s)
278 }
279
280 fn spawn_blocking(
281 &mut self,
282 name: &'static str,
283 s: Pin<Box<dyn Future<Output = ()> + Send>>,
284 ) -> SubsystemResult<()> {
285 self.inner.spawn_blocking(name, s)
286 }
287
288 fn sender(&mut self) -> &mut Self::Sender {
289 &mut self.sender
290 }
291}
292
/// Pairs a subsystem with the interceptor that should filter its traffic.
pub struct InterceptedSubsystem<Sub, Interceptor> {
    /// The subsystem being wrapped.
    pub subsystem: Sub,
    /// The interceptor applied to the subsystem's messages.
    pub message_interceptor: Interceptor,
}
298
299impl<Sub, Interceptor> InterceptedSubsystem<Sub, Interceptor> {
300 pub fn new(subsystem: Sub, message_interceptor: Interceptor) -> Self {
301 Self { subsystem, message_interceptor }
302 }
303}
304
305impl<Context, Sub, Interceptor> overseer::Subsystem<Context, SubsystemError> for InterceptedSubsystem<Sub, Interceptor>
306where
307 Context:
308 overseer::SubsystemContext<Error=SubsystemError,Signal=OverseerSignal> + Sync + Send,
309 InterceptedContext<Context, Interceptor>:
310 overseer::SubsystemContext<Error=SubsystemError,Signal=OverseerSignal>,
311 Sub:
312 overseer::Subsystem<InterceptedContext<Context, Interceptor>, SubsystemError>,
313 Interceptor:
314 MessageInterceptor<
315 <Context as overseer::SubsystemContext>::Sender,
316 Message = <Context as overseer::SubsystemContext>::Message,
317 >,
318 <Context as overseer::SubsystemContext>::Message:
319 overseer::AssociateOutgoing,
320 <Context as overseer::SubsystemContext>::Sender:
321 overseer::SubsystemSender<
322 <<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing
323 >::OutgoingMessages
324 >,
325{
326 fn start(self, ctx: Context) -> SpawnedSubsystem {
327 let ctx = InterceptedContext::new(ctx, self.message_interceptor);
328 overseer::Subsystem::<InterceptedContext<Context, Interceptor>, SubsystemError>::start(
329 self.subsystem,
330 ctx,
331 )
332 }
333}