use futures::prelude::*;
use futures_timer::Delay;
use prometheus_endpoint::Registry;
use sc_client_api::{
backend::{Backend as ClientBackend, Finalizer},
client::BlockchainEvents,
};
use sc_consensus::{
block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
import_queue::{BasicQueue, BoxBlockImport, Verifier},
};
use sp_blockchain::HeaderBackend;
use sp_consensus::{Environment, Proposer, SelectChain};
use sp_core::traits::SpawnNamed;
use sp_inherents::CreateInherentDataProviders;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{marker::PhantomData, sync::Arc, time::Duration};
mod error;
mod finalize_block;
mod seal_block;
pub mod consensus;
pub mod rpc;
pub use self::{
consensus::ConsensusDataProvider,
error::Error,
finalize_block::{finalize_block, FinalizeBlockParams},
rpc::{CreatedBlock, EngineCommand},
seal_block::{seal_block, SealBlockParams, MAX_PROPOSAL_DURATION},
};
use sc_transaction_pool_api::TransactionPool;
use sp_api::ProvideRuntimeApi;
const LOG_TARGET: &str = "manual-seal";
pub const MANUAL_SEAL_ENGINE_ID: ConsensusEngineId = [b'm', b'a', b'n', b'l'];
struct ManualSealVerifier;
#[async_trait::async_trait]
impl<B: BlockT> Verifier<B> for ManualSealVerifier {
async fn verify(
&self,
mut block: BlockImportParams<B>,
) -> Result<BlockImportParams<B>, String> {
block.finalized = false;
block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
Ok(block)
}
}
pub fn import_queue<Block>(
block_import: BoxBlockImport<Block>,
spawner: &impl sp_core::traits::SpawnEssentialNamed,
registry: Option<&Registry>,
) -> BasicQueue<Block>
where
Block: BlockT,
{
BasicQueue::new(ManualSealVerifier, block_import, None, spawner, registry)
}
pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CS, CIDP, P> {
pub block_import: BI,
pub env: E,
pub client: Arc<C>,
pub pool: Arc<TP>,
pub commands_stream: CS,
pub select_chain: SC,
pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = P>>>,
pub create_inherent_data_providers: CIDP,
}
pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CIDP, P> {
pub block_import: BI,
pub env: E,
pub client: Arc<C>,
pub pool: Arc<TP>,
pub select_chain: SC,
pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = P>>>,
pub create_inherent_data_providers: CIDP,
}
pub struct DelayedFinalizeParams<C, S> {
pub client: Arc<C>,
pub spawn_handle: S,
pub delay_sec: u64,
}
pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
ManualSealParams {
mut block_import,
mut env,
client,
pool,
mut commands_stream,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
}: ManualSealParams<B, BI, E, C, TP, SC, CS, CIDP, P>,
) where
B: BlockT + 'static,
BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static,
E::Proposer: Proposer<B, Proof = P>,
CS: Stream<Item = EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
SC: SelectChain<B> + 'static,
TP: TransactionPool<Block = B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: codec::Encode + Send + Sync + 'static,
{
while let Some(command) = commands_stream.next().await {
match command {
EngineCommand::SealNewBlock { create_empty, finalize, parent_hash, sender } => {
seal_block(SealBlockParams {
sender,
parent_hash,
finalize,
create_empty,
env: &mut env,
select_chain: &select_chain,
block_import: &mut block_import,
consensus_data_provider: consensus_data_provider.as_deref(),
pool: pool.clone(),
client: client.clone(),
create_inherent_data_providers: &create_inherent_data_providers,
})
.await;
},
EngineCommand::FinalizeBlock { hash, sender, justification } => {
let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j));
finalize_block(FinalizeBlockParams {
hash,
sender,
justification,
finalizer: client.clone(),
_phantom: PhantomData,
})
.await
},
}
}
}
pub async fn run_instant_seal<B, BI, CB, E, C, TP, SC, CIDP, P>(
InstantSealParams {
block_import,
env,
client,
pool,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
}: InstantSealParams<B, BI, E, C, TP, SC, CIDP, P>,
) where
B: BlockT + 'static,
BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static,
E::Proposer: Proposer<B, Proof = P>,
SC: SelectChain<B> + 'static,
TP: TransactionPool<Block = B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: codec::Encode + Send + Sync + 'static,
{
let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock {
create_empty: true,
finalize: false,
parent_hash: None,
sender: None,
});
run_manual_seal(ManualSealParams {
block_import,
env,
client,
pool,
commands_stream,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
})
.await
}
pub async fn run_instant_seal_and_finalize<B, BI, CB, E, C, TP, SC, CIDP, P>(
InstantSealParams {
block_import,
env,
client,
pool,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
}: InstantSealParams<B, BI, E, C, TP, SC, CIDP, P>,
) where
B: BlockT + 'static,
BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static,
E::Proposer: Proposer<B, Proof = P>,
SC: SelectChain<B> + 'static,
TP: TransactionPool<Block = B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: codec::Encode + Send + Sync + 'static,
{
let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock {
create_empty: false,
finalize: true,
parent_hash: None,
sender: None,
});
run_manual_seal(ManualSealParams {
block_import,
env,
client,
pool,
commands_stream,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
})
.await
}
pub async fn run_delayed_finalize<B, CB, C, S>(
DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams<C, S>,
) where
B: BlockT + 'static,
CB: ClientBackend<B> + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
S: SpawnNamed,
{
let mut block_import_stream = client.import_notification_stream();
while let Some(notification) = block_import_stream.next().await {
let delay = Delay::new(Duration::from_secs(delay_sec));
let cloned_client = client.clone();
spawn_handle.spawn(
"delayed-finalize",
None,
Box::pin(async move {
delay.await;
finalize_block(FinalizeBlockParams {
hash: notification.hash,
sender: None,
justification: None,
finalizer: cloned_client,
_phantom: PhantomData,
})
.await
}),
);
}
}
#[cfg(test)]
mod tests {
use super::*;
use sc_basic_authorship::ProposerFactory;
use sc_consensus::ImportedAux;
use sc_transaction_pool::{BasicPool, FullChainApi, Options, RevalidationType};
use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool, TransactionSource};
use sp_inherents::InherentData;
use sp_runtime::generic::{Digest, DigestItem};
use substrate_test_runtime_client::{
DefaultTestClientBuilderExt, Sr25519Keyring::*, TestClientBuilder, TestClientBuilderExt,
};
use substrate_test_runtime_transaction_pool::{uxt, TestApi};
fn api() -> Arc<TestApi> {
Arc::new(TestApi::empty())
}
const SOURCE: TransactionSource = TransactionSource::External;
struct TestDigestProvider<C> {
_client: Arc<C>,
}
impl<B, C> ConsensusDataProvider<B> for TestDigestProvider<C>
where
B: BlockT,
C: ProvideRuntimeApi<B> + Send + Sync,
{
type Proof = ();
fn create_digest(
&self,
_parent: &B::Header,
_inherents: &InherentData,
) -> Result<Digest, Error> {
Ok(Digest { logs: vec![] })
}
fn append_block_import(
&self,
_parent: &B::Header,
params: &mut BlockImportParams<B>,
_inherents: &InherentData,
_proof: Self::Proof,
) -> Result<(), Error> {
params.post_digests.push(DigestItem::Other(vec![1]));
Ok(())
}
}
#[tokio::test]
async fn instant_seal() {
let builder = TestClientBuilder::new();
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
pool_api,
None,
RevalidationType::Full,
spawner.clone(),
0,
genesis_hash,
genesis_hash,
));
let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
let (sender, receiver) = futures::channel::oneshot::channel();
let mut sender = Arc::new(Some(sender));
let commands_stream =
pool.pool().validated_pool().import_notification_stream().map(move |_| {
let mut_sender = Arc::get_mut(&mut sender).unwrap();
let sender = std::mem::take(mut_sender);
EngineCommand::SealNewBlock {
create_empty: false,
finalize: true,
parent_hash: None,
sender,
}
});
tokio::spawn(run_manual_seal(ManualSealParams {
block_import: client.clone(),
env,
client: client.clone(),
pool: pool.clone(),
commands_stream,
select_chain,
create_inherent_data_providers: |_, _| async { Ok(()) },
consensus_data_provider: None,
}));
let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
assert!(result.is_ok());
let created_block = receiver.await.unwrap().unwrap();
assert_eq!(
created_block,
CreatedBlock {
hash: created_block.hash,
aux: ImportedAux {
header_only: false,
clear_justification_requests: false,
needs_justification: false,
bad_justification: false,
is_new_best: true,
},
proof_size: 0
}
);
assert!(client.header(created_block.hash).unwrap().is_some());
assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1)
}
#[allow(unused)]
async fn instant_seal_delayed_finalize() {
let builder = TestClientBuilder::new();
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
pool_api,
None,
RevalidationType::Full,
spawner.clone(),
0,
genesis_hash,
genesis_hash,
));
let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
let (sender, receiver) = futures::channel::oneshot::channel();
let mut sender = Arc::new(Some(sender));
let commands_stream =
pool.pool().validated_pool().import_notification_stream().map(move |_| {
let mut_sender = Arc::get_mut(&mut sender).unwrap();
let sender = std::mem::take(mut_sender);
EngineCommand::SealNewBlock {
create_empty: false,
finalize: false,
parent_hash: None,
sender,
}
});
tokio::spawn(run_manual_seal(ManualSealParams {
block_import: client.clone(),
commands_stream,
env,
client: client.clone(),
pool: pool.clone(),
select_chain,
create_inherent_data_providers: |_, _| async { Ok(()) },
consensus_data_provider: None,
}));
let delay_sec = 5;
tokio::spawn(run_delayed_finalize(DelayedFinalizeParams {
client: client.clone(),
delay_sec,
spawn_handle: spawner,
}));
let mut finality_stream = client.finality_notification_stream();
let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
assert!(result.is_ok());
let created_block = receiver.await.unwrap().unwrap();
assert_eq!(
created_block,
CreatedBlock {
hash: created_block.hash,
aux: ImportedAux {
header_only: false,
clear_justification_requests: false,
needs_justification: false,
bad_justification: false,
is_new_best: true,
},
proof_size: created_block.proof_size
}
);
assert!(client.header(created_block.hash).unwrap().is_some());
assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1);
assert_eq!(client.info().finalized_hash, client.info().genesis_hash);
let finalized = finality_stream.select_next_some().await;
assert_eq!(finalized.hash, created_block.hash);
}
#[tokio::test]
async fn manual_seal_and_finalization() {
let builder = TestClientBuilder::new();
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
pool_api,
None,
RevalidationType::Full,
spawner.clone(),
0,
genesis_hash,
genesis_hash,
));
let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);
tokio::spawn(run_manual_seal(ManualSealParams {
block_import: client.clone(),
env,
client: client.clone(),
pool: pool.clone(),
commands_stream,
select_chain,
consensus_data_provider: None,
create_inherent_data_providers: |_, _| async { Ok(()) },
}));
let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
assert!(result.is_ok());
let (tx, rx) = futures::channel::oneshot::channel();
sink.send(EngineCommand::SealNewBlock {
parent_hash: None,
sender: Some(tx),
create_empty: false,
finalize: false,
})
.await
.unwrap();
let created_block = rx.await.unwrap().unwrap();
assert_eq!(
created_block,
CreatedBlock {
hash: created_block.hash,
aux: ImportedAux {
header_only: false,
clear_justification_requests: false,
needs_justification: false,
bad_justification: false,
is_new_best: true,
},
proof_size: 0
}
);
let header = client.header(created_block.hash).unwrap().unwrap();
let (tx, rx) = futures::channel::oneshot::channel();
sink.send(EngineCommand::FinalizeBlock {
sender: Some(tx),
hash: header.hash(),
justification: None,
})
.await
.unwrap();
rx.await.unwrap().unwrap();
}
#[tokio::test]
async fn manual_seal_fork_blocks() {
let builder = TestClientBuilder::new();
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let pool_api = Arc::new(FullChainApi::new(
client.clone(),
None,
&sp_core::testing::TaskExecutor::new(),
));
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
pool_api.clone(),
None,
RevalidationType::Full,
spawner.clone(),
0,
genesis_hash,
genesis_hash,
));
let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);
tokio::spawn(run_manual_seal(ManualSealParams {
block_import: client.clone(),
env,
client: client.clone(),
pool: pool.clone(),
commands_stream,
select_chain,
consensus_data_provider: None,
create_inherent_data_providers: |_, _| async { Ok(()) },
}));
let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
assert!(result.is_ok());
let (tx, rx) = futures::channel::oneshot::channel();
sink.send(EngineCommand::SealNewBlock {
parent_hash: None,
sender: Some(tx),
create_empty: false,
finalize: false,
})
.await
.unwrap();
let created_block = rx.await.unwrap().unwrap();
assert_eq!(
created_block,
CreatedBlock {
hash: created_block.hash,
aux: ImportedAux {
header_only: false,
clear_justification_requests: false,
needs_justification: false,
bad_justification: false,
is_new_best: true
},
proof_size: 0
}
);
assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Alice, 1)).await.is_ok());
let header = client.header(created_block.hash).expect("db error").expect("imported above");
assert_eq!(header.number, 1);
pool.maintain(sc_transaction_pool_api::ChainEvent::NewBestBlock {
hash: header.hash(),
tree_route: None,
})
.await;
let (tx1, rx1) = futures::channel::oneshot::channel();
assert!(sink
.send(EngineCommand::SealNewBlock {
parent_hash: Some(created_block.hash),
sender: Some(tx1),
create_empty: false,
finalize: false,
})
.await
.is_ok());
assert_matches::assert_matches!(rx1.await.expect("should be no error receiving"), Ok(_));
assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Bob, 0)).await.is_ok());
let (tx2, rx2) = futures::channel::oneshot::channel();
assert!(sink
.send(EngineCommand::SealNewBlock {
parent_hash: Some(created_block.hash),
sender: Some(tx2),
create_empty: false,
finalize: false,
})
.await
.is_ok());
let imported = rx2.await.unwrap().unwrap();
assert!(client.header(imported.hash).unwrap().is_some())
}
#[tokio::test]
async fn manual_seal_post_hash() {
let builder = TestClientBuilder::new();
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.header(client.info().genesis_hash).unwrap().unwrap().hash();
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
api(),
None,
RevalidationType::Full,
spawner.clone(),
0,
genesis_hash,
genesis_hash,
));
let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);
tokio::spawn(run_manual_seal(ManualSealParams {
block_import: client.clone(),
env,
client: client.clone(),
pool: pool.clone(),
commands_stream,
select_chain,
consensus_data_provider: Some(Box::new(TestDigestProvider { _client: client.clone() })),
create_inherent_data_providers: |_, _| async { Ok(()) },
}));
let (tx, rx) = futures::channel::oneshot::channel();
sink.send(EngineCommand::SealNewBlock {
parent_hash: None,
sender: Some(tx),
create_empty: true,
finalize: false,
})
.await
.unwrap();
let created_block = rx.await.unwrap().unwrap();
let header = client.header(created_block.hash).unwrap().unwrap();
assert_eq!(header.number, 1);
}
}