referrerpolicy=no-referrer-when-downgrade

sc_consensus_grandpa/
until_imported.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Helper stream for waiting until one or more blocks are imported before
20//! passing through inner items. This is done in a generic way to support
21//! many different kinds of items.
22//!
23//! This is used for votes and commit messages currently.
24
25use super::{
26	BlockStatus as BlockStatusT, BlockSyncRequester as BlockSyncRequesterT, CommunicationIn, Error,
27	SignedMessage, LOG_TARGET,
28};
29
30use finality_grandpa::voter;
31use futures::{
32	prelude::*,
33	stream::{Fuse, StreamExt},
34};
35use futures_timer::Delay;
36use log::{debug, warn};
37use parking_lot::Mutex;
38use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
39use sc_client_api::{BlockImportNotification, ImportNotifications};
40use sc_utils::mpsc::TracingUnboundedReceiver;
41use sp_consensus_grandpa::AuthorityId;
42use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
43
44use std::{
45	collections::{HashMap, VecDeque},
46	pin::Pin,
47	sync::Arc,
48	task::{Context, Poll},
49	time::{Duration, Instant},
50};
51
52const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15);
53
54/// Something that needs to be withheld until specific blocks are available.
55///
56/// For example a GRANDPA commit message which is not of any use without the corresponding block
57/// that it commits on.
58pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
59	/// The type that is blocked on.
60	type Blocked;
61
62	/// Check if a new incoming item needs awaiting until a block(s) is imported.
63	fn needs_waiting<S: BlockStatusT<Block>>(
64		input: Self::Blocked,
65		status_check: &S,
66	) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error>;
67
68	/// called when the wait has completed. The canonical number is passed through
69	/// for further checks.
70	fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked>;
71}
72
73/// Describes whether a given [`BlockUntilImported`] (a) should be discarded, (b) is waiting for
74/// specific blocks to be imported or (c) is ready to be used.
75///
76/// A reason for discarding a [`BlockUntilImported`] would be if a referenced block is perceived
77/// under a different number than specified in the message.
78pub(crate) enum DiscardWaitOrReady<Block: BlockT, W, R> {
79	Discard,
80	Wait(Vec<(Block::Hash, NumberFor<Block>, W)>),
81	Ready(R),
82}
83
84/// Prometheus metrics for the `UntilImported` queue.
85// At a given point in time there can be more than one `UntilImported` queue. One can not register a
86// metric twice, thus queues need to share the same Prometheus metrics instead of instantiating
87// their own ones.
88//
89// When a queue is dropped it might still contain messages. In order for those to not distort the
90// Prometheus metrics, the `Metric` struct cleans up after itself within its `Drop` implementation
91// by subtracting the local_waiting_messages (the amount of messages left in the queue about to
92// be dropped) from the global_waiting_messages gauge.
93pub(crate) struct Metrics {
94	global_waiting_messages: Gauge<U64>,
95	local_waiting_messages: u64,
96}
97
98impl Metrics {
99	pub(crate) fn register(registry: &Registry) -> Result<Self, PrometheusError> {
100		Ok(Self {
101			global_waiting_messages: register(
102				Gauge::new(
103					"substrate_finality_grandpa_until_imported_waiting_messages_number",
104					"Number of finality grandpa messages waiting within the until imported queue.",
105				)?,
106				registry,
107			)?,
108			local_waiting_messages: 0,
109		})
110	}
111
112	fn waiting_messages_inc(&mut self) {
113		self.local_waiting_messages += 1;
114		self.global_waiting_messages.inc();
115	}
116
117	fn waiting_messages_dec(&mut self) {
118		self.local_waiting_messages -= 1;
119		self.global_waiting_messages.dec();
120	}
121}
122
123impl Clone for Metrics {
124	fn clone(&self) -> Self {
125		Metrics {
126			global_waiting_messages: self.global_waiting_messages.clone(),
127			// When cloned, reset local_waiting_messages, so the global counter is not reduced a
128			// second time for the same messages on `drop` of the clone.
129			local_waiting_messages: 0,
130		}
131	}
132}
133
134impl Drop for Metrics {
135	fn drop(&mut self) {
136		// Reduce the global counter by the amount of messages that were still left in the dropped
137		// queue.
138		self.global_waiting_messages.sub(self.local_waiting_messages)
139	}
140}
141
142/// Buffering incoming messages until blocks with given hashes are imported.
143pub(crate) struct UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
144where
145	Block: BlockT,
146	I: Stream<Item = M::Blocked> + Unpin,
147	M: BlockUntilImported<Block>,
148{
149	import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>,
150	block_sync_requester: BlockSyncRequester,
151	status_check: BlockStatus,
152	incoming_messages: Fuse<I>,
153	ready: VecDeque<M::Blocked>,
154	/// Interval at which to check status of each awaited block.
155	check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send>>,
156	/// Mapping block hashes to their block number, the point in time it was
157	/// first encountered (Instant) and a list of GRANDPA messages referencing
158	/// the block hash.
159	pending: HashMap<Block::Hash, (NumberFor<Block>, Instant, Vec<M>)>,
160
161	/// Queue identifier for differentiation in logs.
162	identifier: &'static str,
163	/// Prometheus metrics.
164	metrics: Option<Metrics>,
165}
166
167impl<Block, BlockStatus, BlockSyncRequester, I, M> Unpin
168	for UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
169where
170	Block: BlockT,
171	I: Stream<Item = M::Blocked> + Unpin,
172	M: BlockUntilImported<Block>,
173{
174}
175
176impl<Block, BlockStatus, BlockSyncRequester, I, M>
177	UntilImported<Block, BlockStatus, BlockSyncRequester, I, M>
178where
179	Block: BlockT,
180	BlockStatus: BlockStatusT<Block>,
181	BlockSyncRequester: BlockSyncRequesterT<Block>,
182	I: Stream<Item = M::Blocked> + Unpin,
183	M: BlockUntilImported<Block>,
184{
185	/// Create a new `UntilImported` wrapper.
186	pub(crate) fn new(
187		import_notifications: ImportNotifications<Block>,
188		block_sync_requester: BlockSyncRequester,
189		status_check: BlockStatus,
190		incoming_messages: I,
191		identifier: &'static str,
192		metrics: Option<Metrics>,
193	) -> Self {
194		// how often to check if pending messages that are waiting for blocks to be
195		// imported can be checked.
196		//
197		// the import notifications interval takes care of most of this; this is
198		// used in the event of missed import notifications
199		const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5);
200
201		let check_pending = futures::stream::unfold(Delay::new(CHECK_PENDING_INTERVAL), |delay| {
202			Box::pin(async move {
203				delay.await;
204				Some((Ok(()), Delay::new(CHECK_PENDING_INTERVAL)))
205			})
206		});
207
208		UntilImported {
209			import_notifications: import_notifications.fuse(),
210			block_sync_requester,
211			status_check,
212			incoming_messages: incoming_messages.fuse(),
213			ready: VecDeque::new(),
214			check_pending: Box::pin(check_pending),
215			pending: HashMap::new(),
216			identifier,
217			metrics,
218		}
219	}
220}
221
222impl<Block, BStatus, BSyncRequester, I, M> Stream
223	for UntilImported<Block, BStatus, BSyncRequester, I, M>
224where
225	Block: BlockT,
226	BStatus: BlockStatusT<Block>,
227	BSyncRequester: BlockSyncRequesterT<Block>,
228	I: Stream<Item = M::Blocked> + Unpin,
229	M: BlockUntilImported<Block>,
230{
231	type Item = Result<M::Blocked, Error>;
232
233	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
234		// We are using a `this` variable in order to allow multiple simultaneous mutable borrow to
235		// `self`.
236		let this = &mut *self;
237
238		loop {
239			match StreamExt::poll_next_unpin(&mut this.incoming_messages, cx) {
240				Poll::Ready(None) => return Poll::Ready(None),
241				Poll::Ready(Some(input)) => {
242					// new input: schedule wait of any parts which require
243					// blocks to be known.
244					match M::needs_waiting(input, &this.status_check)? {
245						DiscardWaitOrReady::Discard => {},
246						DiscardWaitOrReady::Wait(items) => {
247							for (target_hash, target_number, wait) in items {
248								this.pending
249									.entry(target_hash)
250									.or_insert_with(|| (target_number, Instant::now(), Vec::new()))
251									.2
252									.push(wait)
253							}
254						},
255						DiscardWaitOrReady::Ready(item) => this.ready.push_back(item),
256					}
257
258					if let Some(metrics) = &mut this.metrics {
259						metrics.waiting_messages_inc();
260					}
261				},
262				Poll::Pending => break,
263			}
264		}
265
266		loop {
267			match StreamExt::poll_next_unpin(&mut this.import_notifications, cx) {
268				Poll::Ready(None) => return Poll::Ready(None),
269				Poll::Ready(Some(notification)) => {
270					// new block imported. queue up all messages tied to that hash.
271					if let Some((_, _, messages)) = this.pending.remove(&notification.hash) {
272						let canon_number = *notification.header.number();
273						let ready_messages =
274							messages.into_iter().filter_map(|m| m.wait_completed(canon_number));
275
276						this.ready.extend(ready_messages);
277					}
278				},
279				Poll::Pending => break,
280			}
281		}
282
283		let mut update_interval = false;
284		while let Poll::Ready(Some(Ok(()))) = this.check_pending.poll_next_unpin(cx) {
285			update_interval = true;
286		}
287
288		if update_interval {
289			let mut known_keys = Vec::new();
290			for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in
291				this.pending.iter_mut()
292			{
293				if let Some(number) = this.status_check.block_number(block_hash)? {
294					known_keys.push((block_hash, number));
295				} else {
296					let next_log = *last_log + LOG_PENDING_INTERVAL;
297					if Instant::now() >= next_log {
298						debug!(
299							target: LOG_TARGET,
300							"Waiting to import block {} before {} {} messages can be imported. \
301							Requesting network sync service to retrieve block from. \
302							Possible fork?",
303							block_hash,
304							v.len(),
305							this.identifier,
306						);
307
308						// NOTE: when sending an empty vec of peers the
309						// underlying should make a best effort to sync the
310						// block from any peers it knows about.
311						this.block_sync_requester.set_sync_fork_request(
312							vec![],
313							block_hash,
314							block_number,
315						);
316
317						*last_log = next_log;
318					}
319				}
320			}
321
322			for (known_hash, canon_number) in known_keys {
323				if let Some((_, _, pending_messages)) = this.pending.remove(&known_hash) {
324					let ready_messages =
325						pending_messages.into_iter().filter_map(|m| m.wait_completed(canon_number));
326
327					this.ready.extend(ready_messages);
328				}
329			}
330		}
331
332		if let Some(ready) = this.ready.pop_front() {
333			if let Some(metrics) = &mut this.metrics {
334				metrics.waiting_messages_dec();
335			}
336			return Poll::Ready(Some(Ok(ready)))
337		}
338
339		if this.import_notifications.is_done() && this.incoming_messages.is_done() {
340			Poll::Ready(None)
341		} else {
342			Poll::Pending
343		}
344	}
345}
346
347fn warn_authority_wrong_target<H: ::std::fmt::Display>(hash: H, id: AuthorityId) {
348	warn!(
349		target: LOG_TARGET,
350		"Authority {:?} signed GRANDPA message with \
351		wrong block number for hash {}",
352		id,
353		hash,
354	);
355}
356
357impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block::Header> {
358	type Blocked = Self;
359
360	fn needs_waiting<BlockStatus: BlockStatusT<Block>>(
361		msg: Self::Blocked,
362		status_check: &BlockStatus,
363	) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> {
364		let (&target_hash, target_number) = msg.target();
365
366		if let Some(number) = status_check.block_number(target_hash)? {
367			if number != target_number {
368				warn_authority_wrong_target(target_hash, msg.id);
369				return Ok(DiscardWaitOrReady::Discard)
370			} else {
371				return Ok(DiscardWaitOrReady::Ready(msg))
372			}
373		}
374
375		Ok(DiscardWaitOrReady::Wait(vec![(target_hash, target_number, msg)]))
376	}
377
378	fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
379		let (&target_hash, target_number) = self.target();
380		if canon_number != target_number {
381			warn_authority_wrong_target(target_hash, self.id);
382
383			None
384		} else {
385			Some(self)
386		}
387	}
388}
389
390/// Helper type definition for the stream which waits until vote targets for
391/// signed messages are imported.
392pub(crate) type UntilVoteTargetImported<Block, BlockStatus, BlockSyncRequester, I> = UntilImported<
393	Block,
394	BlockStatus,
395	BlockSyncRequester,
396	I,
397	SignedMessage<<Block as BlockT>::Header>,
398>;
399
400/// This blocks a global message import, i.e. a commit or catch up messages,
401/// until all blocks referenced in its votes are known.
402///
403/// This is used for compact commits and catch up messages which have already
404/// been checked for structural soundness (e.g. valid signatures).
405///
406/// We use the `Arc`'s reference count to implicitly count the number of outstanding blocks that we
407/// are waiting on for the same message (i.e. other `BlockGlobalMessage` instances with the same
408/// `inner`).
409pub(crate) struct BlockGlobalMessage<Block: BlockT> {
410	inner: Arc<Mutex<Option<CommunicationIn<Block>>>>,
411	target_number: NumberFor<Block>,
412}
413
414impl<Block: BlockT> Unpin for BlockGlobalMessage<Block> {}
415
416impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
417	type Blocked = CommunicationIn<Block>;
418
419	fn needs_waiting<BlockStatus: BlockStatusT<Block>>(
420		input: Self::Blocked,
421		status_check: &BlockStatus,
422	) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> {
423		use std::collections::hash_map::Entry;
424
425		enum KnownOrUnknown<N> {
426			Known(N),
427			Unknown(N),
428		}
429
430		impl<N> KnownOrUnknown<N> {
431			fn number(&self) -> &N {
432				match *self {
433					KnownOrUnknown::Known(ref n) => n,
434					KnownOrUnknown::Unknown(ref n) => n,
435				}
436			}
437		}
438
439		let mut checked_hashes: HashMap<_, KnownOrUnknown<NumberFor<Block>>> = HashMap::new();
440
441		{
442			// returns false when should early exit.
443			let mut query_known = |target_hash, perceived_number| -> Result<bool, Error> {
444				// check integrity: all votes for same hash have same number.
445				let canon_number = match checked_hashes.entry(target_hash) {
446					Entry::Occupied(entry) => *entry.get().number(),
447					Entry::Vacant(entry) => {
448						if let Some(number) = status_check.block_number(target_hash)? {
449							entry.insert(KnownOrUnknown::Known(number));
450							number
451						} else {
452							entry.insert(KnownOrUnknown::Unknown(perceived_number));
453							perceived_number
454						}
455					},
456				};
457
458				if canon_number != perceived_number {
459					// invalid global message: messages targeting wrong number
460					// or at least different from other vote in same global
461					// message.
462					return Ok(false)
463				}
464
465				Ok(true)
466			};
467
468			match input {
469				voter::CommunicationIn::Commit(_, ref commit, ..) => {
470					// add known hashes from all precommits.
471					let precommit_targets =
472						commit.precommits.iter().map(|c| (c.target_number, c.target_hash));
473
474					for (target_number, target_hash) in precommit_targets {
475						if !query_known(target_hash, target_number)? {
476							return Ok(DiscardWaitOrReady::Discard)
477						}
478					}
479				},
480				voter::CommunicationIn::CatchUp(ref catch_up, ..) => {
481					// add known hashes from all prevotes and precommits.
482					let prevote_targets = catch_up
483						.prevotes
484						.iter()
485						.map(|s| (s.prevote.target_number, s.prevote.target_hash));
486
487					let precommit_targets = catch_up
488						.precommits
489						.iter()
490						.map(|s| (s.precommit.target_number, s.precommit.target_hash));
491
492					let targets = prevote_targets.chain(precommit_targets);
493
494					for (target_number, target_hash) in targets {
495						if !query_known(target_hash, target_number)? {
496							return Ok(DiscardWaitOrReady::Discard)
497						}
498					}
499				},
500			};
501		}
502
503		let unknown_hashes = checked_hashes
504			.into_iter()
505			.filter_map(|(hash, num)| match num {
506				KnownOrUnknown::Unknown(number) => Some((hash, number)),
507				KnownOrUnknown::Known(_) => None,
508			})
509			.collect::<Vec<_>>();
510
511		if unknown_hashes.is_empty() {
512			// none of the hashes in the global message were unknown.
513			// we can just return the message directly.
514			return Ok(DiscardWaitOrReady::Ready(input))
515		}
516
517		let locked_global = Arc::new(Mutex::new(Some(input)));
518
519		let items_to_await = unknown_hashes
520			.into_iter()
521			.map(|(hash, target_number)| {
522				(
523					hash,
524					target_number,
525					BlockGlobalMessage { inner: locked_global.clone(), target_number },
526				)
527			})
528			.collect();
529
530		// schedule waits for all unknown messages.
531		// when the last one of these has `wait_completed` called on it,
532		// the global message will be returned.
533		Ok(DiscardWaitOrReady::Wait(items_to_await))
534	}
535
536	fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {
537		if self.target_number != canon_number {
538			// Delete the inner message so it won't ever be forwarded. Future calls to
539			// `wait_completed` on the same `inner` will ignore it.
540			*self.inner.lock() = None;
541			return None
542		}
543
544		match Arc::try_unwrap(self.inner) {
545			// This is the last reference and thus the last outstanding block to be awaited. `inner`
546			// is either `Some(_)` or `None`. The latter implies that a previous `wait_completed`
547			// call witnessed a block number mismatch (see above).
548			Ok(inner) => Mutex::into_inner(inner),
549			// There are still other strong references to this `Arc`, thus the message is blocked on
550			// other blocks to be imported.
551			Err(_) => None,
552		}
553	}
554}
555
556/// A stream which gates off incoming global messages, i.e. commit and catch up
557/// messages, until all referenced block hashes have been imported.
558pub(crate) type UntilGlobalMessageBlocksImported<Block, BlockStatus, BlockSyncRequester, I> =
559	UntilImported<Block, BlockStatus, BlockSyncRequester, I, BlockGlobalMessage<Block>>;
560
561#[cfg(test)]
562mod tests {
563	use super::*;
564	use crate::{CatchUp, CompactCommit};
565	use finality_grandpa::Precommit;
566	use futures::future::Either;
567	use futures_timer::Delay;
568	use sc_client_api::BlockImportNotification;
569	use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
570	use sp_consensus::BlockOrigin;
571	use sp_core::crypto::UncheckedFrom;
572	use substrate_test_runtime_client::runtime::{Block, Hash, Header};
573
574	#[derive(Clone)]
575	struct TestChainState {
576		sender: TracingUnboundedSender<BlockImportNotification<Block>>,
577		known_blocks: Arc<Mutex<HashMap<Hash, u64>>>,
578	}
579
580	impl TestChainState {
581		fn new() -> (Self, ImportNotifications<Block>) {
582			let (tx, rx) = tracing_unbounded("test", 100_000);
583			let state =
584				TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())) };
585
586			(state, rx)
587		}
588
589		fn block_status(&self) -> TestBlockStatus {
590			TestBlockStatus { inner: self.known_blocks.clone() }
591		}
592
593		fn import_header(&self, header: Header) {
594			let hash = header.hash();
595			let number = *header.number();
596			let (tx, _rx) = tracing_unbounded("unpin-worker-channel", 10_000);
597			self.known_blocks.lock().insert(hash, number);
598			self.sender
599				.unbounded_send(BlockImportNotification::<Block>::new(
600					hash,
601					BlockOrigin::File,
602					header,
603					false,
604					None,
605					tx,
606				))
607				.unwrap();
608		}
609	}
610
611	struct TestBlockStatus {
612		inner: Arc<Mutex<HashMap<Hash, u64>>>,
613	}
614
615	impl BlockStatusT<Block> for TestBlockStatus {
616		fn block_number(&self, hash: Hash) -> Result<Option<u64>, Error> {
617			Ok(self.inner.lock().get(&hash).map(|x| *x))
618		}
619	}
620
621	#[derive(Clone)]
622	struct TestBlockSyncRequester {
623		requests: Arc<Mutex<Vec<(Hash, NumberFor<Block>)>>>,
624	}
625
626	impl Default for TestBlockSyncRequester {
627		fn default() -> Self {
628			TestBlockSyncRequester { requests: Arc::new(Mutex::new(Vec::new())) }
629		}
630	}
631
632	impl BlockSyncRequesterT<Block> for TestBlockSyncRequester {
633		fn set_sync_fork_request(
634			&self,
635			_peers: Vec<sc_network_types::PeerId>,
636			hash: Hash,
637			number: NumberFor<Block>,
638		) {
639			self.requests.lock().push((hash, number));
640		}
641	}
642
643	fn make_header(number: u64) -> Header {
644		Header::new(
645			number,
646			Default::default(),
647			Default::default(),
648			Default::default(),
649			Default::default(),
650		)
651	}
652
653	// unwrap the commit from `CommunicationIn` returning its fields in a tuple,
654	// panics if the given message isn't a commit
655	fn unapply_commit(msg: CommunicationIn<Block>) -> (u64, CompactCommit<Header>) {
656		match msg {
657			voter::CommunicationIn::Commit(round, commit, ..) => (round, commit),
658			_ => panic!("expected commit"),
659		}
660	}
661
662	// unwrap the catch up from `CommunicationIn` returning its inner representation,
663	// panics if the given message isn't a catch up
664	fn unapply_catch_up(msg: CommunicationIn<Block>) -> CatchUp<Header> {
665		match msg {
666			voter::CommunicationIn::CatchUp(catch_up, ..) => catch_up,
667			_ => panic!("expected catch up"),
668		}
669	}
670
671	fn message_all_dependencies_satisfied<F>(
672		msg: CommunicationIn<Block>,
673		enact_dependencies: F,
674	) -> CommunicationIn<Block>
675	where
676		F: FnOnce(&TestChainState),
677	{
678		let (chain_state, import_notifications) = TestChainState::new();
679		let block_status = chain_state.block_status();
680
681		// enact all dependencies before importing the message
682		enact_dependencies(&chain_state);
683
684		let (global_tx, global_rx) = tracing_unbounded("test", 100_000);
685
686		let until_imported = UntilGlobalMessageBlocksImported::new(
687			import_notifications,
688			TestBlockSyncRequester::default(),
689			block_status,
690			global_rx,
691			"global",
692			None,
693		);
694
695		global_tx.unbounded_send(msg).unwrap();
696
697		let work = until_imported.into_future();
698
699		futures::executor::block_on(work).0.unwrap().unwrap()
700	}
701
702	fn blocking_message_on_dependencies<F>(
703		msg: CommunicationIn<Block>,
704		enact_dependencies: F,
705	) -> CommunicationIn<Block>
706	where
707		F: FnOnce(&TestChainState),
708	{
709		let (chain_state, import_notifications) = TestChainState::new();
710		let block_status = chain_state.block_status();
711
712		let (global_tx, global_rx) = tracing_unbounded("test", 100_000);
713
714		let until_imported = UntilGlobalMessageBlocksImported::new(
715			import_notifications,
716			TestBlockSyncRequester::default(),
717			block_status,
718			global_rx,
719			"global",
720			None,
721		);
722
723		global_tx.unbounded_send(msg).unwrap();
724
725		// NOTE: needs to be cloned otherwise it is moved to the stream and
726		// dropped too early.
727		let inner_chain_state = chain_state.clone();
728		let work =
729			future::select(until_imported.into_future(), Delay::new(Duration::from_millis(100)))
730				.then(move |res| match res {
731					Either::Left(_) => panic!("timeout should have fired first"),
732					Either::Right((_, until_imported)) => {
733						// timeout fired. push in the headers.
734						enact_dependencies(&inner_chain_state);
735
736						until_imported
737					},
738				});
739
740		futures::executor::block_on(work).0.unwrap().unwrap()
741	}
742
743	#[test]
744	fn blocking_commit_message() {
745		let h1 = make_header(5);
746		let h2 = make_header(6);
747		let h3 = make_header(7);
748
749		let unknown_commit = CompactCommit::<Header> {
750			target_hash: h1.hash(),
751			target_number: 5,
752			precommits: vec![
753				Precommit { target_hash: h2.hash(), target_number: 6 },
754				Precommit { target_hash: h3.hash(), target_number: 7 },
755			],
756			auth_data: Vec::new(), // not used
757		};
758
759		let unknown_commit =
760			|| voter::CommunicationIn::Commit(0, unknown_commit.clone(), voter::Callback::Blank);
761
762		let res = blocking_message_on_dependencies(unknown_commit(), |chain_state| {
763			chain_state.import_header(h1);
764			chain_state.import_header(h2);
765			chain_state.import_header(h3);
766		});
767
768		assert_eq!(unapply_commit(res), unapply_commit(unknown_commit()));
769	}
770
771	#[test]
772	fn commit_message_all_known() {
773		let h1 = make_header(5);
774		let h2 = make_header(6);
775		let h3 = make_header(7);
776
777		let known_commit = CompactCommit::<Header> {
778			target_hash: h1.hash(),
779			target_number: 5,
780			precommits: vec![
781				Precommit { target_hash: h2.hash(), target_number: 6 },
782				Precommit { target_hash: h3.hash(), target_number: 7 },
783			],
784			auth_data: Vec::new(), // not used
785		};
786
787		let known_commit =
788			|| voter::CommunicationIn::Commit(0, known_commit.clone(), voter::Callback::Blank);
789
790		let res = message_all_dependencies_satisfied(known_commit(), |chain_state| {
791			chain_state.import_header(h1);
792			chain_state.import_header(h2);
793			chain_state.import_header(h3);
794		});
795
796		assert_eq!(unapply_commit(res), unapply_commit(known_commit()));
797	}
798
799	#[test]
800	fn blocking_catch_up_message() {
801		let h1 = make_header(5);
802		let h2 = make_header(6);
803		let h3 = make_header(7);
804
805		let signed_prevote = |header: &Header| finality_grandpa::SignedPrevote {
806			id: UncheckedFrom::unchecked_from([1; 32]),
807			signature: UncheckedFrom::unchecked_from([1; 64]),
808			prevote: finality_grandpa::Prevote {
809				target_hash: header.hash(),
810				target_number: *header.number(),
811			},
812		};
813
814		let signed_precommit = |header: &Header| finality_grandpa::SignedPrecommit {
815			id: UncheckedFrom::unchecked_from([1; 32]),
816			signature: UncheckedFrom::unchecked_from([1; 64]),
817			precommit: finality_grandpa::Precommit {
818				target_hash: header.hash(),
819				target_number: *header.number(),
820			},
821		};
822
823		let prevotes = vec![signed_prevote(&h1), signed_prevote(&h3)];
824
825		let precommits = vec![signed_precommit(&h1), signed_precommit(&h2)];
826
827		let unknown_catch_up = finality_grandpa::CatchUp {
828			round_number: 1,
829			prevotes,
830			precommits,
831			base_hash: h1.hash(),
832			base_number: *h1.number(),
833		};
834
835		let unknown_catch_up =
836			|| voter::CommunicationIn::CatchUp(unknown_catch_up.clone(), voter::Callback::Blank);
837
838		let res = blocking_message_on_dependencies(unknown_catch_up(), |chain_state| {
839			chain_state.import_header(h1);
840			chain_state.import_header(h2);
841			chain_state.import_header(h3);
842		});
843
844		assert_eq!(unapply_catch_up(res), unapply_catch_up(unknown_catch_up()));
845	}
846
847	#[test]
848	fn catch_up_message_all_known() {
849		let h1 = make_header(5);
850		let h2 = make_header(6);
851		let h3 = make_header(7);
852
853		let signed_prevote = |header: &Header| finality_grandpa::SignedPrevote {
854			id: UncheckedFrom::unchecked_from([1; 32]),
855			signature: UncheckedFrom::unchecked_from([1; 64]),
856			prevote: finality_grandpa::Prevote {
857				target_hash: header.hash(),
858				target_number: *header.number(),
859			},
860		};
861
862		let signed_precommit = |header: &Header| finality_grandpa::SignedPrecommit {
863			id: UncheckedFrom::unchecked_from([1; 32]),
864			signature: UncheckedFrom::unchecked_from([1; 64]),
865			precommit: finality_grandpa::Precommit {
866				target_hash: header.hash(),
867				target_number: *header.number(),
868			},
869		};
870
871		let prevotes = vec![signed_prevote(&h1), signed_prevote(&h3)];
872
873		let precommits = vec![signed_precommit(&h1), signed_precommit(&h2)];
874
875		let unknown_catch_up = finality_grandpa::CatchUp {
876			round_number: 1,
877			prevotes,
878			precommits,
879			base_hash: h1.hash(),
880			base_number: *h1.number(),
881		};
882
883		let unknown_catch_up =
884			|| voter::CommunicationIn::CatchUp(unknown_catch_up.clone(), voter::Callback::Blank);
885
886		let res = message_all_dependencies_satisfied(unknown_catch_up(), |chain_state| {
887			chain_state.import_header(h1);
888			chain_state.import_header(h2);
889			chain_state.import_header(h3);
890		});
891
892		assert_eq!(unapply_catch_up(res), unapply_catch_up(unknown_catch_up()));
893	}
894
895	#[test]
896	fn request_block_sync_for_needed_blocks() {
897		let (chain_state, import_notifications) = TestChainState::new();
898		let block_status = chain_state.block_status();
899
900		let (global_tx, global_rx) = tracing_unbounded("test", 100_000);
901
902		let block_sync_requester = TestBlockSyncRequester::default();
903
904		let until_imported = UntilGlobalMessageBlocksImported::new(
905			import_notifications,
906			block_sync_requester.clone(),
907			block_status,
908			global_rx,
909			"global",
910			None,
911		);
912
913		let h1 = make_header(5);
914		let h2 = make_header(6);
915		let h3 = make_header(7);
916
917		// we create a commit message, with precommits for blocks 6 and 7 which
918		// we haven't imported.
919		let unknown_commit = CompactCommit::<Header> {
920			target_hash: h1.hash(),
921			target_number: 5,
922			precommits: vec![
923				Precommit { target_hash: h2.hash(), target_number: 6 },
924				Precommit { target_hash: h3.hash(), target_number: 7 },
925			],
926			auth_data: Vec::new(), // not used
927		};
928
929		let unknown_commit =
930			|| voter::CommunicationIn::Commit(0, unknown_commit.clone(), voter::Callback::Blank);
931
932		// we send the commit message and spawn the until_imported stream
933		global_tx.unbounded_send(unknown_commit()).unwrap();
934
935		let threads_pool = futures::executor::ThreadPool::new().unwrap();
936		threads_pool.spawn_ok(until_imported.into_future().map(|_| ()));
937
938		// assert that we will make sync requests
939		let assert = futures::future::poll_fn(|ctx| {
940			let block_sync_requests = block_sync_requester.requests.lock();
941
942			// we request blocks targeted by the precommits that aren't imported
943			if block_sync_requests.contains(&(h2.hash(), *h2.number())) &&
944				block_sync_requests.contains(&(h3.hash(), *h3.number()))
945			{
946				return Poll::Ready(())
947			}
948
949			// NOTE: nothing in this function is future-aware (i.e nothing gets registered to wake
950			// up this future), we manually wake up this task to avoid having to wait until the
951			// timeout below triggers.
952			ctx.waker().wake_by_ref();
953
954			Poll::Pending
955		});
956
957		// the `until_imported` stream doesn't request the blocks immediately,
958		// but it should request them after a small timeout
959		let timeout = Delay::new(Duration::from_secs(60));
960		let test = future::select(assert, timeout)
961			.map(|res| match res {
962				Either::Left(_) => {},
963				Either::Right(_) => panic!("timed out waiting for block sync request"),
964			})
965			.map(drop);
966
967		futures::executor::block_on(test);
968	}
969
970	fn test_catch_up() -> Arc<Mutex<Option<CommunicationIn<Block>>>> {
971		let header = make_header(5);
972
973		let unknown_catch_up = finality_grandpa::CatchUp {
974			round_number: 1,
975			precommits: vec![],
976			prevotes: vec![],
977			base_hash: header.hash(),
978			base_number: *header.number(),
979		};
980
981		let catch_up =
982			voter::CommunicationIn::CatchUp(unknown_catch_up.clone(), voter::Callback::Blank);
983
984		Arc::new(Mutex::new(Some(catch_up)))
985	}
986
987	#[test]
988	fn block_global_message_wait_completed_return_when_all_awaited() {
989		let msg_inner = test_catch_up();
990
991		let waiting_block_1 =
992			BlockGlobalMessage::<Block> { inner: msg_inner.clone(), target_number: 1 };
993
994		let waiting_block_2 = BlockGlobalMessage::<Block> { inner: msg_inner, target_number: 2 };
995
996		// waiting_block_2 is still waiting for block 2, thus this should return `None`.
997		assert!(waiting_block_1.wait_completed(1).is_none());
998
999		// Message only depended on block 1 and 2. Both have been imported, thus this should yield
1000		// the message.
1001		assert!(waiting_block_2.wait_completed(2).is_some());
1002	}
1003
1004	#[test]
1005	fn block_global_message_wait_completed_return_none_on_block_number_mismatch() {
1006		let msg_inner = test_catch_up();
1007
1008		let waiting_block_1 =
1009			BlockGlobalMessage::<Block> { inner: msg_inner.clone(), target_number: 1 };
1010
1011		let waiting_block_2 = BlockGlobalMessage::<Block> { inner: msg_inner, target_number: 2 };
1012
1013		// Calling wait_completed with wrong block number should yield None.
1014		assert!(waiting_block_1.wait_completed(1234).is_none());
1015
1016		// All blocks, that the message depended on, have been imported. Still, given the above
1017		// block number mismatch this should return None.
1018		assert!(waiting_block_2.wait_completed(2).is_none());
1019	}
1020
1021	#[test]
1022	fn metrics_cleans_up_after_itself() {
1023		let r = Registry::new();
1024
1025		let mut m1 = Metrics::register(&r).unwrap();
1026		let m2 = m1.clone();
1027
1028		// Add a new message to the 'queue' of m1.
1029		m1.waiting_messages_inc();
1030
1031		// m1 and m2 are synced through the shared atomic.
1032		assert_eq!(1, m2.global_waiting_messages.get());
1033
1034		// Drop 'queue' m1.
1035		drop(m1);
1036
1037		// Make sure m1 cleaned up after itself, removing all messages that were left in its queue
1038		// when dropped from the global metric.
1039		assert_eq!(0, m2.global_waiting_messages.get());
1040	}
1041}