1#![warn(missing_docs)]
20
21use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
22use polkadot_node_primitives::{AvailableData, ErasureChunk, Proof};
23use polkadot_node_subsystem::{
24 messages::AllMessages, overseer, FromOrchestra, OverseerSignal, SpawnGlue, SpawnedSubsystem,
25 SubsystemError, SubsystemResult, TrySendError,
26};
27use polkadot_node_subsystem_util::TimeoutExt;
28use polkadot_primitives::{ChunkIndex, Hash};
29
30use futures::{channel::mpsc, poll, prelude::*};
31use parking_lot::Mutex;
32use sp_core::testing::TaskExecutor;
33
34use std::{
35 collections::VecDeque,
36 convert::Infallible,
37 future::Future,
38 pin::Pin,
39 sync::{atomic::AtomicUsize, Arc},
40 task::{Context, Poll, Waker},
41 time::Duration,
42};
43
44pub mod mock;
46
47enum SinkState<T> {
48 Empty { read_waker: Option<Waker> },
49 Item { item: T, ready_waker: Option<Waker>, flush_waker: Option<Waker> },
50}
51
52pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>);
54
55impl<T> Clone for SingleItemSink<T> {
57 fn clone(&self) -> Self {
58 Self(self.0.clone())
59 }
60}
61
62pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
64
65impl<T> Sink<T> for SingleItemSink<T> {
66 type Error = Infallible;
67
68 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
69 let mut state = self.0.lock();
70 match *state {
71 SinkState::Empty { .. } => Poll::Ready(Ok(())),
72 SinkState::Item { ref mut ready_waker, .. } => {
73 *ready_waker = Some(cx.waker().clone());
74 Poll::Pending
75 },
76 }
77 }
78
79 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
80 let mut state = self.0.lock();
81
82 match *state {
83 SinkState::Empty { ref mut read_waker } =>
84 if let Some(waker) = read_waker.take() {
85 waker.wake();
86 },
87 _ => panic!("start_send called outside of empty sink state ensured by poll_ready"),
88 }
89
90 *state = SinkState::Item { item, ready_waker: None, flush_waker: None };
91
92 Ok(())
93 }
94
95 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
96 let mut state = self.0.lock();
97 match *state {
98 SinkState::Empty { .. } => Poll::Ready(Ok(())),
99 SinkState::Item { ref mut flush_waker, .. } => {
100 *flush_waker = Some(cx.waker().clone());
101 Poll::Pending
102 },
103 }
104 }
105
106 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
107 self.poll_flush(cx)
108 }
109}
110
111impl<T> Stream for SingleItemStream<T> {
112 type Item = T;
113
114 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
115 let mut state = self.0.lock();
116
117 let read_waker = Some(cx.waker().clone());
118
119 match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
120 SinkState::Empty { .. } => Poll::Pending,
121 SinkState::Item { item, ready_waker, flush_waker } => {
122 if let Some(waker) = ready_waker {
123 waker.wake();
124 }
125
126 if let Some(waker) = flush_waker {
127 waker.wake();
128 }
129
130 Poll::Ready(Some(item))
131 },
132 }
133 }
134}
135
136pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
141 let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
142 (SingleItemSink(inner.clone()), SingleItemStream(inner))
143}
144
145#[derive(Clone)]
147pub struct TestSubsystemSender {
148 tx: mpsc::UnboundedSender<AllMessages>,
149 message_counter: MessageCounter,
150}
151
152pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver<AllMessages>) {
154 let (tx, rx) = mpsc::unbounded();
155 (TestSubsystemSender { tx, message_counter: MessageCounter::default() }, rx)
156}
157
158#[async_trait::async_trait]
159impl<OutgoingMessage> overseer::SubsystemSender<OutgoingMessage> for TestSubsystemSender
160where
161 AllMessages: From<OutgoingMessage>,
162 OutgoingMessage: Send + 'static,
163{
164 async fn send_message(&mut self, msg: OutgoingMessage) {
165 self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
166 }
167
168 async fn send_message_with_priority<P: overseer::Priority>(&mut self, msg: OutgoingMessage) {
169 self.message_counter.increment(P::priority());
170 self.tx.send(msg.into()).await.expect("test overseer no longer live");
171 }
172
173 fn try_send_message(
174 &mut self,
175 msg: OutgoingMessage,
176 ) -> Result<(), TrySendError<OutgoingMessage>> {
177 self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
178 }
179
180 fn try_send_message_with_priority<P: overseer::Priority>(
181 &mut self,
182 msg: OutgoingMessage,
183 ) -> Result<(), TrySendError<OutgoingMessage>> {
184 self.message_counter.increment(P::priority());
185 self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
186 Ok(())
187 }
188
189 async fn send_messages<I>(&mut self, msgs: I)
190 where
191 I: IntoIterator<Item = OutgoingMessage> + Send,
192 I::IntoIter: Send,
193 {
194 let mut iter = stream::iter(msgs.into_iter().map(|msg| Ok(msg.into())));
195 self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
196 }
197
198 fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
199 self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
200 }
201}
202
203pub struct TestSubsystemContext<M, S> {
205 tx: TestSubsystemSender,
206 rx: mpsc::Receiver<FromOrchestra<M>>,
207 spawn: S,
208 message_buffer: VecDeque<FromOrchestra<M>>,
209}
210
211#[async_trait::async_trait]
212impl<M, Spawner> overseer::SubsystemContext for TestSubsystemContext<M, Spawner>
213where
214 M: overseer::AssociateOutgoing + std::fmt::Debug + Send + 'static,
215 AllMessages: From<<M as overseer::AssociateOutgoing>::OutgoingMessages>,
216 AllMessages: From<M>,
217 Spawner: overseer::gen::Spawner + Send + 'static,
218{
219 type Message = M;
220 type Sender = TestSubsystemSender;
221 type Signal = OverseerSignal;
222 type OutgoingMessages = <M as overseer::AssociateOutgoing>::OutgoingMessages;
223 type Error = SubsystemError;
224
225 async fn try_recv(&mut self) -> Result<Option<FromOrchestra<M>>, ()> {
226 if let Some(msg) = self.message_buffer.pop_front() {
227 return Ok(Some(msg))
228 }
229 match poll!(self.rx.next()) {
230 Poll::Ready(Some(msg)) => Ok(Some(msg)),
231 Poll::Ready(None) => Err(()),
232 Poll::Pending => Ok(None),
233 }
234 }
235
236 async fn recv(&mut self) -> SubsystemResult<FromOrchestra<M>> {
237 if let Some(msg) = self.message_buffer.pop_front() {
238 return Ok(msg)
239 }
240 self.rx
241 .next()
242 .await
243 .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))
244 }
245
246 async fn recv_signal(&mut self) -> SubsystemResult<OverseerSignal> {
247 loop {
248 let msg = self
249 .rx
250 .next()
251 .await
252 .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))?;
253 if let FromOrchestra::Signal(sig) = msg {
254 return Ok(sig)
255 } else {
256 self.message_buffer.push_back(msg)
257 }
258 }
259 }
260
261 fn spawn(
262 &mut self,
263 name: &'static str,
264 s: Pin<Box<dyn Future<Output = ()> + Send>>,
265 ) -> SubsystemResult<()> {
266 self.spawn.spawn(name, None, s);
267 Ok(())
268 }
269
270 fn spawn_blocking(
271 &mut self,
272 name: &'static str,
273 s: Pin<Box<dyn Future<Output = ()> + Send>>,
274 ) -> SubsystemResult<()> {
275 self.spawn.spawn_blocking(name, None, s);
276 Ok(())
277 }
278
279 fn sender(&mut self) -> &mut TestSubsystemSender {
280 &mut self.tx
281 }
282}
283
284pub struct TestSubsystemContextHandle<M> {
286 pub tx: mpsc::Sender<FromOrchestra<M>>,
291
292 pub rx: mpsc::UnboundedReceiver<AllMessages>,
294
295 pub message_counter: MessageCounter,
297
298 message_buffer: Option<AllMessages>,
300}
301
302impl<M> TestSubsystemContextHandle<M> {
303 pub const TIMEOUT: Duration = Duration::from_secs(120);
306
307 pub async fn send(&mut self, from_overseer: FromOrchestra<M>) {
310 self.tx
311 .send(from_overseer)
312 .timeout(Self::TIMEOUT)
313 .await
314 .expect("`fn send` does not timeout")
315 .expect("Test subsystem no longer live");
316 }
317
318 pub async fn recv(&mut self) -> AllMessages {
320 self.try_recv()
321 .timeout(Self::TIMEOUT)
322 .await
323 .expect("`fn recv` does not timeout")
324 .expect("Test subsystem no longer live")
325 }
326
327 pub async fn try_recv(&mut self) -> Option<AllMessages> {
329 if let Some(msg) = self.message_buffer.take() {
330 return Some(msg)
331 }
332
333 self.rx
334 .next()
335 .timeout(Self::TIMEOUT)
336 .await
337 .expect("`try_recv` does not timeout")
338 }
339
340 pub async fn peek(&mut self) -> Option<&AllMessages> {
342 if self.message_buffer.is_none() {
343 self.message_buffer = self
344 .rx
345 .next()
346 .timeout(Self::TIMEOUT)
347 .await
348 .expect("`try_recv` does not timeout");
349 }
350
351 self.message_buffer.as_ref()
352 }
353}
354
355pub fn make_subsystem_context<M, S>(
358 spawner: S,
359) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
360 make_buffered_subsystem_context(spawner, 0)
361}
362
363#[derive(Default, Clone)]
365pub struct MessageCounter {
366 total: Arc<AtomicUsize>,
367 with_high_priority: Arc<AtomicUsize>,
368}
369
370impl MessageCounter {
371 pub fn increment(&mut self, priority_level: overseer::PriorityLevel) {
373 self.total.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
374 if matches!(priority_level, overseer::PriorityLevel::High) {
375 self.with_high_priority.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
376 }
377 }
378
379 pub fn reset(&mut self) {
381 self.total.store(0, std::sync::atomic::Ordering::SeqCst);
382 self.with_high_priority.store(0, std::sync::atomic::Ordering::SeqCst);
383 }
384
385 pub fn with_high_priority(&self) -> usize {
387 self.with_high_priority.load(std::sync::atomic::Ordering::SeqCst)
388 }
389}
390
391pub fn make_buffered_subsystem_context<M, S>(
395 spawner: S,
396 buffer_size: usize,
397) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
398 let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size);
399 let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
400 let message_counter = MessageCounter::default();
401
402 (
403 TestSubsystemContext {
404 tx: TestSubsystemSender {
405 tx: all_messages_tx,
406 message_counter: message_counter.clone(),
407 },
408 rx: overseer_rx,
409 spawn: SpawnGlue(spawner),
410 message_buffer: VecDeque::new(),
411 },
412 TestSubsystemContextHandle {
413 tx: overseer_tx,
414 rx: all_messages_rx,
415 message_counter: message_counter.clone(),
416 message_buffer: None,
417 },
418 )
419}
420
421pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
428 overseer_factory: OverseerFactory,
429 test_factory: TestFactory,
430) where
431 OverseerFactory: FnOnce(TestSubsystemContextHandle<M>) -> Overseer,
432 Overseer: Future<Output = ()>,
433 TestFactory: FnOnce(TestSubsystemContext<M, overseer::SpawnGlue<TaskExecutor>>) -> Test,
434 Test: Future<Output = ()>,
435{
436 let pool = TaskExecutor::new();
437 let (context, handle) = make_subsystem_context(pool);
438 let overseer = overseer_factory(handle);
439 let test = test_factory(context);
440
441 futures::pin_mut!(overseer, test);
442
443 futures::executor::block_on(async move {
444 future::join(overseer, test)
445 .timeout(Duration::from_secs(10))
446 .await
447 .expect("test timed out instead of completing")
448 });
449}
450
451pub struct ForwardSubsystem<M>(pub mpsc::Sender<M>);
458
459impl<M, Context> overseer::Subsystem<Context, SubsystemError> for ForwardSubsystem<M>
460where
461 M: overseer::AssociateOutgoing + std::fmt::Debug + Send + 'static,
462 Context: overseer::SubsystemContext<
463 Message = M,
464 Signal = OverseerSignal,
465 Error = SubsystemError,
466 OutgoingMessages = <M as overseer::AssociateOutgoing>::OutgoingMessages,
467 >,
468{
469 fn start(mut self, mut ctx: Context) -> SpawnedSubsystem {
470 let future = Box::pin(async move {
471 loop {
472 match ctx.recv().await {
473 Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
474 Ok(FromOrchestra::Communication { msg }) => {
475 let _ = self.0.send(msg).await;
476 },
477 Err(_) => return Ok(()),
478 _ => (),
479 }
480 }
481 });
482
483 SpawnedSubsystem { name: "forward-subsystem", future }
484 }
485}
486
487#[macro_export]
489macro_rules! arbitrary_order {
490 ($rx:expr; $p1:pat => $e1:expr; $p2:pat => $e2:expr) => {
491 match $rx {
493 $p1 => {
494 let __ret1 = { $e1 };
495 let __ret2 = match $rx {
496 $p2 => $e2,
497 #[allow(unreachable_patterns)]
498 _ => unreachable!("first pattern matched, second pattern did not"),
499 };
500 (__ret1, __ret2)
501 },
502 $p2 => {
503 let __ret2 = { $e2 };
504 let __ret1 = match $rx {
505 $p1 => $e1,
506 #[allow(unreachable_patterns)]
507 _ => unreachable!("second pattern matched, first pattern did not"),
508 };
509 (__ret1, __ret2)
510 },
511 #[allow(unreachable_patterns)]
512 _ => unreachable!("neither first nor second pattern matched"),
513 }
514 };
515}
516
517pub struct Yield(bool);
523
524impl Yield {
525 pub fn new() -> Self {
527 Self(false)
528 }
529}
530
531impl Future for Yield {
532 type Output = ();
533
534 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
535 if !self.0 {
536 self.0 = true;
537 cx.waker().wake_by_ref();
538 Poll::Pending
539 } else {
540 Poll::Ready(())
541 }
542 }
543}
544
545pub fn derive_erasure_chunks_with_proofs_and_root(
547 n_validators: usize,
548 available_data: &AvailableData,
549 alter_chunk: impl Fn(usize, &mut Vec<u8>),
550) -> (Vec<ErasureChunk>, Hash) {
551 let mut chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
552
553 for (i, chunk) in chunks.iter_mut().enumerate() {
554 alter_chunk(i, chunk)
555 }
556
557 let branches = branches(chunks.as_ref());
559
560 let root = branches.root();
561 let erasure_chunks = branches
562 .enumerate()
563 .map(|(index, (proof, chunk))| ErasureChunk {
564 chunk: chunk.to_vec(),
565 index: ChunkIndex(index as _),
566 proof: Proof::try_from(proof).unwrap(),
567 })
568 .collect::<Vec<ErasureChunk>>();
569
570 (erasure_chunks, root)
571}
572
573#[cfg(test)]
574mod tests {
575 use super::*;
576
577 #[test]
578 fn macro_arbitrary_order() {
579 let mut vals = vec![Some(15_usize), None];
580 let (first, second) = arbitrary_order!(vals.pop().unwrap(); Some(fx) => fx; None => 0);
581 assert_eq!(first, 15_usize);
582 assert_eq!(second, 0_usize);
583 }
584
585 #[test]
586 fn macro_arbitrary_order_swapped() {
587 let mut vals = vec![None, Some(11_usize)];
588 let (first, second) = arbitrary_order!(vals.pop().unwrap(); Some(fx) => fx; None => 0);
589 assert_eq!(first, 11_usize);
590 assert_eq!(second, 0);
591 }
592}