#![warn(missing_docs)]
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_primitives::{AvailableData, ErasureChunk, Proof};
use polkadot_node_subsystem::{
messages::AllMessages, overseer, FromOrchestra, OverseerSignal, SpawnGlue, SpawnedSubsystem,
SubsystemError, SubsystemResult, TrySendError,
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::{ChunkIndex, Hash};
use futures::{channel::mpsc, poll, prelude::*};
use parking_lot::Mutex;
use sp_core::testing::TaskExecutor;
use std::{
collections::VecDeque,
convert::Infallible,
future::Future,
pin::Pin,
sync::{atomic::AtomicUsize, Arc},
task::{Context, Poll, Waker},
time::Duration,
};
/// Public `mock` submodule (defined in a separate file).
///
/// NOTE(review): presumably hosts mock helpers for unit tests — confirm against the module.
pub mod mock;
/// State shared between a [`SingleItemSink`] and its [`SingleItemStream`]:
/// the slot is either empty or holds exactly one item.
enum SinkState<T> {
/// No item is stored. `read_waker` is the waker recorded by the stream's most
/// recent `poll_next`; it is woken when the sink stores an item.
Empty { read_waker: Option<Waker> },
/// One item is stored. `ready_waker` and `flush_waker` are recorded by the sink's
/// `poll_ready` and `poll_flush` respectively; both are woken when the stream
/// takes the item.
Item { item: T, ready_waker: Option<Waker>, flush_waker: Option<Waker> },
}
/// The sending half of a single-item channel: a `Sink` that holds at most one
/// item at a time.
///
/// It only reports readiness while the slot is empty, and its flush only
/// resolves once the slot is empty again. Create a pair with [`single_item_sink`].
pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>);
impl<T> Clone for SingleItemSink<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
/// The receiving half of a single-item channel: a `Stream` that yields the item
/// currently held in the slot shared with a [`SingleItemSink`].
///
/// Create a pair with [`single_item_sink`].
pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
impl<T> Sink<T> for SingleItemSink<T> {
    type Error = Infallible;

    /// Ready only while the slot is empty; otherwise records the waker so the
    /// stream can wake this task after taking the item.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
        let mut guard = self.0.lock();
        if let SinkState::Item { ready_waker, .. } = &mut *guard {
            *ready_waker = Some(cx.waker().clone());
            return Poll::Pending
        }
        Poll::Ready(Ok(()))
    }

    /// Stores `item` in the slot and wakes a reader that registered interest.
    ///
    /// # Panics
    ///
    /// Panics if the slot already holds an item, i.e. `poll_ready` was not
    /// honoured by the caller.
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
        let mut guard = self.0.lock();

        let pending_reader = match &mut *guard {
            SinkState::Empty { read_waker } => read_waker.take(),
            SinkState::Item { .. } =>
                panic!("start_send called outside of empty sink state ensured by poll_ready"),
        };
        // Wake the reader before the slot is filled, while the lock is still held.
        if let Some(waker) = pending_reader {
            waker.wake();
        }

        *guard = SinkState::Item { item, ready_waker: None, flush_waker: None };
        Ok(())
    }

    /// Resolves once the slot is empty; otherwise records the waker so the
    /// stream can wake this task after taking the item.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
        let mut guard = self.0.lock();
        if let SinkState::Item { flush_waker, .. } = &mut *guard {
            *flush_waker = Some(cx.waker().clone());
            return Poll::Pending
        }
        Poll::Ready(Ok(()))
    }

    /// Closing is the same as flushing: it waits for the slot to be drained.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
        self.poll_flush(cx)
    }
}
impl<T> Stream for SingleItemStream<T> {
    type Item = T;

    /// Takes the stored item, if any, and leaves the slot empty with the current
    /// task registered as the reader. The stream never terminates: it returns
    /// either `Pending` or `Ready(Some(_))`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut guard = self.0.lock();

        // The slot always ends up empty with our waker installed, regardless of
        // what it held before.
        let emptied = SinkState::Empty { read_waker: Some(cx.waker().clone()) };
        let previous = std::mem::replace(&mut *guard, emptied);

        let (item, ready_waker, flush_waker) = match previous {
            SinkState::Empty { .. } => return Poll::Pending,
            SinkState::Item { item, ready_waker, flush_waker } => (item, ready_waker, flush_waker),
        };

        // Notify the sink side that the slot is free again: readiness first, then flush.
        for waker in ready_waker.into_iter().chain(flush_waker) {
            waker.wake();
        }

        Poll::Ready(Some(item))
    }
}
/// Creates a connected [`SingleItemSink`] / [`SingleItemStream`] pair sharing one
/// initially empty slot.
pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
    let shared = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
    let sink = SingleItemSink(Arc::clone(&shared));
    let stream = SingleItemStream(shared);
    (sink, stream)
}
/// A test subsystem sender: forwards outgoing messages, converted to
/// `AllMessages`, into an unbounded channel.
#[derive(Clone)]
pub struct TestSubsystemSender {
/// Channel the converted messages are pushed into.
tx: mpsc::UnboundedSender<AllMessages>,
/// Counter incremented by the priority-aware send methods.
message_counter: MessageCounter,
}
/// Constructs a [`TestSubsystemSender`] with a fresh message counter, together
/// with the receiving end of its unbounded channel.
pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver<AllMessages>) {
    let (tx, rx) = mpsc::unbounded();
    let sender = TestSubsystemSender { tx, message_counter: MessageCounter::default() };
    (sender, rx)
}
#[async_trait::async_trait]
impl<OutgoingMessage> overseer::SubsystemSender<OutgoingMessage> for TestSubsystemSender
where
AllMessages: From<OutgoingMessage>,
OutgoingMessage: Send + 'static,
{
async fn send_message(&mut self, msg: OutgoingMessage) {
self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
}
async fn send_message_with_priority<P: overseer::Priority>(&mut self, msg: OutgoingMessage) {
self.message_counter.increment(P::priority());
self.tx.send(msg.into()).await.expect("test overseer no longer live");
}
fn try_send_message(
&mut self,
msg: OutgoingMessage,
) -> Result<(), TrySendError<OutgoingMessage>> {
self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
}
fn try_send_message_with_priority<P: overseer::Priority>(
&mut self,
msg: OutgoingMessage,
) -> Result<(), TrySendError<OutgoingMessage>> {
self.message_counter.increment(P::priority());
self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
Ok(())
}
async fn send_messages<I>(&mut self, msgs: I)
where
I: IntoIterator<Item = OutgoingMessage> + Send,
I::IntoIter: Send,
{
let mut iter = stream::iter(msgs.into_iter().map(|msg| Ok(msg.into())));
self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
}
fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
}
}
/// A test subsystem context: the subsystem-facing end of the channels created by
/// `make_buffered_subsystem_context`.
pub struct TestSubsystemContext<M, S> {
/// Sender for outgoing messages, handed out by `sender()`.
tx: TestSubsystemSender,
/// Incoming messages and signals for the subsystem under test.
rx: mpsc::Receiver<FromOrchestra<M>>,
/// Spawner used by `spawn` and `spawn_blocking`.
spawn: S,
/// Non-signal messages set aside by `recv_signal`; `recv` and `try_recv` drain
/// this buffer before reading from `rx`.
message_buffer: VecDeque<FromOrchestra<M>>,
}
#[async_trait::async_trait]
impl<M, Spawner> overseer::SubsystemContext for TestSubsystemContext<M, Spawner>
where
    M: overseer::AssociateOutgoing + std::fmt::Debug + Send + 'static,
    AllMessages: From<<M as overseer::AssociateOutgoing>::OutgoingMessages>,
    AllMessages: From<M>,
    Spawner: overseer::gen::Spawner + Send + 'static,
{
    type Message = M;
    type Sender = TestSubsystemSender;
    type Signal = OverseerSignal;
    type OutgoingMessages = <M as overseer::AssociateOutgoing>::OutgoingMessages;
    type Error = SubsystemError;

    /// Returns the next message without waiting: buffered messages first, then
    /// whatever the channel has immediately available. `Ok(None)` means nothing
    /// is pending right now; `Err(())` means the channel is closed.
    async fn try_recv(&mut self) -> Result<Option<FromOrchestra<M>>, ()> {
        if let Some(buffered) = self.message_buffer.pop_front() {
            return Ok(Some(buffered))
        }

        // Poll the channel exactly once instead of awaiting it.
        match poll!(self.rx.next()) {
            Poll::Pending => Ok(None),
            Poll::Ready(None) => Err(()),
            Poll::Ready(Some(incoming)) => Ok(Some(incoming)),
        }
    }

    /// Waits for the next message: buffered messages first, then the channel.
    /// Fails with `SubsystemError::Context` once the channel is closed.
    async fn recv(&mut self) -> SubsystemResult<FromOrchestra<M>> {
        if let Some(buffered) = self.message_buffer.pop_front() {
            return Ok(buffered)
        }

        match self.rx.next().await {
            Some(incoming) => Ok(incoming),
            None => Err(SubsystemError::Context("Receiving end closed".to_owned())),
        }
    }

    /// Waits for the next signal on the channel. Any non-signal message seen on
    /// the way is stashed in the buffer for later `recv` / `try_recv` calls.
    async fn recv_signal(&mut self) -> SubsystemResult<OverseerSignal> {
        loop {
            let incoming = match self.rx.next().await {
                Some(incoming) => incoming,
                None => return Err(SubsystemError::Context("Receiving end closed".to_owned())),
            };

            match incoming {
                FromOrchestra::Signal(sig) => return Ok(sig),
                other => self.message_buffer.push_back(other),
            }
        }
    }

    /// Hands the future `s` to the spawner under `name`; always returns `Ok(())`.
    fn spawn(
        &mut self,
        name: &'static str,
        s: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> SubsystemResult<()> {
        let spawner = &self.spawn;
        spawner.spawn(name, None, s);
        Ok(())
    }

    /// Hands the future `s` to the spawner's blocking facility under `name`;
    /// always returns `Ok(())`.
    fn spawn_blocking(
        &mut self,
        name: &'static str,
        s: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> SubsystemResult<()> {
        let spawner = &self.spawn;
        spawner.spawn_blocking(name, None, s);
        Ok(())
    }

    /// Gives mutable access to the sender used for outgoing messages.
    fn sender(&mut self) -> &mut TestSubsystemSender {
        let Self { tx, .. } = self;
        tx
    }
}
/// A handle for interacting with the subsystem context from the test side.
pub struct TestSubsystemContextHandle<M> {
/// Channel for sending messages and signals to the subsystem.
pub tx: mpsc::Sender<FromOrchestra<M>>,
/// Channel on which the subsystem's outgoing messages arrive.
pub rx: mpsc::UnboundedReceiver<AllMessages>,
/// Message counter; `make_buffered_subsystem_context` clones it from the same
/// counter as the context's sender.
pub message_counter: MessageCounter,
/// Message already pulled off `rx` by `peek`, returned first by `try_recv`.
message_buffer: Option<AllMessages>,
}
impl<M> TestSubsystemContextHandle<M> {
    /// Fallback timeout applied to every channel operation of this handle, so a
    /// stuck test panics instead of blocking forever.
    pub const TIMEOUT: Duration = Duration::from_secs(120);

    /// Sends a message or signal to the subsystem.
    ///
    /// # Panics
    ///
    /// Panics if the send does not complete within [`Self::TIMEOUT`] or if the
    /// subsystem's receiving end is gone.
    pub async fn send(&mut self, from_overseer: FromOrchestra<M>) {
        self.tx
            .send(from_overseer)
            .timeout(Self::TIMEOUT)
            .await
            .expect("`fn send` does not timeout")
            .expect("Test subsystem no longer live");
    }

    /// Receives the next message from the subsystem.
    ///
    /// # Panics
    ///
    /// Panics if nothing arrives within [`Self::TIMEOUT`] or if the channel has
    /// been closed.
    pub async fn recv(&mut self) -> AllMessages {
        self.try_recv()
            .timeout(Self::TIMEOUT)
            .await
            .expect("`fn recv` does not timeout")
            .expect("Test subsystem no longer live")
    }

    /// Receives the next message from the subsystem, or `None` if the channel has
    /// been closed. A message previously obtained through [`Self::peek`] is
    /// returned first.
    ///
    /// # Panics
    ///
    /// Panics if nothing arrives within [`Self::TIMEOUT`].
    pub async fn try_recv(&mut self) -> Option<AllMessages> {
        if let Some(msg) = self.message_buffer.take() {
            return Some(msg)
        }

        self.rx
            .next()
            .timeout(Self::TIMEOUT)
            .await
            .expect("`try_recv` does not timeout")
    }

    /// Peeks at the next message from the subsystem without consuming it, or
    /// returns `None` if the channel has been closed.
    ///
    /// The peeked message is kept in an internal one-element buffer and handed
    /// out by the next [`Self::try_recv`] / [`Self::recv`] call.
    ///
    /// # Panics
    ///
    /// Panics if nothing arrives within [`Self::TIMEOUT`].
    pub async fn peek(&mut self) -> Option<&AllMessages> {
        if self.message_buffer.is_none() {
            // The panic message names `peek` so a timeout here is not mistaken
            // for one in `try_recv`.
            self.message_buffer = self
                .rx
                .next()
                .timeout(Self::TIMEOUT)
                .await
                .expect("`peek` does not timeout");
        }
        self.message_buffer.as_ref()
    }
}
/// Makes a test subsystem context and its handle with a `buffer_size` of zero.
///
/// Shorthand for [`make_buffered_subsystem_context`].
pub fn make_subsystem_context<M, S>(
    spawner: S,
) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
    let buffer_size = 0;
    make_buffered_subsystem_context(spawner, buffer_size)
}
/// Message counter over subsystems.
///
/// Clones share the same underlying counters, so every clone observes increments
/// and resets made through any other clone.
#[derive(Default, Clone)]
pub struct MessageCounter {
    /// Number of all counted messages.
    total: Arc<AtomicUsize>,
    /// Number of counted messages that had high priority.
    with_high_priority: Arc<AtomicUsize>,
}

impl MessageCounter {
    /// Memory ordering used for every counter access.
    const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst;

    /// Counts one message; high-priority messages are additionally tracked in
    /// their own counter.
    pub fn increment(&mut self, priority_level: overseer::PriorityLevel) {
        self.total.fetch_add(1, Self::ORDERING);
        if let overseer::PriorityLevel::High = priority_level {
            self.with_high_priority.fetch_add(1, Self::ORDERING);
        }
    }

    /// Sets both counters back to zero.
    pub fn reset(&mut self) {
        for counter in [&self.total, &self.with_high_priority] {
            counter.store(0, Self::ORDERING);
        }
    }

    /// Returns how many high-priority messages have been counted.
    pub fn with_high_priority(&self) -> usize {
        self.with_high_priority.load(Self::ORDERING)
    }
}
/// Makes a test subsystem context together with the handle that drives it.
///
/// `buffer_size` is passed to `mpsc::channel` for the channel carrying messages
/// towards the subsystem; the channel for the subsystem's outgoing messages is
/// unbounded. Context and handle share a single [`MessageCounter`].
pub fn make_buffered_subsystem_context<M, S>(
    spawner: S,
    buffer_size: usize,
) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
    let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size);
    let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
    let counter = MessageCounter::default();

    let context = TestSubsystemContext {
        tx: TestSubsystemSender { tx: all_messages_tx, message_counter: counter.clone() },
        rx: overseer_rx,
        spawn: SpawnGlue(spawner),
        message_buffer: VecDeque::new(),
    };
    let handle = TestSubsystemContextHandle {
        tx: overseer_tx,
        rx: all_messages_rx,
        // Last use of the counter, so it is moved rather than cloned again.
        message_counter: counter,
        message_buffer: None,
    };

    (context, handle)
}
/// Test a subsystem, mocking the overseer.
///
/// Builds a subsystem context/handle pair, passes the handle to
/// `overseer_factory` and the context to `test_factory`, then drives both
/// resulting futures to completion on the current thread.
///
/// # Panics
///
/// Panics if the two futures have not both completed within 10 seconds.
pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
    overseer_factory: OverseerFactory,
    test_factory: TestFactory,
) where
    OverseerFactory: FnOnce(TestSubsystemContextHandle<M>) -> Overseer,
    Overseer: Future<Output = ()>,
    TestFactory: FnOnce(TestSubsystemContext<M, overseer::SpawnGlue<TaskExecutor>>) -> Test,
    Test: Future<Output = ()>,
{
    let (context, handle) = make_subsystem_context(TaskExecutor::new());

    let overseer = overseer_factory(handle);
    let test = test_factory(context);
    futures::pin_mut!(overseer, test);

    let run_both = async move {
        let outcome = future::join(overseer, test).timeout(Duration::from_secs(10)).await;
        outcome.expect("test timed out instead of completing")
    };
    futures::executor::block_on(run_both);
}
/// A forward subsystem: every communication message it receives is sent on
/// through the wrapped channel sender.
pub struct ForwardSubsystem<M>(pub mpsc::Sender<M>);
impl<M, Context> overseer::Subsystem<Context, SubsystemError> for ForwardSubsystem<M>
where
    M: overseer::AssociateOutgoing + std::fmt::Debug + Send + 'static,
    Context: overseer::SubsystemContext<
        Message = M,
        Signal = OverseerSignal,
        Error = SubsystemError,
        OutgoingMessages = <M as overseer::AssociateOutgoing>::OutgoingMessages,
    >,
{
    /// Starts the forwarding loop. It ends on a `Conclude` signal or when
    /// receiving from the context fails; all other signals are ignored.
    fn start(mut self, mut ctx: Context) -> SpawnedSubsystem {
        let future = Box::pin(async move {
            loop {
                let incoming = match ctx.recv().await {
                    Ok(incoming) => incoming,
                    Err(_) => return Ok(()),
                };

                match incoming {
                    FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
                    FromOrchestra::Communication { msg } => {
                        // Forwarding is best effort: a failed send is ignored.
                        let _ = self.0.send(msg).await;
                    },
                    _ => {},
                }
            }
        });

        SpawnedSubsystem { name: "forward-subsystem", future }
    }
}
/// Matches two successive values against two patterns that may show up in either
/// order, and evaluates to the tuple `(<result of $e1>, <result of $e2>)`.
///
/// `$rx` is expanded — and therefore evaluated — twice: once for the first value
/// and once more for the second. It must yield a fresh value on each evaluation.
///
/// # Panics
///
/// Hits `unreachable!` if the first value matches neither pattern, or if the
/// second value does not match the pattern the first one left over.
#[macro_export]
macro_rules! arbitrary_order {
($rx:expr; $p1:pat => $e1:expr; $p2:pat => $e2:expr) => {
// First evaluation of `$rx`: decides which of the two patterns arrived first.
match $rx {
$p1 => {
let __ret1 = { $e1 };
// Second evaluation of `$rx`: must now produce the other pattern.
let __ret2 = match $rx {
$p2 => $e2,
// If the two patterns are exhaustive, `_` can never be reached.
#[allow(unreachable_patterns)]
_ => unreachable!("first pattern matched, second pattern did not"),
};
(__ret1, __ret2)
},
$p2 => {
let __ret2 = { $e2 };
// Second evaluation of `$rx`: must now produce the other pattern.
let __ret1 = match $rx {
$p1 => $e1,
// If the two patterns are exhaustive, `_` can never be reached.
#[allow(unreachable_patterns)]
_ => unreachable!("second pattern matched, first pattern did not"),
};
// The tuple order follows the pattern order, not the arrival order.
(__ret1, __ret2)
},
// If the two patterns are exhaustive, `_` can never be reached.
#[allow(unreachable_patterns)]
_ => unreachable!("neither first nor second pattern matched"),
}
};
}
/// Future that yields to the executor exactly once.
///
/// The first poll wakes the current task and returns `Poll::Pending`; every
/// later poll returns `Poll::Ready(())`. The wrapped flag records whether the
/// first poll has already happened.
#[derive(Debug, Default)]
pub struct Yield(bool);

impl Yield {
    /// Returns a new `Yield` that has not been polled yet.
    ///
    /// Equivalent to `Yield::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 {
            return Poll::Ready(())
        }

        self.0 = true;
        // Request an immediate re-poll so the task is rescheduled rather than parked.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
/// Helper for chunks state generation.
///
/// Obtains the erasure chunks of `available_data` for `n_validators`, lets
/// `alter_chunk` mutate each chunk (called with the chunk's position and its
/// bytes), and then derives a proof per chunk along with the root.
///
/// Returns the chunks, each carrying its index and proof, plus the root.
///
/// # Panics
///
/// Panics if the chunks cannot be obtained or if a proof fails to convert into
/// `Proof`.
pub fn derive_erasure_chunks_with_proofs_and_root(
    n_validators: usize,
    available_data: &AvailableData,
    alter_chunk: impl Fn(usize, &mut Vec<u8>),
) -> (Vec<ErasureChunk>, Hash) {
    let mut chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();
    chunks
        .iter_mut()
        .enumerate()
        .for_each(|(position, chunk)| alter_chunk(position, chunk));

    // Proofs and root are computed over the (possibly altered) chunks.
    let proof_branches = branches(chunks.as_slice());
    let root = proof_branches.root();

    let erasure_chunks: Vec<ErasureChunk> = proof_branches
        .enumerate()
        .map(|(index, (proof, chunk))| ErasureChunk {
            chunk: chunk.to_vec(),
            index: ChunkIndex(index as _),
            proof: Proof::try_from(proof).unwrap(),
        })
        .collect();

    (erasure_chunks, root)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `pop()` yields `None` first, so the second pattern is the one matched first.
    #[test]
    fn macro_arbitrary_order() {
        let mut queue = vec![Some(15_usize), None];
        let (first, second) = arbitrary_order!(queue.pop().unwrap(); Some(fx) => fx; None => 0);
        assert_eq!((first, second), (15_usize, 0_usize));
    }

    /// `pop()` yields `Some(_)` first, so the first pattern is the one matched first.
    #[test]
    fn macro_arbitrary_order_swapped() {
        let mut queue = vec![None, Some(11_usize)];
        let (first, second) = arbitrary_order!(queue.pop().unwrap(); Some(fx) => fx; None => 0);
        assert_eq!((first, second), (11_usize, 0));
    }
}