use super::{
BlockStatus as BlockStatusT, BlockSyncRequester as BlockSyncRequesterT, CommunicationIn, Error,
SignedMessage, LOG_TARGET,
};
use finality_grandpa::voter;
use futures::{
prelude::*,
stream::{Fuse, StreamExt},
};
use futures_timer::Delay;
use log::{debug, warn};
use parking_lot::Mutex;
use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
use sc_client_api::{BlockImportNotification, ImportNotifications};
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_consensus_grandpa::AuthorityId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{
collections::{HashMap, VecDeque},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
/// Minimum time between two consecutive "Waiting to import block" log lines (and the
/// accompanying fork sync requests) for the same pending block hash.
const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15);
/// Something that may have to be withheld until specific blocks are known locally.
///
/// Implementors are the per-block "waiters" stored by `UntilImported`; `Blocked` is the
/// message type that is eventually handed back to the consumer.
pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
/// The type that is blocked on, i.e. what goes in and what eventually comes out.
type Blocked;
/// Checks whether a new incoming item has to wait for one or more blocks.
///
/// Returns whether the item should be discarded, is ready right away, or which
/// `(hash, number, waiter)` triples have to be awaited first.
fn needs_waiting<S: BlockStatusT<Block>>(
input: Self::Blocked,
status_check: &S,
) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error>;
/// Called once the awaited block is known. `canon_number` is the number the block
/// actually has, passed through for further checks. Returning `None` means no message
/// is released by this call.
fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked>;
}
/// Outcome of `BlockUntilImported::needs_waiting`.
pub(crate) enum DiscardWaitOrReady<Block: BlockT, W, R> {
/// The input is dropped without being forwarded.
Discard,
/// The input has to wait: one `(block hash, block number, waiter)` entry per awaited block.
Wait(Vec<(Block::Hash, NumberFor<Block>, W)>),
/// The input can be forwarded immediately.
Ready(R),
}
/// Prometheus metrics for the until-imported queue.
pub(crate) struct Metrics {
/// Gauge shared between all clones of this handle.
global_waiting_messages: Gauge<U64>,
/// Number of increments made through this particular handle that have not been
/// decremented yet; subtracted from the shared gauge when the handle is dropped.
local_waiting_messages: u64,
}
impl Metrics {
pub(crate) fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
global_waiting_messages: register(
Gauge::new(
"substrate_finality_grandpa_until_imported_waiting_messages_number",
"Number of finality grandpa messages waiting within the until imported queue.",
)?,
registry,
)?,
local_waiting_messages: 0,
})
}
fn waiting_messages_inc(&mut self) {
self.local_waiting_messages += 1;
self.global_waiting_messages.inc();
}
fn waiting_messages_dec(&mut self) {
self.local_waiting_messages -= 1;
self.global_waiting_messages.dec();
}
}
impl Clone for Metrics {
fn clone(&self) -> Self {
Metrics {
global_waiting_messages: self.global_waiting_messages.clone(),
local_waiting_messages: 0,
}
}
}
impl Drop for Metrics {
    /// Lowers the shared gauge by the number of messages this handle still had counted as
    /// waiting when it was dropped.
    fn drop(&mut self) {
        let still_counted = self.local_waiting_messages;
        self.global_waiting_messages.sub(still_counted);
    }
}
/// Stream adapter that buffers incoming messages until the blocks they refer to are known.
pub(crate) struct UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
where
Block: BlockT,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
/// Fused stream of block import notifications.
import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>,
/// Used to request a fork sync for blocks that stay unknown.
block_sync_requester: BlockSyncRequester,
/// Used to look up the number of a block hash.
status_check: BlockStatus,
/// Fused stream of incoming messages.
incoming_messages: Fuse<I>,
/// Messages that are ready to be yielded, in FIFO order.
ready: VecDeque<M::Blocked>,
/// Ticker that triggers a re-check of all pending block hashes.
check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send>>,
/// Maps an awaited block hash to its expected number, the instant used to rate-limit
/// logging and sync requests, and the waiters blocked on that hash.
pending: HashMap<Block::Hash, (NumberFor<Block>, Instant, Vec<M>)>,
/// Label included in the "waiting to import" log line.
identifier: &'static str,
/// Optional queue metrics.
metrics: Option<Metrics>,
}
// Explicit `Unpin` marker: `poll_next` reborrows the struct mutably out of its `Pin`.
impl<Block, BlockStatus, BlockSyncRequester, I, M> Unpin
for UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
where
Block: BlockT,
I: Stream<Item = M::Blocked> + Unpin,
M: BlockUntilImported<Block>,
{
}
impl<Block, BlockStatus, BlockSyncRequester, I, M>
    UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
where
    Block: BlockT,
    BlockStatus: BlockStatusT<Block>,
    BlockSyncRequester: BlockSyncRequesterT<Block>,
    I: Stream<Item = M::Blocked> + Unpin,
    M: BlockUntilImported<Block>,
{
    /// Creates a new `UntilImported` wrapper.
    ///
    /// Both `import_notifications` and `incoming_messages` are fused. `identifier` is only
    /// used in log output, and `metrics` (if any) tracks the number of queued messages.
    pub(crate) fn new(
        import_notifications: ImportNotifications<Block>,
        block_sync_requester: BlockSyncRequester,
        status_check: BlockStatus,
        incoming_messages: I,
        identifier: &'static str,
        metrics: Option<Metrics>,
    ) -> Self {
        // How often all pending block hashes are re-checked against `status_check`.
        const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5);

        // An endless ticker: every item waits for the current timer and arms the next one.
        let first_tick = Delay::new(CHECK_PENDING_INTERVAL);
        let ticker = futures::stream::unfold(first_tick, |timer| {
            Box::pin(async move {
                timer.await;
                let next_tick = Delay::new(CHECK_PENDING_INTERVAL);
                Some((Ok(()), next_tick))
            })
        });

        Self {
            import_notifications: import_notifications.fuse(),
            block_sync_requester,
            status_check,
            incoming_messages: incoming_messages.fuse(),
            ready: VecDeque::new(),
            check_pending: Box::pin(ticker),
            pending: HashMap::new(),
            identifier,
            metrics,
        }
    }
}
impl<Block, BStatus, BSyncRequester, I, M> Stream
    for UntilImported<Block, BStatus, BSyncRequester, I, M>
where
    Block: BlockT,
    BStatus: BlockStatusT<Block>,
    BSyncRequester: BlockSyncRequesterT<Block>,
    I: Stream<Item = M::Blocked> + Unpin,
    M: BlockUntilImported<Block>,
{
    type Item = Result<M::Blocked, Error>;

    /// Drives the queue and yields the next released message.
    ///
    /// Each call:
    /// 1. drains `incoming_messages`, sorting every input into "discard", "pending" or
    ///    "ready";
    /// 2. drains `import_notifications`, releasing the waiters of every imported hash;
    /// 3. if the periodic ticker fired, re-checks all pending hashes through
    ///    `status_check` and, for hashes that are still unknown, logs and requests a fork
    ///    sync at most once per `LOG_PENDING_INTERVAL`;
    /// 4. yields the oldest ready message, if any.
    ///
    /// The stream ends as soon as either input stream ends. Errors from `needs_waiting`
    /// and `status_check` are yielded as `Err` items.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Reborrow once so that disjoint fields can be borrowed mutably at the same time.
        let this = &mut *self;

        loop {
            match StreamExt::poll_next_unpin(&mut this.incoming_messages, cx) {
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(input)) => {
                    // New input: schedule a wait for every block that is not known yet.
                    let queued = match M::needs_waiting(input, &this.status_check)? {
                        DiscardWaitOrReady::Discard => false,
                        DiscardWaitOrReady::Wait(items) => {
                            let has_waiters = !items.is_empty();
                            for (target_hash, target_number, wait) in items {
                                this.pending
                                    .entry(target_hash)
                                    .or_insert_with(|| (target_number, Instant::now(), Vec::new()))
                                    .2
                                    .push(wait)
                            }
                            has_waiters
                        },
                        DiscardWaitOrReady::Ready(item) => {
                            this.ready.push_back(item);
                            true
                        },
                    };

                    // Only count inputs that actually entered the queue. A discarded input
                    // is never yielded, so counting it would leave the gauge permanently
                    // too high. Messages that are dropped later by `wait_completed` are
                    // not decremented here; they stay counted until the `Metrics` handle
                    // is dropped.
                    if queued {
                        if let Some(metrics) = &mut this.metrics {
                            metrics.waiting_messages_inc();
                        }
                    }
                },
                Poll::Pending => break,
            }
        }

        loop {
            match StreamExt::poll_next_unpin(&mut this.import_notifications, cx) {
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(notification)) => {
                    // New block imported: release all waiters tied to that hash.
                    if let Some((_, _, messages)) = this.pending.remove(&notification.hash) {
                        let canon_number = *notification.header.number();
                        let ready_messages =
                            messages.into_iter().filter_map(|m| m.wait_completed(canon_number));

                        this.ready.extend(ready_messages);
                    }
                },
                Poll::Pending => break,
            }
        }

        // Drain the ticker so that its next timer is registered with the waker.
        let mut update_interval = false;
        while let Poll::Ready(Some(Ok(()))) = this.check_pending.poll_next_unpin(cx) {
            update_interval = true;
        }

        if update_interval {
            let mut known_keys = Vec::new();
            for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in
                this.pending.iter_mut()
            {
                if let Some(number) = this.status_check.block_number(block_hash)? {
                    // The block became known without us seeing its import notification.
                    known_keys.push((block_hash, number));
                } else {
                    let next_log = *last_log + LOG_PENDING_INTERVAL;
                    if Instant::now() >= next_log {
                        debug!(
                            target: LOG_TARGET,
                            "Waiting to import block {} before {} {} messages can be imported. \
                            Requesting network sync service to retrieve block from. \
                            Possible fork?",
                            block_hash,
                            v.len(),
                            this.identifier,
                        );

                        // No specific peers are passed along with the request.
                        this.block_sync_requester.set_sync_fork_request(
                            vec![],
                            block_hash,
                            block_number,
                        );

                        *last_log = next_log;
                    }
                }
            }

            for (known_hash, canon_number) in known_keys {
                if let Some((_, _, pending_messages)) = this.pending.remove(&known_hash) {
                    let ready_messages =
                        pending_messages.into_iter().filter_map(|m| m.wait_completed(canon_number));

                    this.ready.extend(ready_messages);
                }
            }
        }

        if let Some(ready) = this.ready.pop_front() {
            if let Some(metrics) = &mut this.metrics {
                metrics.waiting_messages_dec();
            }
            return Poll::Ready(Some(Ok(ready)))
        }

        if this.import_notifications.is_done() && this.incoming_messages.is_done() {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}
/// Logs a warning that authority `id` signed a GRANDPA message whose block number does
/// not match the number of the block with the given `hash`.
fn warn_authority_wrong_target<H>(hash: H, id: AuthorityId)
where
    H: ::std::fmt::Display,
{
    warn!(
        target: LOG_TARGET,
        "Authority {:?} signed GRANDPA message with \
        wrong block number for hash {}",
        id,
        hash,
    );
}
impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block::Header> {
    type Blocked = Self;

    /// Classifies a signed message by looking up its target hash:
    /// unknown hash -> wait for it; known with a different number -> warn and discard;
    /// known with the claimed number -> ready.
    fn needs_waiting<BlockStatus: BlockStatusT<Block>>(
        msg: Self::Blocked,
        status_check: &BlockStatus,
    ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> {
        let (&target_hash, target_number) = msg.target();

        let outcome = match status_check.block_number(target_hash)? {
            None => DiscardWaitOrReady::Wait(vec![(target_hash, target_number, msg)]),
            Some(number) if number != target_number => {
                warn_authority_wrong_target(target_hash, msg.id);
                DiscardWaitOrReady::Discard
            },
            Some(_) => DiscardWaitOrReady::Ready(msg),
        };

        Ok(outcome)
    }

    /// Releases the message if the now-known block has the number the message claimed;
    /// otherwise warns and drops it.
    fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
        let (&target_hash, target_number) = self.target();

        if canon_number != target_number {
            warn_authority_wrong_target(target_hash, self.id);
            return None
        }

        Some(self)
    }
}
/// `UntilImported` specialised to signed messages: each message is held back until the
/// block it targets is known.
pub(crate) type UntilVoteTargetImported<Block, BlockStatus, BlockSyncRequester, I> = UntilImported<
Block,
BlockStatus,
BlockSyncRequester,
I,
SignedMessage<<Block as BlockT>::Header>,
>;
/// Waiter for a single block that a global message (commit or catch-up) depends on.
///
/// One instance is created per unknown block hash; all instances created for the same
/// message share `inner`.
pub(crate) struct BlockGlobalMessage<Block: BlockT> {
/// The shared message. Set to `None` by `wait_completed` when a block number mismatch is
/// seen, so that the message is never forwarded.
inner: Arc<Mutex<Option<CommunicationIn<Block>>>>,
/// The number the awaited block is expected to have.
target_number: NumberFor<Block>,
}
// Explicit `Unpin` marker for the waiter type.
impl<Block: BlockT> Unpin for BlockGlobalMessage<Block> {}
impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
    type Blocked = CommunicationIn<Block>;

    /// Classifies a commit or catch-up message by checking every vote target it contains.
    ///
    /// The message is discarded if any target's number disagrees with the number known
    /// for that hash, or with another vote in the same message for that hash. It is ready
    /// if every target hash is known. Otherwise one waiter per unknown hash is returned,
    /// all sharing the message.
    fn needs_waiting<BlockStatus: BlockStatusT<Block>>(
        input: Self::Blocked,
        status_check: &BlockStatus,
    ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> {
        use std::collections::hash_map::Entry;

        /// Number recorded for a hash, tagged with where it came from: the local status
        /// check (`Known`) or merely the first vote seen for that hash (`Unknown`).
        enum KnownOrUnknown<N> {
            Known(N),
            Unknown(N),
        }

        impl<N> KnownOrUnknown<N> {
            fn number(&self) -> &N {
                match self {
                    KnownOrUnknown::Known(n) | KnownOrUnknown::Unknown(n) => n,
                }
            }
        }

        let mut checked_hashes: HashMap<_, KnownOrUnknown<NumberFor<Block>>> = HashMap::new();

        {
            // Returns `Ok(false)` when the message must be discarded.
            let mut query_known = |target_hash, perceived_number| -> Result<bool, Error> {
                // Each hash is looked up at most once; later votes are compared against
                // the number recorded the first time.
                let canon_number = match checked_hashes.entry(target_hash) {
                    Entry::Occupied(seen) => *seen.get().number(),
                    Entry::Vacant(slot) => match status_check.block_number(target_hash)? {
                        Some(number) => {
                            slot.insert(KnownOrUnknown::Known(number));
                            number
                        },
                        None => {
                            slot.insert(KnownOrUnknown::Unknown(perceived_number));
                            perceived_number
                        },
                    },
                };

                let mismatch = canon_number != perceived_number;
                Ok(!mismatch)
            };

            match &input {
                voter::CommunicationIn::Commit(_, commit, ..) => {
                    for precommit in &commit.precommits {
                        if !query_known(precommit.target_hash, precommit.target_number)? {
                            return Ok(DiscardWaitOrReady::Discard)
                        }
                    }
                },
                voter::CommunicationIn::CatchUp(catch_up, ..) => {
                    // Prevotes first, then precommits.
                    for signed in &catch_up.prevotes {
                        let vote = &signed.prevote;
                        if !query_known(vote.target_hash, vote.target_number)? {
                            return Ok(DiscardWaitOrReady::Discard)
                        }
                    }
                    for signed in &catch_up.precommits {
                        let vote = &signed.precommit;
                        if !query_known(vote.target_hash, vote.target_number)? {
                            return Ok(DiscardWaitOrReady::Discard)
                        }
                    }
                },
            }
        }

        let mut unknown_hashes = Vec::new();
        for (hash, status) in checked_hashes {
            if let KnownOrUnknown::Unknown(number) = status {
                unknown_hashes.push((hash, number));
            }
        }

        if unknown_hashes.is_empty() {
            // Every referenced block is already known.
            return Ok(DiscardWaitOrReady::Ready(input))
        }

        // `locked_global` itself is dropped when this function returns, leaving exactly
        // one strong reference per waiter.
        let locked_global = Arc::new(Mutex::new(Some(input)));
        let mut items_to_await = Vec::with_capacity(unknown_hashes.len());
        for (hash, target_number) in unknown_hashes {
            let waiter = BlockGlobalMessage { inner: Arc::clone(&locked_global), target_number };
            items_to_await.push((hash, target_number, waiter));
        }

        Ok(DiscardWaitOrReady::Wait(items_to_await))
    }

    /// Marks this waiter's block as resolved.
    ///
    /// On a block number mismatch the shared message is cleared so that it is never
    /// forwarded. Otherwise the message is returned only by the last outstanding waiter,
    /// and only if no earlier waiter cleared it.
    fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
        let BlockGlobalMessage { inner, target_number } = self;

        if target_number != canon_number {
            *inner.lock() = None;
            return None
        }

        // `try_unwrap` only succeeds for the last strong reference; while other waiters
        // are still outstanding the message stays blocked.
        Arc::try_unwrap(inner).ok().and_then(|cell| cell.into_inner())
    }
}
/// `UntilImported` specialised to global messages (commits and catch-ups): a message is
/// held back until all blocks referenced by its votes are known.
pub(crate) type UntilGlobalMessageBlocksImported<Block, BlockStatus, BlockSyncRequester, I> =
UntilImported<Block, BlockStatus, BlockSyncRequester, I, BlockGlobalMessage<Block>>;
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{CatchUp, CompactCommit};
    use finality_grandpa::Precommit;
    use futures::future::Either;
    use futures_timer::Delay;
    use sc_client_api::BlockImportNotification;
    use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
    use sp_consensus::BlockOrigin;
    use sp_core::crypto::UncheckedFrom;
    use substrate_test_runtime_client::runtime::{Block, Hash, Header};

    /// Fake chain: a set of known blocks plus the sending half of the import
    /// notification channel.
    #[derive(Clone)]
    struct TestChainState {
        sender: TracingUnboundedSender<BlockImportNotification<Block>>,
        known_blocks: Arc<Mutex<HashMap<Hash, u64>>>,
    }

    impl TestChainState {
        /// Creates an empty chain state together with the matching notification stream.
        fn new() -> (Self, ImportNotifications<Block>) {
            let (tx, rx) = tracing_unbounded("test", 100_000);
            let state =
                TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())) };

            (state, rx)
        }

        /// Returns a status checker backed by this chain's known blocks.
        fn block_status(&self) -> TestBlockStatus {
            TestBlockStatus { inner: self.known_blocks.clone() }
        }

        /// Marks `header` as known and sends the corresponding import notification.
        fn import_header(&self, header: Header) {
            let hash = header.hash();
            let number = *header.number();
            let (tx, _rx) = tracing_unbounded("unpin-worker-channel", 10_000);

            self.known_blocks.lock().insert(hash, number);
            self.sender
                .unbounded_send(BlockImportNotification::<Block>::new(
                    hash,
                    BlockOrigin::File,
                    header,
                    false,
                    None,
                    tx,
                ))
                .unwrap();
        }
    }

    /// Status checker that answers from a shared hash -> number map.
    struct TestBlockStatus {
        inner: Arc<Mutex<HashMap<Hash, u64>>>,
    }

    impl BlockStatusT<Block> for TestBlockStatus {
        fn block_number(&self, hash: Hash) -> Result<Option<u64>, Error> {
            Ok(self.inner.lock().get(&hash).copied())
        }
    }

    /// Sync requester that only records the requests it receives.
    #[derive(Clone)]
    struct TestBlockSyncRequester {
        requests: Arc<Mutex<Vec<(Hash, NumberFor<Block>)>>>,
    }

    impl Default for TestBlockSyncRequester {
        fn default() -> Self {
            TestBlockSyncRequester { requests: Arc::new(Mutex::new(Vec::new())) }
        }
    }

    impl BlockSyncRequesterT<Block> for TestBlockSyncRequester {
        fn set_sync_fork_request(
            &self,
            _peers: Vec<sc_network_types::PeerId>,
            hash: Hash,
            number: NumberFor<Block>,
        ) {
            self.requests.lock().push((hash, number));
        }
    }

    /// Builds a header with the given number and default values for everything else.
    fn make_header(number: u64) -> Header {
        Header::new(
            number,
            Default::default(),
            Default::default(),
            Default::default(),
            Default::default(),
        )
    }

    /// Builds a compact commit that targets `h1` and carries precommits for `h2` and `h3`.
    fn make_commit(h1: &Header, h2: &Header, h3: &Header) -> CompactCommit<Header> {
        CompactCommit::<Header> {
            target_hash: h1.hash(),
            target_number: *h1.number(),
            precommits: vec![
                Precommit { target_hash: h2.hash(), target_number: *h2.number() },
                Precommit { target_hash: h3.hash(), target_number: *h3.number() },
            ],
            auth_data: Vec::new(),
        }
    }

    /// Builds a catch-up based on `h1` with prevotes for `h1`/`h3` and precommits for
    /// `h1`/`h2`.
    fn make_catch_up(h1: &Header, h2: &Header, h3: &Header) -> CatchUp<Header> {
        let signed_prevote = |header: &Header| finality_grandpa::SignedPrevote {
            id: UncheckedFrom::unchecked_from([1; 32]),
            signature: UncheckedFrom::unchecked_from([1; 64]),
            prevote: finality_grandpa::Prevote {
                target_hash: header.hash(),
                target_number: *header.number(),
            },
        };

        let signed_precommit = |header: &Header| finality_grandpa::SignedPrecommit {
            id: UncheckedFrom::unchecked_from([1; 32]),
            signature: UncheckedFrom::unchecked_from([1; 64]),
            precommit: finality_grandpa::Precommit {
                target_hash: header.hash(),
                target_number: *header.number(),
            },
        };

        finality_grandpa::CatchUp {
            round_number: 1,
            prevotes: vec![signed_prevote(h1), signed_prevote(h3)],
            precommits: vec![signed_precommit(h1), signed_precommit(h2)],
            base_hash: h1.hash(),
            base_number: *h1.number(),
        }
    }

    /// Wraps a copy of `commit` into a round-0 global message with a blank callback.
    fn commit_message(commit: &CompactCommit<Header>) -> CommunicationIn<Block> {
        voter::CommunicationIn::Commit(0, commit.clone(), voter::Callback::Blank)
    }

    /// Wraps a copy of `catch_up` into a global message with a blank callback.
    fn catch_up_message(catch_up: &CatchUp<Header>) -> CommunicationIn<Block> {
        voter::CommunicationIn::CatchUp(catch_up.clone(), voter::Callback::Blank)
    }

    /// Extracts round and commit from a global message, panicking on any other variant.
    fn unapply_commit(msg: CommunicationIn<Block>) -> (u64, CompactCommit<Header>) {
        match msg {
            voter::CommunicationIn::Commit(round, commit, ..) => (round, commit),
            _ => panic!("expected commit"),
        }
    }

    /// Extracts the catch-up from a global message, panicking on any other variant.
    fn unapply_catch_up(msg: CommunicationIn<Block>) -> CatchUp<Header> {
        match msg {
            voter::CommunicationIn::CatchUp(catch_up, ..) => catch_up,
            _ => panic!("expected catch up"),
        }
    }

    /// Runs `enact_dependencies` first, then feeds `msg` through the queue and returns the
    /// message that comes out.
    fn message_all_dependencies_satisfied<F>(
        msg: CommunicationIn<Block>,
        enact_dependencies: F,
    ) -> CommunicationIn<Block>
    where
        F: FnOnce(&TestChainState),
    {
        let (chain_state, import_notifications) = TestChainState::new();
        let block_status = chain_state.block_status();

        // Enact all dependencies before importing the message.
        enact_dependencies(&chain_state);

        let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

        let until_imported = UntilGlobalMessageBlocksImported::new(
            import_notifications,
            TestBlockSyncRequester::default(),
            block_status,
            global_rx,
            "global",
            None,
        );

        global_tx.unbounded_send(msg).unwrap();

        let work = until_imported.into_future();

        futures::executor::block_on(work).0.unwrap().unwrap()
    }

    /// Feeds `msg` through the queue, asserts that nothing comes out within 100ms, then
    /// runs `enact_dependencies` and returns the message that comes out afterwards.
    fn blocking_message_on_dependencies<F>(
        msg: CommunicationIn<Block>,
        enact_dependencies: F,
    ) -> CommunicationIn<Block>
    where
        F: FnOnce(&TestChainState),
    {
        let (chain_state, import_notifications) = TestChainState::new();
        let block_status = chain_state.block_status();

        let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

        let until_imported = UntilGlobalMessageBlocksImported::new(
            import_notifications,
            TestBlockSyncRequester::default(),
            block_status,
            global_rx,
            "global",
            None,
        );

        global_tx.unbounded_send(msg).unwrap();

        // The closure gets its own clone; `chain_state` (and with it a notification
        // sender) stays alive until the end of this function.
        let inner_chain_state = chain_state.clone();
        let work =
            future::select(until_imported.into_future(), Delay::new(Duration::from_millis(100)))
                .then(move |res| match res {
                    Either::Left(_) => panic!("timeout should have fired first"),
                    Either::Right((_, until_imported)) => {
                        // Timeout fired: push in the headers.
                        enact_dependencies(&inner_chain_state);

                        until_imported
                    },
                });

        futures::executor::block_on(work).0.unwrap().unwrap()
    }

    #[test]
    fn blocking_commit_message() {
        let h1 = make_header(5);
        let h2 = make_header(6);
        let h3 = make_header(7);

        let unknown_commit = make_commit(&h1, &h2, &h3);

        let res = blocking_message_on_dependencies(commit_message(&unknown_commit), |chain_state| {
            chain_state.import_header(h1);
            chain_state.import_header(h2);
            chain_state.import_header(h3);
        });

        assert_eq!(unapply_commit(res), unapply_commit(commit_message(&unknown_commit)));
    }

    #[test]
    fn commit_message_all_known() {
        let h1 = make_header(5);
        let h2 = make_header(6);
        let h3 = make_header(7);

        let known_commit = make_commit(&h1, &h2, &h3);

        let res = message_all_dependencies_satisfied(commit_message(&known_commit), |chain_state| {
            chain_state.import_header(h1);
            chain_state.import_header(h2);
            chain_state.import_header(h3);
        });

        assert_eq!(unapply_commit(res), unapply_commit(commit_message(&known_commit)));
    }

    #[test]
    fn blocking_catch_up_message() {
        let h1 = make_header(5);
        let h2 = make_header(6);
        let h3 = make_header(7);

        let unknown_catch_up = make_catch_up(&h1, &h2, &h3);

        let res =
            blocking_message_on_dependencies(catch_up_message(&unknown_catch_up), |chain_state| {
                chain_state.import_header(h1);
                chain_state.import_header(h2);
                chain_state.import_header(h3);
            });

        assert_eq!(unapply_catch_up(res), unapply_catch_up(catch_up_message(&unknown_catch_up)));
    }

    #[test]
    fn catch_up_message_all_known() {
        let h1 = make_header(5);
        let h2 = make_header(6);
        let h3 = make_header(7);

        let known_catch_up = make_catch_up(&h1, &h2, &h3);

        let res =
            message_all_dependencies_satisfied(catch_up_message(&known_catch_up), |chain_state| {
                chain_state.import_header(h1);
                chain_state.import_header(h2);
                chain_state.import_header(h3);
            });

        assert_eq!(unapply_catch_up(res), unapply_catch_up(catch_up_message(&known_catch_up)));
    }

    #[test]
    fn request_block_sync_for_needed_blocks() {
        let (chain_state, import_notifications) = TestChainState::new();
        let block_status = chain_state.block_status();

        let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

        let block_sync_requester = TestBlockSyncRequester::default();

        let until_imported = UntilGlobalMessageBlocksImported::new(
            import_notifications,
            block_sync_requester.clone(),
            block_status,
            global_rx,
            "global",
            None,
        );

        let h1 = make_header(5);
        let h2 = make_header(6);
        let h3 = make_header(7);

        // None of the headers is imported, so the commit stays pending on `h2` and `h3`.
        let unknown_commit = make_commit(&h1, &h2, &h3);

        global_tx.unbounded_send(commit_message(&unknown_commit)).unwrap();

        // Drive the queue on a thread pool.
        let threads_pool = futures::executor::ThreadPool::new().unwrap();
        threads_pool.spawn_ok(until_imported.into_future().map(|_| ()));

        // Completes once sync requests for both precommit targets have been recorded.
        let assert = futures::future::poll_fn(|ctx| {
            let block_sync_requests = block_sync_requester.requests.lock();

            if block_sync_requests.contains(&(h2.hash(), *h2.number())) &&
                block_sync_requests.contains(&(h3.hash(), *h3.number()))
            {
                return Poll::Ready(())
            }

            // Nothing in this closure registers the waker, so wake the task manually to
            // be polled again instead of waiting for the timeout below.
            ctx.waker().wake_by_ref();

            Poll::Pending
        });

        // The requests are only made from the periodic pending check, not immediately.
        let timeout = Delay::new(Duration::from_secs(60));
        let test = future::select(assert, timeout)
            .map(|res| match res {
                Either::Left(_) => {},
                Either::Right(_) => panic!("timed out waiting for block sync request"),
            })
            .map(drop);

        futures::executor::block_on(test);
    }

    /// Builds the shared inner value of a `BlockGlobalMessage`, holding an empty catch-up.
    fn test_catch_up() -> Arc<Mutex<Option<CommunicationIn<Block>>>> {
        let header = make_header(5);

        let unknown_catch_up = finality_grandpa::CatchUp {
            round_number: 1,
            precommits: vec![],
            prevotes: vec![],
            base_hash: header.hash(),
            base_number: *header.number(),
        };

        let catch_up = voter::CommunicationIn::CatchUp(unknown_catch_up, voter::Callback::Blank);

        Arc::new(Mutex::new(Some(catch_up)))
    }

    #[test]
    fn block_global_message_wait_completed_return_when_all_awaited() {
        let msg_inner = test_catch_up();

        let waiting_block_1 =
            BlockGlobalMessage::<Block> { inner: msg_inner.clone(), target_number: 1 };

        let waiting_block_2 = BlockGlobalMessage::<Block> { inner: msg_inner, target_number: 2 };

        // Still blocked on `waiting_block_2`.
        assert!(waiting_block_1.wait_completed(1).is_none());

        // The last outstanding waiter releases the message.
        assert!(waiting_block_2.wait_completed(2).is_some());
    }

    #[test]
    fn block_global_message_wait_completed_return_none_on_block_number_mismatch() {
        let msg_inner = test_catch_up();

        let waiting_block_1 =
            BlockGlobalMessage::<Block> { inner: msg_inner.clone(), target_number: 1 };

        let waiting_block_2 = BlockGlobalMessage::<Block> { inner: msg_inner, target_number: 2 };

        // Mismatching block number: the shared message is cleared.
        assert!(waiting_block_1.wait_completed(1234).is_none());

        // Even though this is the last waiter, nothing is released any more.
        assert!(waiting_block_2.wait_completed(2).is_none());
    }

    #[test]
    fn metrics_cleans_up_after_itself() {
        let r = Registry::new();

        let mut m1 = Metrics::register(&r).unwrap();
        let m2 = m1.clone();

        // Count one message through `m1`; the shared gauge is visible through `m2`.
        m1.waiting_messages_inc();
        assert_eq!(1, m2.global_waiting_messages.get());

        // Dropping `m1` takes its outstanding count off the shared gauge.
        drop(m1);
        assert_eq!(0, m2.global_waiting_messages.get());
    }
}