1use super::{
26 BlockStatus as BlockStatusT, BlockSyncRequester as BlockSyncRequesterT, CommunicationIn, Error,
27 SignedMessage, LOG_TARGET,
28};
29
30use finality_grandpa::voter;
31use futures::{
32 prelude::*,
33 stream::{Fuse, StreamExt},
34};
35use futures_timer::Delay;
36use log::{debug, warn};
37use parking_lot::Mutex;
38use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
39use sc_client_api::{BlockImportNotification, ImportNotifications};
40use sc_utils::mpsc::TracingUnboundedReceiver;
41use sp_consensus_grandpa::AuthorityId;
42use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
43
44use std::{
45 collections::{HashMap, VecDeque},
46 pin::Pin,
47 sync::Arc,
48 task::{Context, Poll},
49 time::{Duration, Instant},
50};
51
/// Minimum time between two consecutive "still waiting" log entries (and the sync
/// request issued together with them) for one and the same pending block.
const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15);
53
54pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
59 type Blocked;
61
62 fn needs_waiting<S: BlockStatusT<Block>>(
64 input: Self::Blocked,
65 status_check: &S,
66 ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error>;
67
68 fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked>;
71}
72
73pub(crate) enum DiscardWaitOrReady<Block: BlockT, W, R> {
79 Discard,
80 Wait(Vec<(Block::Hash, NumberFor<Block>, W)>),
81 Ready(R),
82}
83
84pub(crate) struct Metrics {
94 global_waiting_messages: Gauge<U64>,
95 local_waiting_messages: u64,
96}
97
98impl Metrics {
99 pub(crate) fn register(registry: &Registry) -> Result<Self, PrometheusError> {
100 Ok(Self {
101 global_waiting_messages: register(
102 Gauge::new(
103 "substrate_finality_grandpa_until_imported_waiting_messages_number",
104 "Number of finality grandpa messages waiting within the until imported queue.",
105 )?,
106 registry,
107 )?,
108 local_waiting_messages: 0,
109 })
110 }
111
112 fn waiting_messages_inc(&mut self) {
113 self.local_waiting_messages += 1;
114 self.global_waiting_messages.inc();
115 }
116
117 fn waiting_messages_dec(&mut self) {
118 self.local_waiting_messages -= 1;
119 self.global_waiting_messages.dec();
120 }
121}
122
123impl Clone for Metrics {
124 fn clone(&self) -> Self {
125 Metrics {
126 global_waiting_messages: self.global_waiting_messages.clone(),
127 local_waiting_messages: 0,
130 }
131 }
132}
133
134impl Drop for Metrics {
135 fn drop(&mut self) {
136 self.global_waiting_messages.sub(self.local_waiting_messages)
139 }
140}
141
142pub(crate) struct UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
144where
145 Block: BlockT,
146 I: Stream<Item = M::Blocked> + Unpin,
147 M: BlockUntilImported<Block>,
148{
149 import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>,
150 block_sync_requester: BlockSyncRequester,
151 status_check: BlockStatus,
152 incoming_messages: Fuse<I>,
153 ready: VecDeque<M::Blocked>,
154 check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send>>,
156 pending: HashMap<Block::Hash, (NumberFor<Block>, Instant, Vec<M>)>,
160
161 identifier: &'static str,
163 metrics: Option<Metrics>,
165}
166
167impl<Block, BlockStatus, BlockSyncRequester, I, M> Unpin
168 for UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
169where
170 Block: BlockT,
171 I: Stream<Item = M::Blocked> + Unpin,
172 M: BlockUntilImported<Block>,
173{
174}
175
176impl<Block, BlockStatus, BlockSyncRequester, I, M>
177 UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
178where
179 Block: BlockT,
180 BlockStatus: BlockStatusT<Block>,
181 BlockSyncRequester: BlockSyncRequesterT<Block>,
182 I: Stream<Item = M::Blocked> + Unpin,
183 M: BlockUntilImported<Block>,
184{
185 pub(crate) fn new(
187 import_notifications: ImportNotifications<Block>,
188 block_sync_requester: BlockSyncRequester,
189 status_check: BlockStatus,
190 incoming_messages: I,
191 identifier: &'static str,
192 metrics: Option<Metrics>,
193 ) -> Self {
194 const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5);
200
201 let check_pending = futures::stream::unfold(Delay::new(CHECK_PENDING_INTERVAL), |delay| {
202 Box::pin(async move {
203 delay.await;
204 Some((Ok(()), Delay::new(CHECK_PENDING_INTERVAL)))
205 })
206 });
207
208 UntilImported {
209 import_notifications: import_notifications.fuse(),
210 block_sync_requester,
211 status_check,
212 incoming_messages: incoming_messages.fuse(),
213 ready: VecDeque::new(),
214 check_pending: Box::pin(check_pending),
215 pending: HashMap::new(),
216 identifier,
217 metrics,
218 }
219 }
220}
221
222impl<Block, BStatus, BSyncRequester, I, M> Stream
223 for UntilImported<Block, BStatus, BSyncRequester, I, M>
224where
225 Block: BlockT,
226 BStatus: BlockStatusT<Block>,
227 BSyncRequester: BlockSyncRequesterT<Block>,
228 I: Stream<Item = M::Blocked> + Unpin,
229 M: BlockUntilImported<Block>,
230{
231 type Item = Result<M::Blocked, Error>;
232
233 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
234 let this = &mut *self;
237
238 loop {
239 match StreamExt::poll_next_unpin(&mut this.incoming_messages, cx) {
240 Poll::Ready(None) => return Poll::Ready(None),
241 Poll::Ready(Some(input)) => {
242 match M::needs_waiting(input, &this.status_check)? {
245 DiscardWaitOrReady::Discard => {},
246 DiscardWaitOrReady::Wait(items) => {
247 for (target_hash, target_number, wait) in items {
248 this.pending
249 .entry(target_hash)
250 .or_insert_with(|| (target_number, Instant::now(), Vec::new()))
251 .2
252 .push(wait)
253 }
254 },
255 DiscardWaitOrReady::Ready(item) => this.ready.push_back(item),
256 }
257
258 if let Some(metrics) = &mut this.metrics {
259 metrics.waiting_messages_inc();
260 }
261 },
262 Poll::Pending => break,
263 }
264 }
265
266 loop {
267 match StreamExt::poll_next_unpin(&mut this.import_notifications, cx) {
268 Poll::Ready(None) => return Poll::Ready(None),
269 Poll::Ready(Some(notification)) => {
270 if let Some((_, _, messages)) = this.pending.remove(¬ification.hash) {
272 let canon_number = *notification.header.number();
273 let ready_messages =
274 messages.into_iter().filter_map(|m| m.wait_completed(canon_number));
275
276 this.ready.extend(ready_messages);
277 }
278 },
279 Poll::Pending => break,
280 }
281 }
282
283 let mut update_interval = false;
284 while let Poll::Ready(Some(Ok(()))) = this.check_pending.poll_next_unpin(cx) {
285 update_interval = true;
286 }
287
288 if update_interval {
289 let mut known_keys = Vec::new();
290 for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in
291 this.pending.iter_mut()
292 {
293 if let Some(number) = this.status_check.block_number(block_hash)? {
294 known_keys.push((block_hash, number));
295 } else {
296 let next_log = *last_log + LOG_PENDING_INTERVAL;
297 if Instant::now() >= next_log {
298 debug!(
299 target: LOG_TARGET,
300 "Waiting to import block {} before {} {} messages can be imported. \
301 Requesting network sync service to retrieve block from. \
302 Possible fork?",
303 block_hash,
304 v.len(),
305 this.identifier,
306 );
307
308 this.block_sync_requester.set_sync_fork_request(
312 vec![],
313 block_hash,
314 block_number,
315 );
316
317 *last_log = next_log;
318 }
319 }
320 }
321
322 for (known_hash, canon_number) in known_keys {
323 if let Some((_, _, pending_messages)) = this.pending.remove(&known_hash) {
324 let ready_messages =
325 pending_messages.into_iter().filter_map(|m| m.wait_completed(canon_number));
326
327 this.ready.extend(ready_messages);
328 }
329 }
330 }
331
332 if let Some(ready) = this.ready.pop_front() {
333 if let Some(metrics) = &mut this.metrics {
334 metrics.waiting_messages_dec();
335 }
336 return Poll::Ready(Some(Ok(ready)))
337 }
338
339 if this.import_notifications.is_done() && this.incoming_messages.is_done() {
340 Poll::Ready(None)
341 } else {
342 Poll::Pending
343 }
344 }
345}
346
347fn warn_authority_wrong_target<H: ::std::fmt::Display>(hash: H, id: AuthorityId) {
348 warn!(
349 target: LOG_TARGET,
350 "Authority {:?} signed GRANDPA message with \
351 wrong block number for hash {}",
352 id,
353 hash,
354 );
355}
356
357impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block::Header> {
358 type Blocked = Self;
359
360 fn needs_waiting<BlockStatus: BlockStatusT<Block>>(
361 msg: Self::Blocked,
362 status_check: &BlockStatus,
363 ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> {
364 let (&target_hash, target_number) = msg.target();
365
366 if let Some(number) = status_check.block_number(target_hash)? {
367 if number != target_number {
368 warn_authority_wrong_target(target_hash, msg.id);
369 return Ok(DiscardWaitOrReady::Discard)
370 } else {
371 return Ok(DiscardWaitOrReady::Ready(msg))
372 }
373 }
374
375 Ok(DiscardWaitOrReady::Wait(vec![(target_hash, target_number, msg)]))
376 }
377
378 fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
379 let (&target_hash, target_number) = self.target();
380 if canon_number != target_number {
381 warn_authority_wrong_target(target_hash, self.id);
382
383 None
384 } else {
385 Some(self)
386 }
387 }
388}
389
390pub(crate) type UntilVoteTargetImported<Block, BlockStatus, BlockSyncRequester, I> = UntilImported<
393 Block,
394 BlockStatus,
395 BlockSyncRequester,
396 I,
397 SignedMessage<<Block as BlockT>::Header>,
398>;
399
400pub(crate) struct BlockGlobalMessage<Block: BlockT> {
410 inner: Arc<Mutex<Option<CommunicationIn<Block>>>>,
411 target_number: NumberFor<Block>,
412}
413
414impl<Block: BlockT> Unpin for BlockGlobalMessage<Block> {}
415
416impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
417 type Blocked = CommunicationIn<Block>;
418
419 fn needs_waiting<BlockStatus: BlockStatusT<Block>>(
420 input: Self::Blocked,
421 status_check: &BlockStatus,
422 ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> {
423 use std::collections::hash_map::Entry;
424
425 enum KnownOrUnknown<N> {
426 Known(N),
427 Unknown(N),
428 }
429
430 impl<N> KnownOrUnknown<N> {
431 fn number(&self) -> &N {
432 match *self {
433 KnownOrUnknown::Known(ref n) => n,
434 KnownOrUnknown::Unknown(ref n) => n,
435 }
436 }
437 }
438
439 let mut checked_hashes: HashMap<_, KnownOrUnknown<NumberFor<Block>>> = HashMap::new();
440
441 {
442 let mut query_known = |target_hash, perceived_number| -> Result<bool, Error> {
444 let canon_number = match checked_hashes.entry(target_hash) {
446 Entry::Occupied(entry) => *entry.get().number(),
447 Entry::Vacant(entry) => {
448 if let Some(number) = status_check.block_number(target_hash)? {
449 entry.insert(KnownOrUnknown::Known(number));
450 number
451 } else {
452 entry.insert(KnownOrUnknown::Unknown(perceived_number));
453 perceived_number
454 }
455 },
456 };
457
458 if canon_number != perceived_number {
459 return Ok(false)
463 }
464
465 Ok(true)
466 };
467
468 match input {
469 voter::CommunicationIn::Commit(_, ref commit, ..) => {
470 let precommit_targets =
472 commit.precommits.iter().map(|c| (c.target_number, c.target_hash));
473
474 for (target_number, target_hash) in precommit_targets {
475 if !query_known(target_hash, target_number)? {
476 return Ok(DiscardWaitOrReady::Discard)
477 }
478 }
479 },
480 voter::CommunicationIn::CatchUp(ref catch_up, ..) => {
481 let prevote_targets = catch_up
483 .prevotes
484 .iter()
485 .map(|s| (s.prevote.target_number, s.prevote.target_hash));
486
487 let precommit_targets = catch_up
488 .precommits
489 .iter()
490 .map(|s| (s.precommit.target_number, s.precommit.target_hash));
491
492 let targets = prevote_targets.chain(precommit_targets);
493
494 for (target_number, target_hash) in targets {
495 if !query_known(target_hash, target_number)? {
496 return Ok(DiscardWaitOrReady::Discard)
497 }
498 }
499 },
500 };
501 }
502
503 let unknown_hashes = checked_hashes
504 .into_iter()
505 .filter_map(|(hash, num)| match num {
506 KnownOrUnknown::Unknown(number) => Some((hash, number)),
507 KnownOrUnknown::Known(_) => None,
508 })
509 .collect::<Vec<_>>();
510
511 if unknown_hashes.is_empty() {
512 return Ok(DiscardWaitOrReady::Ready(input))
515 }
516
517 let locked_global = Arc::new(Mutex::new(Some(input)));
518
519 let items_to_await = unknown_hashes
520 .into_iter()
521 .map(|(hash, target_number)| {
522 (
523 hash,
524 target_number,
525 BlockGlobalMessage { inner: locked_global.clone(), target_number },
526 )
527 })
528 .collect();
529
530 Ok(DiscardWaitOrReady::Wait(items_to_await))
534 }
535
536 fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
537 if self.target_number != canon_number {
538 *self.inner.lock() = None;
541 return None
542 }
543
544 match Arc::try_unwrap(self.inner) {
545 Ok(inner) => Mutex::into_inner(inner),
549 Err(_) => None,
552 }
553 }
554}
555
556pub(crate) type UntilGlobalMessageBlocksImported<Block, BlockStatus, BlockSyncRequester, I> =
559 UntilImported<Block, BlockStatus, BlockSyncRequester, I, BlockGlobalMessage<Block>>;
560
561#[cfg(test)]
562mod tests {
563 use super::*;
564 use crate::{CatchUp, CompactCommit};
565 use finality_grandpa::Precommit;
566 use futures::future::Either;
567 use futures_timer::Delay;
568 use sc_client_api::BlockImportNotification;
569 use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
570 use sp_consensus::BlockOrigin;
571 use sp_core::crypto::UncheckedFrom;
572 use substrate_test_runtime_client::runtime::{Block, Hash, Header};
573
574 #[derive(Clone)]
575 struct TestChainState {
576 sender: TracingUnboundedSender<BlockImportNotification<Block>>,
577 known_blocks: Arc<Mutex<HashMap<Hash, u64>>>,
578 }
579
580 impl TestChainState {
581 fn new() -> (Self, ImportNotifications<Block>) {
582 let (tx, rx) = tracing_unbounded("test", 100_000);
583 let state =
584 TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())) };
585
586 (state, rx)
587 }
588
589 fn block_status(&self) -> TestBlockStatus {
590 TestBlockStatus { inner: self.known_blocks.clone() }
591 }
592
593 fn import_header(&self, header: Header) {
594 let hash = header.hash();
595 let number = *header.number();
596 let (tx, _rx) = tracing_unbounded("unpin-worker-channel", 10_000);
597 self.known_blocks.lock().insert(hash, number);
598 self.sender
599 .unbounded_send(BlockImportNotification::<Block>::new(
600 hash,
601 BlockOrigin::File,
602 header,
603 false,
604 None,
605 tx,
606 ))
607 .unwrap();
608 }
609 }
610
611 struct TestBlockStatus {
612 inner: Arc<Mutex<HashMap<Hash, u64>>>,
613 }
614
615 impl BlockStatusT<Block> for TestBlockStatus {
616 fn block_number(&self, hash: Hash) -> Result<Option<u64>, Error> {
617 Ok(self.inner.lock().get(&hash).map(|x| *x))
618 }
619 }
620
621 #[derive(Clone)]
622 struct TestBlockSyncRequester {
623 requests: Arc<Mutex<Vec<(Hash, NumberFor<Block>)>>>,
624 }
625
626 impl Default for TestBlockSyncRequester {
627 fn default() -> Self {
628 TestBlockSyncRequester { requests: Arc::new(Mutex::new(Vec::new())) }
629 }
630 }
631
632 impl BlockSyncRequesterT<Block> for TestBlockSyncRequester {
633 fn set_sync_fork_request(
634 &self,
635 _peers: Vec<sc_network_types::PeerId>,
636 hash: Hash,
637 number: NumberFor<Block>,
638 ) {
639 self.requests.lock().push((hash, number));
640 }
641 }
642
643 fn make_header(number: u64) -> Header {
644 Header::new(
645 number,
646 Default::default(),
647 Default::default(),
648 Default::default(),
649 Default::default(),
650 )
651 }
652
653 fn unapply_commit(msg: CommunicationIn<Block>) -> (u64, CompactCommit<Header>) {
656 match msg {
657 voter::CommunicationIn::Commit(round, commit, ..) => (round, commit),
658 _ => panic!("expected commit"),
659 }
660 }
661
662 fn unapply_catch_up(msg: CommunicationIn<Block>) -> CatchUp<Header> {
665 match msg {
666 voter::CommunicationIn::CatchUp(catch_up, ..) => catch_up,
667 _ => panic!("expected catch up"),
668 }
669 }
670
671 fn message_all_dependencies_satisfied<F>(
672 msg: CommunicationIn<Block>,
673 enact_dependencies: F,
674 ) -> CommunicationIn<Block>
675 where
676 F: FnOnce(&TestChainState),
677 {
678 let (chain_state, import_notifications) = TestChainState::new();
679 let block_status = chain_state.block_status();
680
681 enact_dependencies(&chain_state);
683
684 let (global_tx, global_rx) = tracing_unbounded("test", 100_000);
685
686 let until_imported = UntilGlobalMessageBlocksImported::new(
687 import_notifications,
688 TestBlockSyncRequester::default(),
689 block_status,
690 global_rx,
691 "global",
692 None,
693 );
694
695 global_tx.unbounded_send(msg).unwrap();
696
697 let work = until_imported.into_future();
698
699 futures::executor::block_on(work).0.unwrap().unwrap()
700 }
701
702 fn blocking_message_on_dependencies<F>(
703 msg: CommunicationIn<Block>,
704 enact_dependencies: F,
705 ) -> CommunicationIn<Block>
706 where
707 F: FnOnce(&TestChainState),
708 {
709 let (chain_state, import_notifications) = TestChainState::new();
710 let block_status = chain_state.block_status();
711
712 let (global_tx, global_rx) = tracing_unbounded("test", 100_000);
713
714 let until_imported = UntilGlobalMessageBlocksImported::new(
715 import_notifications,
716 TestBlockSyncRequester::default(),
717 block_status,
718 global_rx,
719 "global",
720 None,
721 );
722
723 global_tx.unbounded_send(msg).unwrap();
724
725 let inner_chain_state = chain_state.clone();
728 let work =
729 future::select(until_imported.into_future(), Delay::new(Duration::from_millis(100)))
730 .then(move |res| match res {
731 Either::Left(_) => panic!("timeout should have fired first"),
732 Either::Right((_, until_imported)) => {
733 enact_dependencies(&inner_chain_state);
735
736 until_imported
737 },
738 });
739
740 futures::executor::block_on(work).0.unwrap().unwrap()
741 }
742
/// A commit referring to unknown blocks is held back until those blocks are imported
/// and then comes out unchanged.
#[test]
fn blocking_commit_message() {
    let (h1, h2, h3) = (make_header(5), make_header(6), make_header(7));

    let commit = CompactCommit::<Header> {
        target_hash: h1.hash(),
        target_number: 5,
        precommits: vec![
            Precommit { target_hash: h2.hash(), target_number: 6 },
            Precommit { target_hash: h3.hash(), target_number: 7 },
        ],
        auth_data: Vec::new(),
    };

    let build_message =
        || voter::CommunicationIn::Commit(0, commit.clone(), voter::Callback::Blank);

    let delivered = blocking_message_on_dependencies(build_message(), |chain_state| {
        for header in vec![h1, h2, h3] {
            chain_state.import_header(header);
        }
    });

    assert_eq!(unapply_commit(delivered), unapply_commit(build_message()));
}
770
/// A commit whose blocks are all imported beforehand passes through unchanged.
#[test]
fn commit_message_all_known() {
    let (h1, h2, h3) = (make_header(5), make_header(6), make_header(7));

    let commit = CompactCommit::<Header> {
        target_hash: h1.hash(),
        target_number: 5,
        precommits: vec![
            Precommit { target_hash: h2.hash(), target_number: 6 },
            Precommit { target_hash: h3.hash(), target_number: 7 },
        ],
        auth_data: Vec::new(),
    };

    let build_message =
        || voter::CommunicationIn::Commit(0, commit.clone(), voter::Callback::Blank);

    let delivered = message_all_dependencies_satisfied(build_message(), |chain_state| {
        for header in vec![h1, h2, h3] {
            chain_state.import_header(header);
        }
    });

    assert_eq!(unapply_commit(delivered), unapply_commit(build_message()));
}
798
/// A catch-up referring to unknown blocks is held back until those blocks are imported
/// and then comes out unchanged.
#[test]
fn blocking_catch_up_message() {
    let (h1, h2, h3) = (make_header(5), make_header(6), make_header(7));

    // Votes carry fixed dummy ids and signatures; only their targets matter here.
    let prevote_for = |header: &Header| finality_grandpa::SignedPrevote {
        id: UncheckedFrom::unchecked_from([1; 32]),
        signature: UncheckedFrom::unchecked_from([1; 64]),
        prevote: finality_grandpa::Prevote {
            target_hash: header.hash(),
            target_number: *header.number(),
        },
    };
    let precommit_for = |header: &Header| finality_grandpa::SignedPrecommit {
        id: UncheckedFrom::unchecked_from([1; 32]),
        signature: UncheckedFrom::unchecked_from([1; 64]),
        precommit: finality_grandpa::Precommit {
            target_hash: header.hash(),
            target_number: *header.number(),
        },
    };

    let catch_up = finality_grandpa::CatchUp {
        round_number: 1,
        prevotes: vec![prevote_for(&h1), prevote_for(&h3)],
        precommits: vec![precommit_for(&h1), precommit_for(&h2)],
        base_hash: h1.hash(),
        base_number: *h1.number(),
    };

    let build_message =
        || voter::CommunicationIn::CatchUp(catch_up.clone(), voter::Callback::Blank);

    let delivered = blocking_message_on_dependencies(build_message(), |chain_state| {
        for header in vec![h1, h2, h3] {
            chain_state.import_header(header);
        }
    });

    assert_eq!(unapply_catch_up(delivered), unapply_catch_up(build_message()));
}
846
/// A catch-up whose blocks are all imported beforehand passes through unchanged.
#[test]
fn catch_up_message_all_known() {
    let (h1, h2, h3) = (make_header(5), make_header(6), make_header(7));

    // Votes carry fixed dummy ids and signatures; only their targets matter here.
    let prevote_for = |header: &Header| finality_grandpa::SignedPrevote {
        id: UncheckedFrom::unchecked_from([1; 32]),
        signature: UncheckedFrom::unchecked_from([1; 64]),
        prevote: finality_grandpa::Prevote {
            target_hash: header.hash(),
            target_number: *header.number(),
        },
    };
    let precommit_for = |header: &Header| finality_grandpa::SignedPrecommit {
        id: UncheckedFrom::unchecked_from([1; 32]),
        signature: UncheckedFrom::unchecked_from([1; 64]),
        precommit: finality_grandpa::Precommit {
            target_hash: header.hash(),
            target_number: *header.number(),
        },
    };

    let catch_up = finality_grandpa::CatchUp {
        round_number: 1,
        prevotes: vec![prevote_for(&h1), prevote_for(&h3)],
        precommits: vec![precommit_for(&h1), precommit_for(&h2)],
        base_hash: h1.hash(),
        base_number: *h1.number(),
    };

    let build_message =
        || voter::CommunicationIn::CatchUp(catch_up.clone(), voter::Callback::Blank);

    let delivered = message_all_dependencies_satisfied(build_message(), |chain_state| {
        for header in vec![h1, h2, h3] {
            chain_state.import_header(header);
        }
    });

    assert_eq!(unapply_catch_up(delivered), unapply_catch_up(build_message()));
}
894
/// While a commit waits for blocks that never get imported, the queue asks the sync
/// requester for each of the precommit target blocks.
#[test]
fn request_block_sync_for_needed_blocks() {
    let (chain_state, import_notifications) = TestChainState::new();
    let block_status = chain_state.block_status();

    let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

    let block_sync_requester = TestBlockSyncRequester::default();

    let until_imported = UntilGlobalMessageBlocksImported::new(
        import_notifications,
        block_sync_requester.clone(),
        block_status,
        global_rx,
        "global",
        None,
    );

    let (h1, h2, h3) = (make_header(5), make_header(6), make_header(7));

    // None of the blocks referenced here is ever imported in this test.
    let commit = CompactCommit::<Header> {
        target_hash: h1.hash(),
        target_number: 5,
        precommits: vec![
            Precommit { target_hash: h2.hash(), target_number: 6 },
            Precommit { target_hash: h3.hash(), target_number: 7 },
        ],
        auth_data: Vec::new(),
    };

    let build_message =
        || voter::CommunicationIn::Commit(0, commit.clone(), voter::Callback::Blank);

    global_tx.unbounded_send(build_message()).unwrap();

    // Poll the queue in the background; it is not expected to yield anything.
    let threads_pool = futures::executor::ThreadPool::new().unwrap();
    threads_pool.spawn_ok(until_imported.into_future().map(|_| ()));

    let both_requested = futures::future::poll_fn(|ctx| {
        let recorded = block_sync_requester.requests.lock();

        let saw_h2 = recorded.contains(&(h2.hash(), *h2.number()));
        let saw_h3 = recorded.contains(&(h3.hash(), *h3.number()));
        if saw_h2 && saw_h3 {
            return Poll::Ready(())
        }

        // Nothing wakes this future from outside, so schedule another poll right away.
        ctx.waker().wake_by_ref();

        Poll::Pending
    });

    let deadline = Delay::new(Duration::from_secs(60));
    let test = future::select(both_requested, deadline)
        .map(|outcome| match outcome {
            Either::Left(_) => {},
            Either::Right(_) => panic!("timed out waiting for block sync request"),
        })
        .map(drop);

    futures::executor::block_on(test);
}
969
970 fn test_catch_up() -> Arc<Mutex<Option<CommunicationIn<Block>>>> {
971 let header = make_header(5);
972
973 let unknown_catch_up = finality_grandpa::CatchUp {
974 round_number: 1,
975 precommits: vec![],
976 prevotes: vec![],
977 base_hash: header.hash(),
978 base_number: *header.number(),
979 };
980
981 let catch_up =
982 voter::CommunicationIn::CatchUp(unknown_catch_up.clone(), voter::Callback::Blank);
983
984 Arc::new(Mutex::new(Some(catch_up)))
985 }
986
/// With two waiters sharing one message, only the one completing last returns it.
#[test]
fn block_global_message_wait_completed_return_when_all_awaited() {
    let shared = test_catch_up();

    let first = BlockGlobalMessage::<Block> { inner: Arc::clone(&shared), target_number: 1 };
    let second = BlockGlobalMessage::<Block> { inner: shared, target_number: 2 };

    // The other waiter is still outstanding, so nothing is released yet.
    assert!(first.wait_completed(1).is_none());

    // Last waiter: the message is released.
    assert!(second.wait_completed(2).is_some());
}
1003
/// A number mismatch on any waiter prevents the shared message from ever being
/// returned, even by a later waiter whose number matches.
#[test]
fn block_global_message_wait_completed_return_none_on_block_number_mismatch() {
    let shared = test_catch_up();

    let first = BlockGlobalMessage::<Block> { inner: Arc::clone(&shared), target_number: 1 };
    let second = BlockGlobalMessage::<Block> { inner: shared, target_number: 2 };

    // Wrong number for the first waiter: the message gets cleared.
    assert!(first.wait_completed(1234).is_none());

    // Correct number and last waiter, but there is nothing left to release.
    assert!(second.wait_completed(2).is_none());
}
1020
/// Dropping a `Metrics` instance removes its contribution from the shared gauge.
#[test]
fn metrics_cleans_up_after_itself() {
    let registry = Registry::new();

    let mut original = Metrics::register(&registry).unwrap();
    let observer = original.clone();

    // The increment made through one instance is visible through its clone.
    original.waiting_messages_inc();
    assert_eq!(1, observer.global_waiting_messages.get());

    // Dropping the instance that counted the message takes it off the gauge again.
    drop(original);
    assert_eq!(0, observer.global_waiting_messages.get());
}
1041}