referrerpolicy=no-referrer-when-downgrade

polkadot_node_subsystem_test_helpers/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Utilities for testing subsystems.
18
19#![warn(missing_docs)]
20
21use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
22use polkadot_node_primitives::{AvailableData, ErasureChunk, Proof};
23use polkadot_node_subsystem::{
24	messages::AllMessages, overseer, FromOrchestra, OverseerSignal, SpawnGlue, SpawnedSubsystem,
25	SubsystemError, SubsystemResult, TrySendError,
26};
27use polkadot_node_subsystem_util::TimeoutExt;
28use polkadot_primitives::{ChunkIndex, Hash};
29
30use futures::{channel::mpsc, poll, prelude::*};
31use parking_lot::Mutex;
32use sp_core::testing::TaskExecutor;
33
34use std::{
35	collections::VecDeque,
36	convert::Infallible,
37	future::Future,
38	pin::Pin,
39	sync::{atomic::AtomicUsize, Arc},
40	task::{Context, Poll, Waker},
41	time::Duration,
42};
43
44/// Generally useful mock data providers for unit tests.
45pub mod mock;
46
/// Shared slot behind a single-item sink/stream pair.
enum SinkState<T> {
	/// Nothing is buffered. `read_waker`, if set, is woken when an item is sent.
	Empty {
		read_waker: Option<Waker>,
	},
	/// One item is buffered. The wakers belong to tasks that polled the sink for
	/// readiness / flush while the item was still unread; both are woken when it is read.
	Item {
		item: T,
		ready_waker: Option<Waker>,
		flush_waker: Option<Waker>,
	},
}
51
52/// The sink half of a single-item sink that does not resolve until the item has been read.
53pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>);
54
55// Derive clone not possible, as it puts `Clone` constraint on `T` which is not sensible here.
56impl<T> Clone for SingleItemSink<T> {
57	fn clone(&self) -> Self {
58		Self(self.0.clone())
59	}
60}
61
62/// The stream half of a single-item sink.
63pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
64
65impl<T> Sink<T> for SingleItemSink<T> {
66	type Error = Infallible;
67
68	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
69		let mut state = self.0.lock();
70		match *state {
71			SinkState::Empty { .. } => Poll::Ready(Ok(())),
72			SinkState::Item { ref mut ready_waker, .. } => {
73				*ready_waker = Some(cx.waker().clone());
74				Poll::Pending
75			},
76		}
77	}
78
79	fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
80		let mut state = self.0.lock();
81
82		match *state {
83			SinkState::Empty { ref mut read_waker } =>
84				if let Some(waker) = read_waker.take() {
85					waker.wake();
86				},
87			_ => panic!("start_send called outside of empty sink state ensured by poll_ready"),
88		}
89
90		*state = SinkState::Item { item, ready_waker: None, flush_waker: None };
91
92		Ok(())
93	}
94
95	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
96		let mut state = self.0.lock();
97		match *state {
98			SinkState::Empty { .. } => Poll::Ready(Ok(())),
99			SinkState::Item { ref mut flush_waker, .. } => {
100				*flush_waker = Some(cx.waker().clone());
101				Poll::Pending
102			},
103		}
104	}
105
106	fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
107		self.poll_flush(cx)
108	}
109}
110
111impl<T> Stream for SingleItemStream<T> {
112	type Item = T;
113
114	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
115		let mut state = self.0.lock();
116
117		let read_waker = Some(cx.waker().clone());
118
119		match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
120			SinkState::Empty { .. } => Poll::Pending,
121			SinkState::Item { item, ready_waker, flush_waker } => {
122				if let Some(waker) = ready_waker {
123					waker.wake();
124				}
125
126				if let Some(waker) = flush_waker {
127					waker.wake();
128				}
129
130				Poll::Ready(Some(item))
131			},
132		}
133	}
134}
135
136/// Create a single-item Sink/Stream pair.
137///
138/// The sink's send methods resolve at the point which the stream reads the item,
139/// not when the item is buffered.
140pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
141	let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
142	(SingleItemSink(inner.clone()), SingleItemStream(inner))
143}
144
145/// A test subsystem sender.
146#[derive(Clone)]
147pub struct TestSubsystemSender {
148	tx: mpsc::UnboundedSender<AllMessages>,
149	message_counter: MessageCounter,
150}
151
152/// Construct a sender/receiver pair.
153pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver<AllMessages>) {
154	let (tx, rx) = mpsc::unbounded();
155	(TestSubsystemSender { tx, message_counter: MessageCounter::default() }, rx)
156}
157
158#[async_trait::async_trait]
159impl<OutgoingMessage> overseer::SubsystemSender<OutgoingMessage> for TestSubsystemSender
160where
161	AllMessages: From<OutgoingMessage>,
162	OutgoingMessage: Send + 'static,
163{
164	async fn send_message(&mut self, msg: OutgoingMessage) {
165		self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
166	}
167
168	async fn send_message_with_priority<P: overseer::Priority>(&mut self, msg: OutgoingMessage) {
169		self.message_counter.increment(P::priority());
170		self.tx.send(msg.into()).await.expect("test overseer no longer live");
171	}
172
173	fn try_send_message(
174		&mut self,
175		msg: OutgoingMessage,
176	) -> Result<(), TrySendError<OutgoingMessage>> {
177		self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
178	}
179
180	fn try_send_message_with_priority<P: overseer::Priority>(
181		&mut self,
182		msg: OutgoingMessage,
183	) -> Result<(), TrySendError<OutgoingMessage>> {
184		self.message_counter.increment(P::priority());
185		self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
186		Ok(())
187	}
188
189	async fn send_messages<I>(&mut self, msgs: I)
190	where
191		I: IntoIterator<Item = OutgoingMessage> + Send,
192		I::IntoIter: Send,
193	{
194		let mut iter = stream::iter(msgs.into_iter().map(|msg| Ok(msg.into())));
195		self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
196	}
197
198	fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
199		self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
200	}
201}
202
203/// A test subsystem context.
204pub struct TestSubsystemContext<M, S> {
205	tx: TestSubsystemSender,
206	rx: mpsc::Receiver<FromOrchestra<M>>,
207	spawn: S,
208	message_buffer: VecDeque<FromOrchestra<M>>,
209}
210
211#[async_trait::async_trait]
212impl<M, Spawner> overseer::SubsystemContext for TestSubsystemContext<M, Spawner>
213where
214	M: overseer::AssociateOutgoing + std::fmt::Debug + Send + 'static,
215	AllMessages: From<<M as overseer::AssociateOutgoing>::OutgoingMessages>,
216	AllMessages: From<M>,
217	Spawner: overseer::gen::Spawner + Send + 'static,
218{
219	type Message = M;
220	type Sender = TestSubsystemSender;
221	type Signal = OverseerSignal;
222	type OutgoingMessages = <M as overseer::AssociateOutgoing>::OutgoingMessages;
223	type Error = SubsystemError;
224
225	async fn try_recv(&mut self) -> Result<Option<FromOrchestra<M>>, ()> {
226		if let Some(msg) = self.message_buffer.pop_front() {
227			return Ok(Some(msg))
228		}
229		match poll!(self.rx.next()) {
230			Poll::Ready(Some(msg)) => Ok(Some(msg)),
231			Poll::Ready(None) => Err(()),
232			Poll::Pending => Ok(None),
233		}
234	}
235
236	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<M>> {
237		if let Some(msg) = self.message_buffer.pop_front() {
238			return Ok(msg)
239		}
240		self.rx
241			.next()
242			.await
243			.ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))
244	}
245
246	async fn recv_signal(&mut self) -> SubsystemResult<OverseerSignal> {
247		loop {
248			let msg = self
249				.rx
250				.next()
251				.await
252				.ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))?;
253			if let FromOrchestra::Signal(sig) = msg {
254				return Ok(sig)
255			} else {
256				self.message_buffer.push_back(msg)
257			}
258		}
259	}
260
261	fn spawn(
262		&mut self,
263		name: &'static str,
264		s: Pin<Box<dyn Future<Output = ()> + Send>>,
265	) -> SubsystemResult<()> {
266		self.spawn.spawn(name, None, s);
267		Ok(())
268	}
269
270	fn spawn_blocking(
271		&mut self,
272		name: &'static str,
273		s: Pin<Box<dyn Future<Output = ()> + Send>>,
274	) -> SubsystemResult<()> {
275		self.spawn.spawn_blocking(name, None, s);
276		Ok(())
277	}
278
279	fn sender(&mut self) -> &mut TestSubsystemSender {
280		&mut self.tx
281	}
282}
283
284/// A handle for interacting with the subsystem context.
285pub struct TestSubsystemContextHandle<M> {
286	/// Direct access to sender of messages.
287	///
288	/// Useful for shared ownership situations (one can have multiple senders, but only one
289	/// receiver.
290	pub tx: mpsc::Sender<FromOrchestra<M>>,
291
292	/// Direct access to the receiver.
293	pub rx: mpsc::UnboundedReceiver<AllMessages>,
294
295	/// Message counter over subsystems.
296	pub message_counter: MessageCounter,
297
298	/// Intermediate buffer for a message when using `peek`.
299	message_buffer: Option<AllMessages>,
300}
301
302impl<M> TestSubsystemContextHandle<M> {
303	/// Fallback timeout value used to never block test execution
304	/// indefinitely.
305	pub const TIMEOUT: Duration = Duration::from_secs(120);
306
307	/// Send a message or signal to the subsystem. This resolves at the point in time when the
308	/// subsystem has _read_ the message.
309	pub async fn send(&mut self, from_overseer: FromOrchestra<M>) {
310		self.tx
311			.send(from_overseer)
312			.timeout(Self::TIMEOUT)
313			.await
314			.expect("`fn send` does not timeout")
315			.expect("Test subsystem no longer live");
316	}
317
318	/// Receive the next message from the subsystem.
319	pub async fn recv(&mut self) -> AllMessages {
320		self.try_recv()
321			.timeout(Self::TIMEOUT)
322			.await
323			.expect("`fn recv` does not timeout")
324			.expect("Test subsystem no longer live")
325	}
326
327	/// Receive the next message from the subsystem, or `None` if the channel has been closed.
328	pub async fn try_recv(&mut self) -> Option<AllMessages> {
329		if let Some(msg) = self.message_buffer.take() {
330			return Some(msg)
331		}
332
333		self.rx
334			.next()
335			.timeout(Self::TIMEOUT)
336			.await
337			.expect("`try_recv` does not timeout")
338	}
339
340	/// Peek into the next message from the subsystem or `None` if the channel has been closed.
341	pub async fn peek(&mut self) -> Option<&AllMessages> {
342		if self.message_buffer.is_none() {
343			self.message_buffer = self
344				.rx
345				.next()
346				.timeout(Self::TIMEOUT)
347				.await
348				.expect("`try_recv` does not timeout");
349		}
350
351		self.message_buffer.as_ref()
352	}
353}
354
355/// Make a test subsystem context with `buffer_size == 0`. This is used by most
356/// of the tests.
357pub fn make_subsystem_context<M, S>(
358	spawner: S,
359) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
360	make_buffered_subsystem_context(spawner, 0)
361}
362
363/// Message counter over subsystems.
364#[derive(Default, Clone)]
365pub struct MessageCounter {
366	total: Arc<AtomicUsize>,
367	with_high_priority: Arc<AtomicUsize>,
368}
369
370impl MessageCounter {
371	/// Increment the message counter.
372	pub fn increment(&mut self, priority_level: overseer::PriorityLevel) {
373		self.total.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
374		if matches!(priority_level, overseer::PriorityLevel::High) {
375			self.with_high_priority.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
376		}
377	}
378
379	/// Reset the message counter.
380	pub fn reset(&mut self) {
381		self.total.store(0, std::sync::atomic::Ordering::SeqCst);
382		self.with_high_priority.store(0, std::sync::atomic::Ordering::SeqCst);
383	}
384
385	/// Get the messages with high priority count.
386	pub fn with_high_priority(&self) -> usize {
387		self.with_high_priority.load(std::sync::atomic::Ordering::SeqCst)
388	}
389}
390
391/// Make a test subsystem context with buffered overseer channel. Some tests (e.g.
392/// `dispute-coordinator`) create too many parallel operations and deadlock unless
393/// the channel is buffered. Usually `buffer_size=1` is enough.
394pub fn make_buffered_subsystem_context<M, S>(
395	spawner: S,
396	buffer_size: usize,
397) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
398	let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size);
399	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
400	let message_counter = MessageCounter::default();
401
402	(
403		TestSubsystemContext {
404			tx: TestSubsystemSender {
405				tx: all_messages_tx,
406				message_counter: message_counter.clone(),
407			},
408			rx: overseer_rx,
409			spawn: SpawnGlue(spawner),
410			message_buffer: VecDeque::new(),
411		},
412		TestSubsystemContextHandle {
413			tx: overseer_tx,
414			rx: all_messages_rx,
415			message_counter: message_counter.clone(),
416			message_buffer: None,
417		},
418	)
419}
420
421/// Test a subsystem, mocking the overseer
422///
423/// Pass in two async closures: one mocks the overseer, the other runs the test from the perspective
424/// of a subsystem.
425///
426/// Times out in 5 seconds.
427pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
428	overseer_factory: OverseerFactory,
429	test_factory: TestFactory,
430) where
431	OverseerFactory: FnOnce(TestSubsystemContextHandle<M>) -> Overseer,
432	Overseer: Future<Output = ()>,
433	TestFactory: FnOnce(TestSubsystemContext<M, overseer::SpawnGlue<TaskExecutor>>) -> Test,
434	Test: Future<Output = ()>,
435{
436	let pool = TaskExecutor::new();
437	let (context, handle) = make_subsystem_context(pool);
438	let overseer = overseer_factory(handle);
439	let test = test_factory(context);
440
441	futures::pin_mut!(overseer, test);
442
443	futures::executor::block_on(async move {
444		future::join(overseer, test)
445			.timeout(Duration::from_secs(10))
446			.await
447			.expect("test timed out instead of completing")
448	});
449}
450
451/// A forward subsystem that implements [`Subsystem`](overseer::Subsystem).
452///
453/// It forwards all communication from the overseer to the internal message
454/// channel.
455///
456/// This subsystem is useful for testing functionality that interacts with the overseer.
457pub struct ForwardSubsystem<M>(pub mpsc::Sender<M>);
458
459impl<M, Context> overseer::Subsystem<Context, SubsystemError> for ForwardSubsystem<M>
460where
461	M: overseer::AssociateOutgoing + std::fmt::Debug + Send + 'static,
462	Context: overseer::SubsystemContext<
463		Message = M,
464		Signal = OverseerSignal,
465		Error = SubsystemError,
466		OutgoingMessages = <M as overseer::AssociateOutgoing>::OutgoingMessages,
467	>,
468{
469	fn start(mut self, mut ctx: Context) -> SpawnedSubsystem {
470		let future = Box::pin(async move {
471			loop {
472				match ctx.recv().await {
473					Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
474					Ok(FromOrchestra::Communication { msg }) => {
475						let _ = self.0.send(msg).await;
476					},
477					Err(_) => return Ok(()),
478					_ => (),
479				}
480			}
481		});
482
483		SpawnedSubsystem { name: "forward-subsystem", future }
484	}
485}
486
/// Asserts that two consecutive values match two patterns, in either order.
///
/// `$rx` is evaluated twice. The first value must match one of the two patterns and the
/// second value must match the other one. The result is always the tuple
/// `(<value of $e1>, <value of $e2>)`, regardless of which pattern matched first.
///
/// # Panics
///
/// Panics if the first value matches neither pattern, or if the second value does not
/// match the pattern that is still outstanding.
#[macro_export]
macro_rules! arbitrary_order {
	($rx:expr; $p1:pat => $e1:expr; $p2:pat => $e2:expr) => {
		// E.g. when an enum has only two variants, the catch-all `_` arms below can never
		// be reached, hence the `unreachable_patterns` allowances.
		match $rx {
			$p1 => {
				let __ret1 = { $e1 };
				let __ret2 = match $rx {
					$p2 => $e2,
					#[allow(unreachable_patterns)]
					_ => unreachable!("first pattern matched, second pattern did not"),
				};
				(__ret1, __ret2)
			},
			$p2 => {
				let __ret2 = { $e2 };
				let __ret1 = match $rx {
					$p1 => $e1,
					#[allow(unreachable_patterns)]
					_ => unreachable!("second pattern matched, first pattern did not"),
				};
				(__ret1, __ret2)
			},
			#[allow(unreachable_patterns)]
			_ => unreachable!("neither first nor second pattern matched"),
		}
	};
}
516
/// Future that yields the execution once and resolves
/// immediately after.
///
/// Useful when one wants to poll the background task to completion
/// before sending messages to it in order to avoid races.
///
/// The wrapped flag records whether the future has already yielded.
#[derive(Default)]
pub struct Yield(bool);

impl Yield {
	/// Returns new `Yield` future that has not yielded yet (same as `Yield::default()`).
	pub fn new() -> Self {
		Self(false)
	}
}

impl Future for Yield {
	type Output = ();

	/// The first poll returns `Pending` after waking the task right away, so it is polled
	/// again; every later poll returns `Ready(())`.
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		if !self.0 {
			self.0 = true;
			cx.waker().wake_by_ref();
			Poll::Pending
		} else {
			Poll::Ready(())
		}
	}
}
544
545/// Helper for chunking available data.
546pub fn derive_erasure_chunks_with_proofs_and_root(
547	n_validators: usize,
548	available_data: &AvailableData,
549	alter_chunk: impl Fn(usize, &mut Vec<u8>),
550) -> (Vec<ErasureChunk>, Hash) {
551	let mut chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
552
553	for (i, chunk) in chunks.iter_mut().enumerate() {
554		alter_chunk(i, chunk)
555	}
556
557	// create proofs for each erasure chunk
558	let branches = branches(chunks.as_ref());
559
560	let root = branches.root();
561	let erasure_chunks = branches
562		.enumerate()
563		.map(|(index, (proof, chunk))| ErasureChunk {
564			chunk: chunk.to_vec(),
565			index: ChunkIndex(index as _),
566			proof: Proof::try_from(proof).unwrap(),
567		})
568		.collect::<Vec<ErasureChunk>>();
569
570	(erasure_chunks, root)
571}
572
#[cfg(test)]
mod tests {
	use super::*;

	/// `pop` hands out `None` first and `Some(15)` second; the result order is still
	/// `(Some-arm value, None-arm value)`.
	#[test]
	fn macro_arbitrary_order() {
		let mut queue = vec![Some(15_usize), None];
		let (from_some, from_none) =
			arbitrary_order!(queue.pop().unwrap(); Some(fx) => fx; None => 0);
		assert_eq!(from_some, 15_usize);
		assert_eq!(from_none, 0_usize);
	}

	/// Same as above with the arrival order swapped: `Some(11)` first, `None` second.
	#[test]
	fn macro_arbitrary_order_swapped() {
		let mut queue = vec![None, Some(11_usize)];
		let (from_some, from_none) =
			arbitrary_order!(queue.pop().unwrap(); Some(fx) => fx; None => 0);
		assert_eq!(from_some, 11_usize);
		assert_eq!(from_none, 0);
	}
}