referrerpolicy=no-referrer-when-downgrade

malus/
interceptor.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! A small set of wrapping types to cover most of our adversary test cases.
18//!
19//! This allows types with internal mutability to synchronize across
20//! multiple subsystems and intercept or replace incoming and outgoing
21//! messages on the overseer level.
22
23use polkadot_node_subsystem::*;
24pub use polkadot_node_subsystem::{messages::*, overseer, FromOrchestra};
25use std::{collections::VecDeque, future::Future, pin::Pin};
26
27/// Filter incoming and outgoing messages.
28pub trait MessageInterceptor<Sender>: Send + Sync + Clone + 'static
29where
30	Sender: overseer::SubsystemSender<<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages>
31		+ Clone
32		+ 'static,
33{
34	/// The message type the original subsystem handles incoming.
35	type Message: overseer::AssociateOutgoing + Send + 'static;
36
37	/// Filter messages that are to be received by
38	/// the subsystem.
39	///
40	/// For non-trivial cases, the `sender` can be used to send
41	/// multiple messages after doing some additional processing.
42	fn intercept_incoming(
43		&self,
44		_sender: &mut Sender,
45		msg: FromOrchestra<Self::Message>,
46	) -> Option<FromOrchestra<Self::Message>> {
47		Some(msg)
48	}
49
50	/// Specifies if we need to replace some outgoing message with another (potentially empty)
51	/// message
52	fn need_intercept_outgoing(
53		&self,
54		_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
55	) -> bool {
56		false
57	}
58	/// Send modified message instead of the original one
59	fn intercept_outgoing(
60		&self,
61		_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
62	) -> Option<<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages> {
63		None
64	}
65}
66
/// Wraps a sender so that outgoing messages pass a filter before being sent.
#[derive(Clone)]
pub struct InterceptedSender<Sender, Fil> {
	/// The wrapped sender that actually delivers messages.
	inner: Sender,
	/// The filter consulted for every outgoing message.
	message_filter: Fil,
}
73
74#[async_trait::async_trait]
75impl<OutgoingMessage, Sender, Fil> overseer::SubsystemSender<OutgoingMessage> for InterceptedSender<Sender, Fil>
76where
77	OutgoingMessage: overseer::AssociateOutgoing + Send + 'static + TryFrom<overseer::AllMessages>,
78	Sender: overseer::SubsystemSender<OutgoingMessage>
79		+ overseer::SubsystemSender<
80				<
81					<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
82				>::OutgoingMessages
83			>,
84	Fil: MessageInterceptor<Sender>,
85	<Fil as MessageInterceptor<Sender>>::Message: overseer::AssociateOutgoing,
86	<
87		<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
88	>::OutgoingMessages:
89		From<OutgoingMessage> + Send + Sync,
90	<OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
91{
92	async fn send_message(&mut self, msg: OutgoingMessage) {
93		self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
94	}
95
96	async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) {
97		let msg = <
98					<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
99				>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
100		if self.message_filter.need_intercept_outgoing(&msg) {
101			if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
102				self.inner.send_message(msg).await;
103			}
104		}
105		else {
106			self.inner.send_message(msg).await;
107		}
108	}
109
110	fn try_send_message(
111		&mut self,
112		msg: OutgoingMessage,
113	) -> Result<(), polkadot_node_subsystem_util::metered::TrySendError<OutgoingMessage>> {
114		self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
115	}
116
117	fn try_send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
118		let msg = <
119				<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
120			>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
121		if self.message_filter.need_intercept_outgoing(&msg) {
122			if let Some(real_msg) = self.message_filter.intercept_outgoing(&msg) {
123				let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
124				self.inner.try_send_message(real_msg).map_err(|e| {
125					match e {
126						TrySendError::Full(_) => TrySendError::Full(orig_msg),
127						TrySendError::Closed(_) => TrySendError::Closed(orig_msg),
128					}
129				})
130			}
131			else {
132				// No message to send after intercepting
133				Ok(())
134			}
135		}
136		else {
137			let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
138			self.inner.try_send_message(orig_msg)
139		}
140	}
141
142	async fn send_messages<T>(&mut self, msgs: T)
143	where
144		T: IntoIterator<Item = OutgoingMessage> + Send,
145		T::IntoIter: Send,
146	{
147		for msg in msgs {
148			self.send_message(msg).await;
149		}
150	}
151
152	fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
153		let msg = <
154				<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
155			>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
156		if self.message_filter.need_intercept_outgoing(&msg) {
157			if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
158				self.inner.send_unbounded_message(msg);
159			}
160		}
161		else {
162			self.inner.send_unbounded_message(msg);
163		}
164	}
165}
166
167/// A subsystem context, that filters the outgoing messages.
168pub struct InterceptedContext<Context, Fil>
169where
170	Context: overseer::SubsystemContext<Error=SubsystemError, Signal=OverseerSignal>,
171	Fil: MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>,
172	<Context as overseer::SubsystemContext>::Sender:
173		overseer::SubsystemSender<
174			<
175				<
176					Fil as MessageInterceptor<<Context as overseer::SubsystemContext>::Sender>
177				>::Message as overseer::AssociateOutgoing
178			>::OutgoingMessages,
179		>,
180{
181	inner: Context,
182	message_filter: Fil,
183	sender: InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>,
184	message_buffer: VecDeque<FromOrchestra<<Context as overseer::SubsystemContext>::Message>>,
185}
186
187impl<Context, Fil> InterceptedContext<Context, Fil>
188where
189	Context: overseer::SubsystemContext<Error=SubsystemError,Signal=OverseerSignal>,
190	Fil: MessageInterceptor<
191		<Context as overseer::SubsystemContext>::Sender,
192		Message = <Context as overseer::SubsystemContext>::Message,
193	>,
194	<Context as overseer::SubsystemContext>::Message: overseer::AssociateOutgoing,
195	<Context as overseer::SubsystemContext>::Sender: overseer::SubsystemSender<
196		<<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing>::OutgoingMessages
197	>
198{
199	pub fn new(mut inner: Context, message_filter: Fil) -> Self {
200		let sender = InterceptedSender::<<Context as overseer::SubsystemContext>::Sender, Fil> {
201			inner: inner.sender().clone(),
202			message_filter: message_filter.clone(),
203		};
204		Self { inner, message_filter, sender, message_buffer: VecDeque::new() }
205	}
206}
207
208#[async_trait::async_trait]
209impl<Context, Fil> overseer::SubsystemContext for InterceptedContext<Context, Fil>
210where
211	Context: overseer::SubsystemContext<Error=SubsystemError,Signal=OverseerSignal>,
212	<Context as overseer::SubsystemContext>::Message:
213		overseer::AssociateOutgoing,
214	<Context as overseer::SubsystemContext>::Sender:
215		overseer::SubsystemSender<
216			<<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing>::OutgoingMessages
217		>,
218	InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>:
219		overseer::SubsystemSender<
220			<<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing>::OutgoingMessages
221		>,
222	Fil: MessageInterceptor<
223		<Context as overseer::SubsystemContext>::Sender,
224		Message = <Context as overseer::SubsystemContext>::Message,
225	>,
226{
227	type Message = <Context as overseer::SubsystemContext>::Message;
228	type Sender = InterceptedSender<<Context as overseer::SubsystemContext>::Sender, Fil>;
229	type Error = SubsystemError;
230	type OutgoingMessages = <<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing>::OutgoingMessages;
231	type Signal = OverseerSignal;
232
233	async fn try_recv(&mut self) -> Result<Option<FromOrchestra<Self::Message>>, ()> {
234		loop {
235			match self.inner.try_recv().await? {
236				None => return Ok(None),
237				Some(msg) =>
238					if let Some(msg) =
239						self.message_filter.intercept_incoming(self.inner.sender(), msg)
240					{
241						return Ok(Some(msg))
242					},
243			}
244		}
245	}
246
247	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<Self::Message>> {
248		if let Some(msg) = self.message_buffer.pop_front() {
249			return Ok(msg)
250		}
251		loop {
252			let msg = self.inner.recv().await?;
253			if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
254				return Ok(msg)
255			}
256		}
257	}
258
259	async fn recv_signal(&mut self) -> SubsystemResult<Self::Signal> {
260		loop {
261			let msg = self.inner.recv().await?;
262			if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) {
263				if let FromOrchestra::Signal(sig) = msg {
264					return Ok(sig)
265				} else {
266					self.message_buffer.push_back(msg)
267				}
268			}
269		}
270	}
271
272	fn spawn(
273		&mut self,
274		name: &'static str,
275		s: Pin<Box<dyn Future<Output = ()> + Send>>,
276	) -> SubsystemResult<()> {
277		self.inner.spawn(name, s)
278	}
279
280	fn spawn_blocking(
281		&mut self,
282		name: &'static str,
283		s: Pin<Box<dyn Future<Output = ()> + Send>>,
284	) -> SubsystemResult<()> {
285		self.inner.spawn_blocking(name, s)
286	}
287
288	fn sender(&mut self) -> &mut Self::Sender {
289		&mut self.sender
290	}
291}
292
/// Pairs a subsystem with the interceptor that filters its incoming and outgoing messages.
pub struct InterceptedSubsystem<Sub, Interceptor> {
	/// The subsystem being wrapped.
	pub subsystem: Sub,
	/// The interceptor applied to the wrapped subsystem's messages.
	pub message_interceptor: Interceptor,
}

impl<Sub, Interceptor> InterceptedSubsystem<Sub, Interceptor> {
	/// Bundle `subsystem` together with `message_interceptor`.
	pub fn new(subsystem: Sub, message_interceptor: Interceptor) -> Self {
		InterceptedSubsystem { subsystem, message_interceptor }
	}
}
304
305impl<Context, Sub, Interceptor> overseer::Subsystem<Context, SubsystemError> for InterceptedSubsystem<Sub, Interceptor>
306where
307	Context:
308		overseer::SubsystemContext<Error=SubsystemError,Signal=OverseerSignal> + Sync + Send,
309	InterceptedContext<Context, Interceptor>:
310		overseer::SubsystemContext<Error=SubsystemError,Signal=OverseerSignal>,
311	Sub:
312		overseer::Subsystem<InterceptedContext<Context, Interceptor>, SubsystemError>,
313	Interceptor:
314		MessageInterceptor<
315			<Context as overseer::SubsystemContext>::Sender,
316			Message = <Context as overseer::SubsystemContext>::Message,
317		>,
318	<Context as overseer::SubsystemContext>::Message:
319		overseer::AssociateOutgoing,
320	<Context as overseer::SubsystemContext>::Sender:
321		overseer::SubsystemSender<
322				<<Context as overseer::SubsystemContext>::Message as overseer::AssociateOutgoing
323			>::OutgoingMessages
324		>,
325{
326	fn start(self, ctx: Context) -> SpawnedSubsystem {
327		let ctx = InterceptedContext::new(ctx, self.message_interceptor);
328		overseer::Subsystem::<InterceptedContext<Context, Interceptor>, SubsystemError>::start(
329			self.subsystem,
330			ctx,
331		)
332	}
333}