use futures::{
prelude::*,
task::{Context, Poll},
};
use futures_timer::Delay;
use log::{debug, trace};
use prometheus_endpoint::Registry;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
Justification, Justifications,
};
use std::{pin::Pin, time::Duration};
use crate::{
import_queue::{
buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport,
BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link,
RuntimeOrigin, Verifier, LOG_TARGET,
},
metrics::Metrics,
};
/// Handle to a block import queue whose actual work is performed by a
/// background worker future spawned in [`BasicQueue::new`].
pub struct BasicQueue<B: BlockT> {
/// Sending ends used to hand blocks and justifications to the worker.
handle: BasicQueueHandle<B>,
/// Receiving end of the buffered link on which the worker reports its results.
result_port: BufferedLinkReceiver<B>,
}
impl<B: BlockT> Drop for BasicQueue<B> {
    /// Closes everything the queue holds: first the sending handle towards
    /// the worker, then the receiver of the worker's results.
    fn drop(&mut self) {
        let Self { handle, result_port } = self;
        handle.close();
        result_port.close();
    }
}
impl<B: BlockT> BasicQueue<B> {
    /// Creates the queue and spawns its import worker on `spawner`.
    ///
    /// * `verifier` / `block_import` - handed to the worker for block import.
    /// * `justification_import` - optional justification importer handed to the worker.
    /// * `spawner` - used to spawn the worker as an essential blocking task.
    /// * `prometheus_registry` - when present, metrics are registered on it; a
    ///   registration failure is logged as a warning and metrics are disabled.
    pub fn new<V: 'static + Verifier<B>>(
        verifier: V,
        block_import: BoxBlockImport<B>,
        justification_import: Option<BoxJustificationImport<B>>,
        spawner: &impl sp_core::traits::SpawnEssentialNamed,
        prometheus_registry: Option<&Registry>,
    ) -> Self {
        let (link_sender, link_receiver) = buffered_link::buffered_link(100_000);

        // Metrics are strictly optional: no registry or a failed registration
        // both end up as `None`.
        let metrics = match prometheus_registry.map(|registry| Metrics::register(registry)) {
            Some(Ok(registered)) => Some(registered),
            Some(Err(err)) => {
                log::warn!("Failed to register Prometheus metrics: {}", err);
                None
            },
            None => None,
        };

        let (worker_future, justification_sender, block_import_sender) = BlockImportWorker::new(
            link_sender,
            verifier,
            block_import,
            justification_import,
            metrics,
        );

        spawner.spawn_essential_blocking(
            "basic-block-import-worker",
            Some("block-import"),
            worker_future.boxed(),
        );

        let handle = BasicQueueHandle::new(justification_sender, block_import_sender);
        Self { handle, result_port: link_receiver }
    }
}
/// Cloneable pair of channel senders through which work is submitted to the
/// import worker.
#[derive(Clone)]
struct BasicQueueHandle<B: BlockT> {
/// Channel carrying justification import requests.
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
/// Channel carrying block import requests.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
}
impl<B: BlockT> BasicQueueHandle<B> {
pub fn new(
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
) -> Self {
Self { justification_sender, block_import_sender }
}
pub fn close(&mut self) {
self.justification_sender.close();
self.block_import_sender.close();
}
}
impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
    /// Queues `blocks` for import by the worker.
    ///
    /// An empty batch is dropped without sending anything. If the worker's
    /// channel rejects the message, an error is logged.
    fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
        if !blocks.is_empty() {
            trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());

            let message = worker_messages::ImportBlocks(origin, blocks);
            if self.block_import_sender.unbounded_send(message).is_err() {
                log::error!(
                    target: LOG_TARGET,
                    "import_blocks: Background import task is no longer alive"
                );
            }
        }
    }

    /// Queues every justification in `justifications` as its own message.
    ///
    /// A failed send is logged per justification; the remaining ones are
    /// still attempted.
    fn import_justifications(
        &mut self,
        who: RuntimeOrigin,
        hash: B::Hash,
        number: NumberFor<B>,
        justifications: Justifications,
    ) {
        for justification in justifications {
            let message = worker_messages::ImportJustification(who, hash, number, justification);
            let sent = self.justification_sender.unbounded_send(message);
            if sent.is_err() {
                log::error!(
                    target: LOG_TARGET,
                    "import_justification: Background import task is no longer alive"
                );
            }
        }
    }
}
#[async_trait::async_trait]
impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
    /// Returns a boxed clone of the sending handle.
    fn service(&self) -> Box<dyn ImportQueueService<B>> {
        Box::new(self.handle.clone())
    }

    /// Returns the queue's own sending handle by mutable reference.
    fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
        &mut self.handle
    }

    /// Polls the result link and lets it invoke `link`.
    ///
    /// An error from the result link is logged; nothing is returned to the caller.
    fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
        if self.result_port.poll_actions(cx, link).is_err() {
            log::error!(
                target: LOG_TARGET,
                "poll_actions: Background import task is no longer alive"
            );
        }
    }

    /// Keeps forwarding actions from the result link to `link` until the
    /// result link reports an error, then logs it and returns.
    async fn run(mut self, mut link: Box<dyn Link<B>>) {
        loop {
            if self.result_port.next_action(&mut *link).await.is_err() {
                // Use the shared log target and name this method, so the line can be
                // filtered with the rest of the import queue and is not mistaken for
                // the `poll_actions` failure above.
                log::error!(
                    target: LOG_TARGET,
                    "run: Background import task is no longer alive"
                );
                return
            }
        }
    }
}
/// Message types sent over the channels from the queue handle to the import worker.
mod worker_messages {
use super::*;
/// Request to import a batch of blocks: `(origin, blocks)`.
pub struct ImportBlocks<B: BlockT>(pub BlockOrigin, pub Vec<IncomingBlock<B>>);
/// Request to import a single justification: `(who, hash, number, justification)`,
/// where `hash` and `number` identify the block the justification is for.
pub struct ImportJustification<B: BlockT>(
pub RuntimeOrigin,
pub B::Hash,
pub NumberFor<B>,
pub Justification,
);
}
async fn block_import_process<B: BlockT>(
mut block_import: BoxBlockImport<B>,
mut verifier: impl Verifier<B>,
mut result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
metrics: Option<Metrics>,
delay_between_blocks: Duration,
) {
loop {
let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
{
Some(blocks) => blocks,
None => {
log::debug!(
target: LOG_TARGET,
"Stopping block import because the import channel was closed!",
);
return
},
};
let res = import_many_blocks(
&mut block_import,
origin,
blocks,
&mut verifier,
delay_between_blocks,
metrics.clone(),
)
.await;
result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
}
/// State owned by the background import worker future.
struct BlockImportWorker<B: BlockT> {
/// Sending end of the buffered link used to report results.
result_sender: BufferedLinkSender<B>,
/// Optional justification importer; when `None`, justification imports are
/// reported as unsuccessful.
justification_import: Option<BoxJustificationImport<B>>,
/// Optional metrics; when present, justification import time is observed.
metrics: Option<Metrics>,
}
impl<B: BlockT> BlockImportWorker<B> {
/// Builds the worker future together with the two channel senders that feed it.
///
/// Returns `(future, justification_sender, block_import_sender)`. The future
/// completes when the result link is closed, when the justification channel
/// is closed, or when the block import process finishes.
fn new<V: 'static + Verifier<B>>(
result_sender: BufferedLinkSender<B>,
verifier: V,
block_import: BoxBlockImport<B>,
justification_import: Option<BoxJustificationImport<B>>,
metrics: Option<Metrics>,
) -> (
impl Future<Output = ()> + Send,
TracingUnboundedSender<worker_messages::ImportJustification<B>>,
TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
) {
use worker_messages::*;
// NOTE(review): the `100_000` argument is presumably the queue-size warning
// threshold of `tracing_unbounded` — confirm against `sc_utils::mpsc`.
let (justification_sender, mut justification_port) =
tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);
let (block_import_sender, block_import_port) =
tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);
let mut worker = BlockImportWorker { result_sender, justification_import, metrics };
// A zero duration means `import_many_blocks` never inserts a timed delay
// between blocks and only yields.
let delay_between_blocks = Duration::default();
let future = async move {
// Before processing anything, ask the justification importer (if any) for
// its start-up list and forward each entry as a justification request.
if let Some(justification_import) = worker.justification_import.as_mut() {
for (hash, number) in justification_import.on_start().await {
worker.result_sender.request_justification(&hash, number);
}
}
let block_import_process = block_import_process(
block_import,
verifier,
worker.result_sender.clone(),
block_import_port,
worker.metrics.clone(),
delay_between_blocks,
);
// Pin the block import future so it can be polled by reference in the loop.
futures::pin_mut!(block_import_process);
loop {
// Stop as soon as the result link reports closed.
if worker.result_sender.is_closed() {
log::debug!(
target: LOG_TARGET,
"Stopping block import because result channel was closed!",
);
return
}
// Drain every justification that is immediately available before block
// import gets a chance to progress; this is what gives justifications
// priority over blocks.
while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
match justification {
Some(ImportJustification(who, hash, number, justification)) =>
worker.import_justification(who, hash, number, justification).await,
None => {
log::debug!(
target: LOG_TARGET,
"Stopping block import because justification channel was closed!",
);
return
},
}
}
// Poll block import once; if it has completed, the worker is done too.
if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
return
}
// Everything polled above returned `Pending`; yield until woken again.
futures::pending!()
}
};
(future, justification_sender, block_import_sender)
}
/// Imports one justification and reports the outcome on the result link.
///
/// The outcome is `false` when the importer returns an error (which is
/// logged at debug level) or when no justification importer is configured.
/// When metrics are present, the elapsed time is observed.
async fn import_justification(
&mut self,
who: RuntimeOrigin,
hash: B::Hash,
number: NumberFor<B>,
justification: Justification,
) {
let started = std::time::Instant::now();
let success = match self.justification_import.as_mut() {
Some(justification_import) => justification_import
.import_justification(hash, number, justification)
.await
.map_err(|e| {
debug!(
target: LOG_TARGET,
"Justification import failed for hash = {:?} with number = {:?} coming from node = {:?} with error: {}",
hash,
number,
who,
e,
);
e
})
.is_ok(),
None => false,
};
if let Some(metrics) = self.metrics.as_ref() {
metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
}
self.result_sender.justification_imported(who, &hash, number, success);
}
}
/// Outcome of importing one batch of blocks with `import_many_blocks`.
struct ImportManyBlocksResult<B: BlockT> {
/// Number of blocks whose import returned `Ok`.
imported: usize,
/// Number of blocks in the submitted batch.
block_count: usize,
/// Per-block import result paired with the block's hash, in batch order.
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
}
/// Imports `blocks` one after another and collects a result for each.
///
/// After the first failed import, every remaining block is recorded as
/// `BlockImportError::Cancelled` without being imported. Between blocks the
/// function either waits for `delay_between_blocks` (only when it is non-zero
/// and no error has occurred yet) or yields once to the executor.
async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
    import_handle: &mut BoxBlockImport<B>,
    blocks_origin: BlockOrigin,
    blocks: Vec<IncomingBlock<B>>,
    verifier: &mut V,
    delay_between_blocks: Duration,
    metrics: Option<Metrics>,
) -> ImportManyBlocksResult<B> {
    let count = blocks.len();

    // Human-readable block number range, used only for the trace line below.
    let first_number = blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number()));
    let last_number = blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number()));
    let blocks_range = match (first_number, last_number) {
        (Some(first), Some(last)) =>
            if first != last {
                format!(" ({}..{})", first, last)
            } else {
                format!(" ({})", first)
            },
        _ => String::new(),
    };

    trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range);

    let mut imported = 0;
    let mut results = Vec::new();
    let mut has_error = false;

    for block in blocks {
        let block_number = block.header.as_ref().map(|h| *h.number());
        let block_hash = block.hash;

        // Once one block has failed, the rest of the batch is cancelled.
        let import_result = if has_error {
            Err(BlockImportError::Cancelled)
        } else {
            import_single_block_metered(
                import_handle,
                blocks_origin,
                block,
                verifier,
                metrics.clone(),
            )
            .await
        };

        if let Some(metrics) = &metrics {
            metrics.report_import::<B>(&import_result);
        }

        match &import_result {
            Ok(_) => {
                trace!(
                    target: LOG_TARGET,
                    "Block imported successfully {:?} ({})",
                    block_number,
                    block_hash,
                );
                imported += 1;
            },
            Err(_) => has_error = true,
        }

        results.push((import_result, block_hash));

        if has_error || delay_between_blocks == Duration::default() {
            Yield::new().await
        } else {
            Delay::new(delay_between_blocks).await;
        }
    }

    ImportManyBlocksResult { block_count: count, imported, results }
}
/// A future that returns `Pending` on its first poll and `Ready` on every
/// later poll. The flag records whether the first poll has already happened.
struct Yield(bool);

impl Yield {
    /// Creates a future that has not been polled yet.
    fn new() -> Self {
        Yield(false)
    }
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(())
        }

        // First poll: remember it, wake the task right away so it gets polled
        // again, and give control back for now.
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
},
import_queue::Verifier,
};
use futures::{executor::block_on, Future};
use sp_test_primitives::{Block, BlockNumber, Hash, Header};
// Test verifier: rebuilds the import params from the incoming origin and header.
#[async_trait::async_trait]
impl Verifier<Block> for () {
async fn verify(
&mut self,
block: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
Ok(BlockImportParams::new(block.origin, block.header))
}
}
// Test block import: both checking and importing always return `Ok`.
#[async_trait::async_trait]
impl BlockImport<Block> for () {
type Error = sp_consensus::Error;
async fn check_block(
&mut self,
_block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
Ok(ImportResult::imported(false))
}
async fn import_block(
&mut self,
_block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
Ok(ImportResult::imported(true))
}
}
// Test justification import: nothing requested on start, every import succeeds.
#[async_trait::async_trait]
impl JustificationImport<Block> for () {
type Error = sp_consensus::Error;
async fn on_start(&mut self) -> Vec<(Hash, BlockNumber)> {
Vec::new()
}
async fn import_justification(
&mut self,
_hash: Hash,
_number: BlockNumber,
_justification: Justification,
) -> Result<(), Self::Error> {
Ok(())
}
}
// Events recorded by `TestLink`, each carrying the hash it was reported for.
#[derive(Debug, PartialEq)]
enum Event {
JustificationImported(Hash),
BlockImported(Hash),
}
// Link implementation that records what the worker reports, in order.
#[derive(Default)]
struct TestLink {
events: Vec<Event>,
}
impl Link<Block> for TestLink {
// Records the hash of the first successful result in the batch, if any.
fn blocks_processed(
&mut self,
_imported: usize,
_count: usize,
results: Vec<(Result<BlockImportStatus<BlockNumber>, BlockImportError>, Hash)>,
) {
if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) {
self.events.push(Event::BlockImported(hash));
}
}
// Records the hash regardless of whether the import succeeded.
fn justification_imported(
&mut self,
_who: RuntimeOrigin,
hash: &Hash,
_number: BlockNumber,
_success: bool,
) {
self.events.push(Event::JustificationImported(*hash))
}
}
// Queues blocks and justifications interleaved and checks that the worker
// reports all justifications before any block.
#[test]
fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);
let (worker, finality_sender, block_import_sender) =
BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
futures::pin_mut!(worker);
// Sends a single-block batch with number `n` and returns the block's hash.
let import_block = |n| {
let header = Header {
parent_hash: Hash::random(),
number: n,
extrinsics_root: Hash::random(),
state_root: Default::default(),
digest: Default::default(),
};
let hash = header.hash();
block_import_sender
.unbounded_send(worker_messages::ImportBlocks(
BlockOrigin::Own,
vec![IncomingBlock {
hash,
header: Some(header),
body: None,
indexed_body: None,
justifications: None,
origin: None,
allow_missing_state: false,
import_existing: false,
state: None,
skip_execution: false,
}],
))
.unwrap();
hash
};
// Sends a justification for a random hash and returns that hash.
let import_justification = || {
let hash = Hash::random();
finality_sender
.unbounded_send(worker_messages::ImportJustification(
libp2p_identity::PeerId::random(),
hash,
1,
(*b"TEST", Vec::new()),
))
.unwrap();
hash
};
let mut link = TestLink::default();
// Everything is queued before the worker is polled for the first time.
let block1 = import_block(1);
let block2 = import_block(2);
let block3 = import_block(3);
let justification1 = import_justification();
let justification2 = import_justification();
let block4 = import_block(4);
let block5 = import_block(5);
let block6 = import_block(6);
let justification3 = import_justification();
// Drive the worker and the result link by hand until all nine events
// (three justifications and six blocks) have been recorded.
block_on(futures::future::poll_fn(|cx| {
while link.events.len() < 9 {
match Future::poll(Pin::new(&mut worker), cx) {
Poll::Pending => {},
Poll::Ready(()) => panic!("import queue worker should not conclude."),
}
result_port.poll_actions(cx, &mut link).unwrap();
}
Poll::Ready(())
}));
// Justifications come first even though blocks were queued before them.
assert_eq!(
link.events,
vec![
Event::JustificationImported(justification1),
Event::JustificationImported(justification2),
Event::JustificationImported(justification3),
Event::BlockImported(block1),
Event::BlockImported(block2),
Event::BlockImported(block3),
Event::BlockImported(block4),
Event::BlockImported(block5),
Event::BlockImported(block6),
]
);
}
}