referrerpolicy=no-referrer-when-downgrade

sc_consensus/import_queue/
basic_queue.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18use futures::{
19	prelude::*,
20	task::{Context, Poll},
21};
22use log::{debug, trace};
23use prometheus_endpoint::Registry;
24use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
25use sp_consensus::BlockOrigin;
26use sp_runtime::{
27	traits::{Block as BlockT, Header as HeaderT, NumberFor},
28	Justification, Justifications,
29};
30use std::pin::Pin;
31
32use crate::{
33	import_queue::{
34		buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
35		import_single_block_metered, verify_single_block_metered, BlockImportError,
36		BlockImportStatus, BoxBlockImport, BoxJustificationImport, ImportQueue, ImportQueueService,
37		IncomingBlock, JustificationImportResult, Link, RuntimeOrigin,
38		SingleBlockVerificationOutcome, Verifier, LOG_TARGET,
39	},
40	metrics::Metrics,
41};
42
43/// Interface to a basic block import queue that is importing blocks sequentially in a separate
44/// task, with plugable verification.
45pub struct BasicQueue<B: BlockT> {
46	/// Handle for sending justification and block import messages to the background task.
47	handle: BasicQueueHandle<B>,
48	/// Results coming from the worker task.
49	result_port: BufferedLinkReceiver<B>,
50}
51
52impl<B: BlockT> Drop for BasicQueue<B> {
53	fn drop(&mut self) {
54		// Flush the queue and close the receiver to terminate the future.
55		self.handle.close();
56		self.result_port.close();
57	}
58}
59
60impl<B: BlockT> BasicQueue<B> {
61	/// Instantiate a new basic queue, with given verifier.
62	///
63	/// This creates a background task, and calls `on_start` on the justification importer.
64	pub fn new<V>(
65		verifier: V,
66		block_import: BoxBlockImport<B>,
67		justification_import: Option<BoxJustificationImport<B>>,
68		spawner: &impl sp_core::traits::SpawnEssentialNamed,
69		prometheus_registry: Option<&Registry>,
70	) -> Self
71	where
72		V: Verifier<B> + 'static,
73	{
74		let (result_sender, result_port) = buffered_link::buffered_link(100_000);
75
76		let metrics = prometheus_registry.and_then(|r| {
77			Metrics::register(r)
78				.map_err(|err| {
79					log::warn!("Failed to register Prometheus metrics: {}", err);
80				})
81				.ok()
82		});
83
84		let (future, justification_sender, block_import_sender) = BlockImportWorker::new(
85			result_sender,
86			verifier,
87			block_import,
88			justification_import,
89			metrics,
90		);
91
92		spawner.spawn_essential_blocking(
93			"basic-block-import-worker",
94			Some("block-import"),
95			future.boxed(),
96		);
97
98		Self {
99			handle: BasicQueueHandle::new(justification_sender, block_import_sender),
100			result_port,
101		}
102	}
103}
104
105#[derive(Clone)]
106struct BasicQueueHandle<B: BlockT> {
107	/// Channel to send justification import messages to the background task.
108	justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
109	/// Channel to send block import messages to the background task.
110	block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
111}
112
113impl<B: BlockT> BasicQueueHandle<B> {
114	pub fn new(
115		justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
116		block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
117	) -> Self {
118		Self { justification_sender, block_import_sender }
119	}
120
121	pub fn close(&mut self) {
122		self.justification_sender.close();
123		self.block_import_sender.close();
124	}
125}
126
127impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
128	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
129		if blocks.is_empty() {
130			return
131		}
132
133		trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());
134		let res = self
135			.block_import_sender
136			.unbounded_send(worker_messages::ImportBlocks(origin, blocks));
137
138		if res.is_err() {
139			log::error!(
140				target: LOG_TARGET,
141				"import_blocks: Background import task is no longer alive"
142			);
143		}
144	}
145
146	fn import_justifications(
147		&mut self,
148		who: RuntimeOrigin,
149		hash: B::Hash,
150		number: NumberFor<B>,
151		justifications: Justifications,
152	) {
153		for justification in justifications {
154			let res = self.justification_sender.unbounded_send(
155				worker_messages::ImportJustification(who, hash, number, justification),
156			);
157
158			if res.is_err() {
159				log::error!(
160					target: LOG_TARGET,
161					"import_justification: Background import task is no longer alive"
162				);
163			}
164		}
165	}
166}
167
168#[async_trait::async_trait]
169impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
170	/// Get handle to [`ImportQueueService`].
171	fn service(&self) -> Box<dyn ImportQueueService<B>> {
172		Box::new(self.handle.clone())
173	}
174
175	/// Get a reference to the handle to [`ImportQueueService`].
176	fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
177		&mut self.handle
178	}
179
180	/// Poll actions from network.
181	fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) {
182		if self.result_port.poll_actions(cx, link).is_err() {
183			log::error!(
184				target: LOG_TARGET,
185				"poll_actions: Background import task is no longer alive"
186			);
187		}
188	}
189
190	/// Start asynchronous runner for import queue.
191	///
192	/// Takes an object implementing [`Link`] which allows the import queue to
193	/// influence the synchronization process.
194	async fn run(mut self, link: &dyn Link<B>) {
195		loop {
196			if let Err(_) = self.result_port.next_action(link).await {
197				log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
198				return
199			}
200		}
201	}
202}
203
204/// Messages designated to the background worker.
205mod worker_messages {
206	use super::*;
207
208	pub struct ImportBlocks<B: BlockT>(pub BlockOrigin, pub Vec<IncomingBlock<B>>);
209	pub struct ImportJustification<B: BlockT>(
210		pub RuntimeOrigin,
211		pub B::Hash,
212		pub NumberFor<B>,
213		pub Justification,
214	);
215}
216
217/// The process of importing blocks.
218///
219/// This polls the `block_import_receiver` for new blocks to import and than awaits on
220/// importing these blocks. After each block is imported, this async function yields once
221/// to give other futures the possibility to be run.
222///
223/// Returns when `block_import` ended.
224async fn block_import_process<B: BlockT>(
225	mut block_import: BoxBlockImport<B>,
226	verifier: impl Verifier<B>,
227	result_sender: BufferedLinkSender<B>,
228	mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
229	metrics: Option<Metrics>,
230) {
231	loop {
232		let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
233		{
234			Some(blocks) => blocks,
235			None => {
236				log::debug!(
237					target: LOG_TARGET,
238					"Stopping block import because the import channel was closed!",
239				);
240				return
241			},
242		};
243
244		let res =
245			import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await;
246
247		result_sender.blocks_processed(res.imported, res.block_count, res.results);
248	}
249}
250
251struct BlockImportWorker<B: BlockT> {
252	result_sender: BufferedLinkSender<B>,
253	justification_import: Option<BoxJustificationImport<B>>,
254	metrics: Option<Metrics>,
255}
256
257impl<B: BlockT> BlockImportWorker<B> {
258	fn new<V>(
259		result_sender: BufferedLinkSender<B>,
260		verifier: V,
261		block_import: BoxBlockImport<B>,
262		justification_import: Option<BoxJustificationImport<B>>,
263		metrics: Option<Metrics>,
264	) -> (
265		impl Future<Output = ()> + Send,
266		TracingUnboundedSender<worker_messages::ImportJustification<B>>,
267		TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
268	)
269	where
270		V: Verifier<B> + 'static,
271	{
272		use worker_messages::*;
273
274		let (justification_sender, mut justification_port) =
275			tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);
276
277		let (block_import_sender, block_import_receiver) =
278			tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);
279
280		let mut worker = BlockImportWorker { result_sender, justification_import, metrics };
281
282		let future = async move {
283			// Let's initialize `justification_import`
284			if let Some(justification_import) = worker.justification_import.as_mut() {
285				for (hash, number) in justification_import.on_start().await {
286					worker.result_sender.request_justification(&hash, number);
287				}
288			}
289
290			let block_import_process = block_import_process(
291				block_import,
292				verifier,
293				worker.result_sender.clone(),
294				block_import_receiver,
295				worker.metrics.clone(),
296			);
297			futures::pin_mut!(block_import_process);
298
299			loop {
300				// If the results sender is closed, that means that the import queue is shutting
301				// down and we should end this future.
302				if worker.result_sender.is_closed() {
303					log::debug!(
304						target: LOG_TARGET,
305						"Stopping block import because result channel was closed!",
306					);
307					return
308				}
309
310				// Make sure to first process all justifications
311				while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
312					match justification {
313						Some(ImportJustification(who, hash, number, justification)) =>
314							worker.import_justification(who, hash, number, justification).await,
315						None => {
316							log::debug!(
317								target: LOG_TARGET,
318								"Stopping block import because justification channel was closed!",
319							);
320							return
321						},
322					}
323				}
324
325				if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
326					return
327				}
328
329				// All futures that we polled are now pending.
330				futures::pending!()
331			}
332		};
333
334		(future, justification_sender, block_import_sender)
335	}
336
337	async fn import_justification(
338		&mut self,
339		who: RuntimeOrigin,
340		hash: B::Hash,
341		number: NumberFor<B>,
342		justification: Justification,
343	) {
344		let started = std::time::Instant::now();
345
346		let import_result = match self.justification_import.as_mut() {
347			Some(justification_import) => {
348				let result = justification_import
349				.import_justification(hash, number, justification)
350				.await
351				.map_err(|e| {
352					debug!(
353						target: LOG_TARGET,
354						"Justification import failed for hash = {:?} with number = {:?} coming from node = {:?} with error: {}",
355						hash,
356						number,
357						who,
358						e,
359					);
360					e
361				});
362				match result {
363					Ok(()) => JustificationImportResult::Success,
364					Err(sp_consensus::Error::OutdatedJustification) =>
365						JustificationImportResult::OutdatedJustification,
366					Err(_) => JustificationImportResult::Failure,
367				}
368			},
369			None => JustificationImportResult::Failure,
370		};
371
372		if let Some(metrics) = self.metrics.as_ref() {
373			metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
374		}
375
376		self.result_sender.justification_imported(who, &hash, number, import_result);
377	}
378}
379
380/// Result of [`import_many_blocks`].
381struct ImportManyBlocksResult<B: BlockT> {
382	/// The number of blocks imported successfully.
383	imported: usize,
384	/// The total number of blocks processed.
385	block_count: usize,
386	/// The import results for each block.
387	results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
388}
389
390/// Import several blocks at once, returning import result for each block.
391///
392/// This will yield after each imported block once, to ensure that other futures can
393/// be called as well.
394async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
395	import_handle: &mut BoxBlockImport<B>,
396	blocks_origin: BlockOrigin,
397	blocks: Vec<IncomingBlock<B>>,
398	verifier: &V,
399	metrics: Option<Metrics>,
400) -> ImportManyBlocksResult<B> {
401	let count = blocks.len();
402
403	let blocks_range = match (
404		blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
405		blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
406	) {
407		(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
408		(Some(first), Some(_)) => format!(" ({})", first),
409		_ => Default::default(),
410	};
411
412	trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range);
413
414	let mut imported = 0;
415	let mut results = vec![];
416	let mut has_error = false;
417	let mut blocks = blocks.into_iter();
418
419	// Blocks in the response/drain should be in ascending order.
420	loop {
421		// Is there any block left to import?
422		let block = match blocks.next() {
423			Some(b) => b,
424			None => {
425				// No block left to import, success!
426				return ImportManyBlocksResult { block_count: count, imported, results }
427			},
428		};
429
430		let block_number = block.header.as_ref().map(|h| *h.number());
431		let block_hash = block.hash;
432		let import_result = if has_error {
433			Err(BlockImportError::Cancelled)
434		} else {
435			let verification_fut = verify_single_block_metered(
436				import_handle,
437				blocks_origin,
438				block,
439				verifier,
440				metrics.as_ref(),
441			);
442			match verification_fut.await {
443				Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
444				Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => {
445					// The actual import.
446					import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
447						.await
448				},
449				Err(e) => Err(e),
450			}
451		};
452
453		if let Some(metrics) = metrics.as_ref() {
454			metrics.report_import::<B>(&import_result);
455		}
456
457		if import_result.is_ok() {
458			trace!(
459				target: LOG_TARGET,
460				"Block imported successfully {:?} ({})",
461				block_number,
462				block_hash,
463			);
464			imported += 1;
465		} else {
466			has_error = true;
467		}
468
469		results.push((import_result, block_hash));
470
471		Yield::new().await
472	}
473}
474
/// A future that returns `Pending` exactly once and `Ready` on every later poll.
///
/// On the first poll it calls `wake_by_ref` on the waker of the current context, so the
/// task is scheduled for re-execution, and then returns `Pending`. The wrapped flag records
/// whether that first poll has already happened.
struct Yield(bool);

impl Yield {
	/// Create a `Yield` that has not been polled yet.
	fn new() -> Self {
		Yield(false)
	}
}

impl Future for Yield {
	type Output = ();

	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
		let this = self.get_mut();

		if this.0 {
			return Poll::Ready(())
		}

		// First poll: remember it, request another poll and give up control for now.
		this.0 = true;
		cx.waker().wake_by_ref();
		Poll::Pending
	}
}
501
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		block_import::{
			BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
		},
		import_queue::Verifier,
	};
	use futures::{executor::block_on, Future};
	use parking_lot::Mutex;
	use sp_test_primitives::{Block, BlockNumber, Hash, Header};

	/// Verifier that accepts every block and keeps only its origin and header.
	#[async_trait::async_trait]
	impl Verifier<Block> for () {
		async fn verify(
			&self,
			block: BlockImportParams<Block>,
		) -> Result<BlockImportParams<Block>, String> {
			Ok(BlockImportParams::new(block.origin, block.header))
		}
	}

	/// Block import that reports success for every check and every import.
	#[async_trait::async_trait]
	impl BlockImport<Block> for () {
		type Error = sp_consensus::Error;

		async fn check_block(
			&self,
			_block: BlockCheckParams<Block>,
		) -> Result<ImportResult, Self::Error> {
			Ok(ImportResult::imported(false))
		}

		async fn import_block(
			&self,
			_block: BlockImportParams<Block>,
		) -> Result<ImportResult, Self::Error> {
			Ok(ImportResult::imported(true))
		}
	}

	/// Justification import that requests nothing on start and accepts everything.
	#[async_trait::async_trait]
	impl JustificationImport<Block> for () {
		type Error = sp_consensus::Error;

		async fn on_start(&mut self) -> Vec<(Hash, BlockNumber)> {
			Vec::new()
		}

		async fn import_justification(
			&mut self,
			_hash: Hash,
			_number: BlockNumber,
			_justification: Justification,
		) -> Result<(), Self::Error> {
			Ok(())
		}
	}

	/// What the test link observed, in the order it was reported.
	#[derive(Debug, PartialEq)]
	enum Event {
		JustificationImported(Hash),
		BlockImported(Hash),
	}

	/// Link that records every notification it receives.
	#[derive(Default)]
	struct TestLink {
		events: Mutex<Vec<Event>>,
	}

	impl Link<Block> for TestLink {
		/// Records the hash of the first successfully imported block of the batch, if any.
		fn blocks_processed(
			&self,
			_imported: usize,
			_count: usize,
			results: Vec<(Result<BlockImportStatus<BlockNumber>, BlockImportError>, Hash)>,
		) {
			let first_imported = results.into_iter().find(|(outcome, _)| outcome.is_ok());

			if let Some((_, hash)) = first_imported {
				self.events.lock().push(Event::BlockImported(hash));
			}
		}

		/// Records the hash regardless of the import result.
		fn justification_imported(
			&self,
			_who: RuntimeOrigin,
			hash: &Hash,
			_number: BlockNumber,
			_import_result: JustificationImportResult,
		) {
			let event = Event::JustificationImported(*hash);
			self.events.lock().push(event)
		}
	}

	#[test]
	fn prioritizes_finality_work_over_block_import() {
		let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);

		let (worker, finality_sender, block_import_sender) =
			BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
		futures::pin_mut!(worker);

		// Sends a single-block batch with the given number and returns the block hash.
		let send_block = |number| {
			let header = Header {
				parent_hash: Hash::random(),
				number,
				extrinsics_root: Hash::random(),
				state_root: Default::default(),
				digest: Default::default(),
			};
			let hash = header.hash();

			let block = IncomingBlock {
				hash,
				header: Some(header),
				body: None,
				indexed_body: None,
				justifications: None,
				origin: None,
				allow_missing_state: false,
				import_existing: false,
				state: None,
				skip_execution: false,
			};

			block_import_sender
				.unbounded_send(worker_messages::ImportBlocks(BlockOrigin::Own, vec![block]))
				.unwrap();

			hash
		};

		// Sends a justification for a random hash and returns that hash.
		let send_justification = || {
			let hash = Hash::random();
			let message = worker_messages::ImportJustification(
				sc_network_types::PeerId::random(),
				hash,
				1,
				(*b"TEST", Vec::new()),
			);

			finality_sender.unbounded_send(message).unwrap();

			hash
		};

		let link = TestLink::default();

		// Queue up interleaved block and justification work before the worker runs at all.
		let block1 = send_block(1);
		let block2 = send_block(2);
		let block3 = send_block(3);
		let justification1 = send_justification();
		let justification2 = send_justification();
		let block4 = send_block(4);
		let block5 = send_block(5);
		let block6 = send_block(6);
		let justification3 = send_justification();

		// Keep polling the worker until all 9 events have been observed.
		block_on(futures::future::poll_fn(|cx| {
			while link.events.lock().len() < 9 {
				if worker.as_mut().poll(cx).is_ready() {
					panic!("import queue worker should not conclude.")
				}

				result_port.poll_actions(cx, &link).unwrap();
			}

			Poll::Ready(())
		}));

		// Every justification has to be handled before any block import work.
		let expected = vec![
			Event::JustificationImported(justification1),
			Event::JustificationImported(justification2),
			Event::JustificationImported(justification3),
			Event::BlockImported(block1),
			Event::BlockImported(block2),
			Event::BlockImported(block3),
			Event::BlockImported(block4),
			Event::BlockImported(block5),
			Event::BlockImported(block6),
		];
		assert_eq!(*link.events.lock(), expected);
	}
}
693}