use codec::Encode;
use futures::{
channel::oneshot,
future,
future::{Future, FutureExt},
};
use log::{debug, error, info, trace, warn};
use sc_block_builder::{BlockBuilderApi, BlockBuilderBuilder};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
use sp_api::{ApiExt, CallApiAt, ProvideRuntimeApi};
use sp_blockchain::{ApplyExtrinsicFailed::Validity, Error::ApplyExtrinsicFailed, HeaderBackend};
use sp_consensus::{DisableProofRecording, EnableProofRecording, ProofRecording, Proposal};
use sp_core::traits::SpawnNamed;
use sp_inherents::InherentData;
use sp_runtime::{
traits::{BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT},
Digest, ExtrinsicInclusionMode, Percent, SaturatedConversion,
};
use std::{marker::PhantomData, pin::Pin, sync::Arc, time};
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_proposer_metrics::{EndProposingReason, MetricsLink as PrometheusMetrics};
/// Default block size limit in bytes used by the proposer when the caller does
/// not pass an explicit `block_size_limit`.
///
/// Can be overwritten via [`ProposerFactory::set_default_block_size_limit`].
pub const DEFAULT_BLOCK_SIZE_LIMIT: usize = 4 * 1024 * 1024 + 512;
/// Default fraction of the remaining proposal time during which the proposer
/// keeps trying further transactions after one was skipped.
/// See [`ProposerFactory::set_soft_deadline`].
const DEFAULT_SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(50);
/// Log target used by this module.
// `&str` instead of `&'static str`: the `'static` lifetime is implied for
// string literals in `const` items (clippy: `redundant_static_lifetimes`).
const LOG_TARGET: &str = "basic-authorship";
/// Factory that creates a fresh [`Proposer`] for every block-authoring round
/// (see the [`sp_consensus::Environment`] impl below).
pub struct ProposerFactory<A, C, PR> {
// Handle used to spawn the (blocking) block-production task.
spawn_handle: Box<dyn SpawnNamed>,
// The client providing chain state and runtime APIs.
client: Arc<C>,
// Pool supplying ready transactions for inclusion.
transaction_pool: Arc<A>,
// Prometheus metrics link (no-op when no registry was supplied).
metrics: PrometheusMetrics,
// Block size limit (bytes) applied when `propose` is called without one.
default_block_size_limit: usize,
// Fraction of remaining time to keep trying after skipping a transaction.
soft_deadline_percent: Percent,
telemetry: Option<TelemetryHandle>,
// Whether the recorded proof size is counted in block size estimation.
include_proof_in_block_size_estimation: bool,
// `PR` (proof-recording marker) is only used at the type level.
_phantom: PhantomData<PR>,
}
// Manual `Clone` impl: `#[derive(Clone)]` would add `A: Clone`, `C: Clone` and
// `PR: Clone` bounds, which are unnecessary here since `A`/`C` sit behind
// `Arc` and `PR` only appears in `PhantomData`.
impl<A, C, PR> Clone for ProposerFactory<A, C, PR> {
fn clone(&self) -> Self {
Self {
spawn_handle: self.spawn_handle.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
metrics: self.metrics.clone(),
default_block_size_limit: self.default_block_size_limit,
soft_deadline_percent: self.soft_deadline_percent,
telemetry: self.telemetry.clone(),
include_proof_in_block_size_estimation: self.include_proof_in_block_size_estimation,
_phantom: self._phantom,
}
}
}
impl<A, C> ProposerFactory<A, C, DisableProofRecording> {
	/// Create a proposer factory with proof recording disabled.
	///
	/// Proposers built by this factory do not record an execution proof, and
	/// proof bytes are never counted towards the block size estimate.
	pub fn new(
		spawn_handle: impl SpawnNamed + 'static,
		client: Arc<C>,
		transaction_pool: Arc<A>,
		prometheus: Option<&PrometheusRegistry>,
		telemetry: Option<TelemetryHandle>,
	) -> Self {
		Self {
			client,
			transaction_pool,
			telemetry,
			spawn_handle: Box::new(spawn_handle),
			metrics: PrometheusMetrics::new(prometheus),
			default_block_size_limit: DEFAULT_BLOCK_SIZE_LIMIT,
			soft_deadline_percent: DEFAULT_SOFT_DEADLINE_PERCENT,
			include_proof_in_block_size_estimation: false,
			_phantom: PhantomData,
		}
	}
}
impl<A, C> ProposerFactory<A, C, EnableProofRecording> {
	/// Create a proposer factory with proof recording enabled.
	///
	/// Each built block comes with a storage proof, and the proof size is
	/// included in block size estimation (until disabled via
	/// [`Self::disable_proof_in_block_size_estimation`]).
	pub fn with_proof_recording(
		spawn_handle: impl SpawnNamed + 'static,
		client: Arc<C>,
		transaction_pool: Arc<A>,
		prometheus: Option<&PrometheusRegistry>,
		telemetry: Option<TelemetryHandle>,
	) -> Self {
		Self {
			spawn_handle: Box::new(spawn_handle),
			transaction_pool,
			client,
			telemetry,
			metrics: PrometheusMetrics::new(prometheus),
			default_block_size_limit: DEFAULT_BLOCK_SIZE_LIMIT,
			soft_deadline_percent: DEFAULT_SOFT_DEADLINE_PERCENT,
			include_proof_in_block_size_estimation: true,
			_phantom: PhantomData,
		}
	}

	/// Stop counting the recorded proof towards the block size estimate.
	pub fn disable_proof_in_block_size_estimation(&mut self) {
		self.include_proof_in_block_size_estimation = false;
	}
}
impl<A, C, PR> ProposerFactory<A, C, PR> {
/// Set the default block size limit in bytes, used whenever `propose` is
/// called without an explicit `block_size_limit`.
pub fn set_default_block_size_limit(&mut self, limit: usize) {
self.default_block_size_limit = limit;
}
/// Set the soft deadline as a percentage of the remaining proposal time:
/// after a transaction had to be skipped, the proposer keeps trying more
/// transactions only while still before this soft deadline.
pub fn set_soft_deadline(&mut self, percent: Percent) {
self.soft_deadline_percent = percent;
}
}
impl<Block, C, A, PR> ProposerFactory<A, C, PR>
where
	A: TransactionPool<Block = Block> + 'static,
	Block: BlockT,
	C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + Send + Sync + 'static,
	C::Api: ApiExt<Block> + BlockBuilderApi<Block>,
{
	/// Create a [`Proposer`] building on top of `parent_header`.
	///
	/// `now` is the clock source used for deadline checks; it is injected so
	/// tests can simulate the passage of time.
	fn init_with_now(
		&mut self,
		parent_header: &<Block as BlockT>::Header,
		now: Box<dyn Fn() -> time::Instant + Send + Sync>,
	) -> Proposer<Block, C, A, PR> {
		let parent_hash = parent_header.hash();

		info!(
			"๐ Starting consensus session on top of parent {:?} (#{})",
			parent_hash,
			parent_header.number()
		);

		// Return the struct expression directly instead of binding it to a
		// local first (clippy: `let_and_return`).
		Proposer::<_, _, _, PR> {
			spawn_handle: self.spawn_handle.clone(),
			client: self.client.clone(),
			parent_hash,
			parent_number: *parent_header.number(),
			transaction_pool: self.transaction_pool.clone(),
			now,
			metrics: self.metrics.clone(),
			default_block_size_limit: self.default_block_size_limit,
			soft_deadline_percent: self.soft_deadline_percent,
			telemetry: self.telemetry.clone(),
			_phantom: PhantomData,
			include_proof_in_block_size_estimation: self.include_proof_in_block_size_estimation,
		}
	}
}
// Consensus engines call `init` once per authoring round to obtain a proposer
// for the given parent block.
impl<A, Block, C, PR> sp_consensus::Environment<Block> for ProposerFactory<A, C, PR>
where
A: TransactionPool<Block = Block> + 'static,
Block: BlockT,
C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
C::Api: ApiExt<Block> + BlockBuilderApi<Block>,
PR: ProofRecording,
{
// Proposer creation is synchronous, hence the already-resolved future.
type CreateProposer = future::Ready<Result<Self::Proposer, Self::Error>>;
type Proposer = Proposer<Block, C, A, PR>;
type Error = sp_blockchain::Error;
fn init(&mut self, parent_header: &<Block as BlockT>::Header) -> Self::CreateProposer {
// Production path uses the real clock; tests use `init_with_now` with a
// mocked clock instead.
future::ready(Ok(self.init_with_now(parent_header, Box::new(time::Instant::now))))
}
}
/// Single-use block proposer: builds one block on top of a fixed parent.
/// Created by [`ProposerFactory`] for each authoring round.
pub struct Proposer<Block: BlockT, C, A: TransactionPool, PR> {
spawn_handle: Box<dyn SpawnNamed>,
client: Arc<C>,
// Hash and number of the parent block the new block is built on.
parent_hash: Block::Hash,
parent_number: <<Block as BlockT>::Header as HeaderT>::Number,
transaction_pool: Arc<A>,
// Injected clock, so deadline behaviour is testable.
now: Box<dyn Fn() -> time::Instant + Send + Sync>,
metrics: PrometheusMetrics,
default_block_size_limit: usize,
include_proof_in_block_size_estimation: bool,
soft_deadline_percent: Percent,
telemetry: Option<TelemetryHandle>,
_phantom: PhantomData<PR>,
}
impl<A, Block, C, PR> sp_consensus::Proposer<Block> for Proposer<Block, C, A, PR>
where
A: TransactionPool<Block = Block> + 'static,
Block: BlockT,
C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
C::Api: ApiExt<Block> + BlockBuilderApi<Block>,
PR: ProofRecording,
{
type Proposal =
Pin<Box<dyn Future<Output = Result<Proposal<Block, PR::Proof>, Self::Error>> + Send>>;
type Error = sp_blockchain::Error;
type ProofRecording = PR;
type Proof = PR::Proof;
// Kick off block production on a separate blocking task and hand back a
// future resolving to the result. `max_duration` is the total time budget
// for this authoring round; `block_size_limit` overrides the factory
// default when `Some`.
fn propose(
self,
inherent_data: InherentData,
inherent_digests: Digest,
max_duration: time::Duration,
block_size_limit: Option<usize>,
) -> Self::Proposal {
let (tx, rx) = oneshot::channel();
let spawn_handle = self.spawn_handle.clone();
// Runs on a blocking task since block building is CPU-bound runtime
// execution, which should not stall the async executor.
spawn_handle.spawn_blocking(
"basic-authorship-proposer",
None,
Box::pin(async move {
// Only 2/3 of `max_duration` is used as the building deadline; the
// remaining third is reserved for the steps that follow block
// building within the same round.
let deadline = (self.now)() + max_duration - max_duration / 3;
let res = self
.propose_with(inherent_data, inherent_digests, deadline, block_size_limit)
.await;
// The receiver may have been dropped (round abandoned); that is
// not an error worth more than a trace.
if tx.send(res).is_err() {
trace!(
target: LOG_TARGET,
"Could not send block production result to proposer!"
);
}
}),
);
async move { rx.await? }.boxed()
}
}
/// How many transactions the proposer is willing to skip (e.g. because they
/// would exhaust block resources) before it stops trying to fill the block
/// further — unless the soft deadline has not been reached yet.
const MAX_SKIPPED_TRANSACTIONS: usize = 8;
impl<A, Block, C, PR> Proposer<Block, C, A, PR>
where
A: TransactionPool<Block = Block>,
Block: BlockT,
C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
C::Api: ApiExt<Block> + BlockBuilderApi<Block>,
PR: ProofRecording,
{
/// Build the block: create a block builder on the parent, push inherents,
/// then (mode permitting) pool transactions, and finalize into a
/// [`Proposal`] with optional storage proof.
async fn propose_with(
self,
inherent_data: InherentData,
inherent_digests: Digest,
deadline: time::Instant,
block_size_limit: Option<usize>,
) -> Result<Proposal<Block, PR::Proof>, sp_blockchain::Error> {
let block_timer = time::Instant::now();
let mut block_builder = BlockBuilderBuilder::new(&*self.client)
.on_parent_block(self.parent_hash)
.with_parent_block_number(self.parent_number)
// Proof recording is decided at the type level via `PR`.
.with_proof_recording(PR::ENABLED)
.with_inherent_digests(inherent_digests)
.build()?;
self.apply_inherents(&mut block_builder, inherent_data)?;
// The runtime may forbid ordinary extrinsics for this block, in which
// case only the inherents pushed above are included.
let mode = block_builder.extrinsic_inclusion_mode();
let end_reason = match mode {
ExtrinsicInclusionMode::AllExtrinsics =>
self.apply_extrinsics(&mut block_builder, deadline, block_size_limit).await?,
ExtrinsicInclusionMode::OnlyInherents => EndProposingReason::TransactionForbidden,
};
let (block, storage_changes, proof) = block_builder.build()?.into_inner();
let block_took = block_timer.elapsed();
// `into_proof` fails only if proof recording was expected but missing.
let proof =
PR::into_proof(proof).map_err(|e| sp_blockchain::Error::Application(Box::new(e)))?;
self.print_summary(&block, end_reason, block_took, block_timer.elapsed());
Ok(Proposal { block, proof, storage_changes })
}
/// Create and push the inherent extrinsics into the block being built.
///
/// Error policy (match order matters): a non-mandatory inherent that would
/// overweight the block is dropped with a warning; a failing *mandatory*
/// inherent aborts block production; any other inherent error is logged
/// and the inherent dropped.
fn apply_inherents(
&self,
block_builder: &mut sc_block_builder::BlockBuilder<'_, Block, C>,
inherent_data: InherentData,
) -> Result<(), sp_blockchain::Error> {
let create_inherents_start = time::Instant::now();
let inherents = block_builder.create_inherents(inherent_data)?;
let create_inherents_end = time::Instant::now();
self.metrics.report(|metrics| {
metrics.create_inherents_time.observe(
create_inherents_end
.saturating_duration_since(create_inherents_start)
.as_secs_f64(),
);
});
for inherent in inherents {
match block_builder.push(inherent) {
// Checked before `was_mandatory`: an exhausted-resources inherent
// here cannot be mandatory (mandatory ones must always fit).
Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
warn!(
target: LOG_TARGET,
"โ ๏ธ Dropping non-mandatory inherent from overweight block."
)
},
Err(ApplyExtrinsicFailed(Validity(e))) if e.was_mandatory() => {
error!(
"โ๏ธ Mandatory inherent extrinsic returned error. Block cannot be produced."
);
return Err(ApplyExtrinsicFailed(Validity(e)))
},
Err(e) => {
warn!(
target: LOG_TARGET,
"โ๏ธ Inherent extrinsic returned unexpected error: {}. Dropping.", e
);
},
Ok(_) => {},
}
}
Ok(())
}
/// Fill the block with ready transactions from the pool until a stop
/// condition is hit, and return why proposing ended.
///
/// Stop conditions: pool exhausted, hard `deadline` reached, block size
/// limit reached, or block weight exhausted. After a transaction had to be
/// skipped, up to [`MAX_SKIPPED_TRANSACTIONS`] more are tried, and beyond
/// that further attempts continue only until the soft deadline.
async fn apply_extrinsics(
&self,
block_builder: &mut sc_block_builder::BlockBuilder<'_, Block, C>,
deadline: time::Instant,
block_size_limit: Option<usize>,
) -> Result<EndProposingReason, sp_blockchain::Error> {
let now = (self.now)();
let left = deadline.saturating_duration_since(now);
let left_micros: u64 = left.as_micros().saturated_into();
// Soft deadline = now + soft_deadline_percent of the remaining time.
let soft_deadline =
now + time::Duration::from_micros(self.soft_deadline_percent.mul_floor(left_micros));
let mut skipped = 0;
let mut unqueue_invalid = Vec::new();
// Wait at most 1/8 of the remaining time for the pool to become ready.
let delay = deadline.saturating_duration_since((self.now)()) / 8;
let mut pending_iterator =
self.transaction_pool.ready_at_with_timeout(self.parent_hash, delay).await;
let block_size_limit = block_size_limit.unwrap_or(self.default_block_size_limit);
debug!(target: LOG_TARGET, "Attempting to push transactions from the pool at {:?}.", self.parent_hash);
let mut transaction_pushed = false;
let end_reason = loop {
let pending_tx = if let Some(pending_tx) = pending_iterator.next() {
pending_tx
} else {
debug!(
target: LOG_TARGET,
"No more transactions, proceeding with proposing."
);
break EndProposingReason::NoMoreTransactions
};
// Hard deadline is checked before every push.
let now = (self.now)();
if now > deadline {
debug!(
target: LOG_TARGET,
"Consensus deadline reached when pushing block transactions, \
proceeding with proposing."
);
break EndProposingReason::HitDeadline
}
let pending_tx_data = (**pending_tx.data()).clone();
let pending_tx_hash = pending_tx.hash().clone();
let block_size =
block_builder.estimate_block_size(self.include_proof_in_block_size_estimation);
// Size check happens before attempting the (expensive) runtime push.
if block_size + pending_tx_data.encoded_size() > block_size_limit {
pending_iterator.report_invalid(&pending_tx);
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(
target: LOG_TARGET,
"Transaction would overflow the block size limit, \
but will try {} more transactions before quitting.",
MAX_SKIPPED_TRANSACTIONS - skipped,
);
continue
} else if now < soft_deadline {
debug!(
target: LOG_TARGET,
"Transaction would overflow the block size limit, \
but we still have time before the soft deadline, so \
we will try a bit more."
);
continue
} else {
debug!(
target: LOG_TARGET,
"Reached block size limit, proceeding with proposing."
);
break EndProposingReason::HitBlockSizeLimit
}
}
trace!(target: LOG_TARGET, "[{:?}] Pushing to the block.", pending_tx_hash);
match sc_block_builder::BlockBuilder::push(block_builder, pending_tx_data) {
Ok(()) => {
transaction_pushed = true;
trace!(target: LOG_TARGET, "[{:?}] Pushed to the block.", pending_tx_hash);
},
// Resource exhaustion: same skip/soft-deadline policy as the size
// check above, but the transaction is not marked invalid forever.
Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
pending_iterator.report_invalid(&pending_tx);
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(target: LOG_TARGET,
"Block seems full, but will try {} more transactions before quitting.",
MAX_SKIPPED_TRANSACTIONS - skipped,
);
} else if (self.now)() < soft_deadline {
debug!(target: LOG_TARGET,
"Block seems full, but we still have time before the soft deadline, \
so we will try a bit more before quitting."
);
} else {
debug!(
target: LOG_TARGET,
"Reached block weight limit, proceeding with proposing."
);
break EndProposingReason::HitBlockWeightLimit
}
},
// Any other error: the transaction is invalid at this block and is
// queued for removal from the pool afterwards.
Err(e) => {
pending_iterator.report_invalid(&pending_tx);
debug!(
target: LOG_TARGET,
"[{:?}] Invalid transaction: {} at: {}", pending_tx_hash, e, self.parent_hash
);
unqueue_invalid.push(pending_tx_hash);
},
}
};
// Hitting the size limit without a single inclusion suggests a
// misconfigured (too small) limit.
if matches!(end_reason, EndProposingReason::HitBlockSizeLimit) && !transaction_pushed {
warn!(
target: LOG_TARGET,
"Hit block size limit of `{}` without including any transaction!", block_size_limit,
);
}
self.transaction_pool.remove_invalid(&unqueue_invalid);
Ok(end_reason)
}
/// Report metrics, log a summary of the freshly-built block and emit the
/// telemetry event.
///
/// `block_took` is the pure block-building time, `propose_took` the overall
/// proposal time (both fed into Prometheus histograms).
fn print_summary(
	&self,
	block: &Block,
	end_reason: EndProposingReason,
	block_took: time::Duration,
	propose_took: time::Duration,
) {
	let extrinsics = block.extrinsics();
	self.metrics.report(|metrics| {
		metrics.number_of_transactions.set(extrinsics.len() as u64);
		metrics.block_constructed.observe(block_took.as_secs_f64());
		metrics.report_end_proposing_reason(end_reason);
		metrics.create_block_proposal_time.observe(propose_took.as_secs_f64());
	});
	if log::log_enabled!(log::Level::Info) {
		info!(
			"๐ Prepared block for proposing at {} ({} ms) hash: {:?}; parent_hash: {}; end: {:?}; extrinsics_count: {}",
			block.header().number(),
			block_took.as_millis(),
			<Block as BlockT>::Hash::from(block.header().hash()),
			block.header().parent_hash(),
			end_reason,
			extrinsics.len()
		)
	} else if log::log_enabled!(log::Level::Trace) {
		// NOTE(review): this branch looks unreachable — `log` levels are
		// hierarchical, so Trace being enabled implies Info is enabled for
		// the same target; confirm whether a Debug/Info ordering swap was
		// intended.
		// Build the per-extrinsic hash listing lazily: it hashes every
		// extrinsic and allocates, so only pay for it when it is logged.
		let extrinsics_summary = if extrinsics.is_empty() {
			"no extrinsics".to_string()
		} else {
			format!(
				"extrinsics ({}): [{}]",
				extrinsics.len(),
				extrinsics
					.iter()
					.map(|xt| BlakeTwo256::hash_of(xt).to_string())
					.collect::<Vec<_>>()
					.join(", ")
			)
		};
		trace!(
			"๐ Prepared block for proposing at {} ({} ms) hash: {:?}; parent_hash: {}; end: {:?}; {extrinsics_summary}",
			block.header().number(),
			block_took.as_millis(),
			<Block as BlockT>::Hash::from(block.header().hash()),
			block.header().parent_hash(),
			end_reason
		);
	}
	telemetry!(
		self.telemetry;
		CONSENSUS_INFO;
		"prepared_block_for_proposing";
		"number" => ?block.header().number(),
		"hash" => ?<Block as BlockT>::Hash::from(block.header().hash()),
	);
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::block_on;
use parking_lot::Mutex;
use sc_client_api::Backend;
use sc_transaction_pool::BasicPool;
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionSource};
use sp_api::Core;
use sp_blockchain::HeaderBackend;
use sp_consensus::{BlockOrigin, Environment, Proposer};
use sp_runtime::{generic::BlockId, traits::NumberFor, Perbill};
use substrate_test_runtime_client::{
prelude::*,
runtime::{Block as TestBlock, Extrinsic, ExtrinsicBuilder, Transfer},
TestClientBuilder, TestClientBuilderExt,
};
// Transaction source used for all pool submissions in these tests.
const SOURCE: TransactionSource = TransactionSource::External;
// `fill_block` ratios (parts-per-billion) used to build extrinsics of
// different weights.
const HUGE: u32 = 649000000;
const MEDIUM: u32 = 250000000;
const TINY: u32 = 1000;
// Build a lightweight (`TINY`) `fill_block` extrinsic with the given nonce.
fn extrinsic(nonce: u64) -> Extrinsic {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(TINY)).nonce(nonce).build()
}
// Wrap a header into the `NewBestBlock` event used to trigger pool maintenance.
fn chain_event<B: BlockT>(header: B::Header) -> ChainEvent<B>
where
NumberFor<B>: From<u64>,
{
ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }
}
// With a mocked clock that jumps one second per call, the proposer must stop
// after the deadline: only one of the two ready transactions gets included,
// while the pool still holds both.
#[test]
fn should_cease_building_block_when_deadline_is_reached() {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
));
let hashof0 = client.info().genesis_hash;
block_on(txpool.submit_at(hashof0, SOURCE, vec![extrinsic(0), extrinsic(1)])).unwrap();
block_on(
txpool.maintain(chain_event(
client.expect_header(hashof0).expect("there should be header"),
)),
);
let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
// Mock clock: first call returns the base instant, every later call
// advances the stored instant by one second.
let cell = Mutex::new((false, time::Instant::now()));
let proposer = proposer_factory.init_with_now(
&client.expect_header(hashof0).unwrap(),
Box::new(move || {
let mut value = cell.lock();
if !value.0 {
value.0 = true;
return value.1
}
let old = value.1;
let new = old + time::Duration::from_secs(1);
*value = (true, new);
old
}),
);
let deadline = time::Duration::from_secs(3);
let block =
block_on(proposer.propose(Default::default(), Default::default(), deadline, None))
.map(|r| r.block)
.unwrap();
// Only one extrinsic fits before the simulated deadline passes.
assert_eq!(block.extrinsics().len(), 1);
// The pool is not pruned by proposing itself.
assert_eq!(txpool.ready().count(), 2);
}
// The proposer must still produce a (possibly empty) block, not panic, when
// the mocked clock jumps far past the deadline right away.
#[test]
fn should_not_panic_when_deadline_is_reached() {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
));
let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
let cell = Mutex::new((false, time::Instant::now()));
let proposer = proposer_factory.init_with_now(
&client.expect_header(client.info().genesis_hash).unwrap(),
Box::new(move || {
let mut value = cell.lock();
if !value.0 {
value.0 = true;
return value.1
}
// Second and later calls: 160s in the future, far beyond the 1s
// deadline below.
let new = value.1 + time::Duration::from_secs(160);
*value = (true, new);
new
}),
);
let deadline = time::Duration::from_secs(1);
block_on(proposer.propose(Default::default(), Default::default(), deadline, None))
.map(|r| r.block)
.unwrap();
}
// Re-executing the proposed block through the runtime API must yield the same
// storage transaction root as the one reported by the proposer.
#[test]
fn proposed_storage_changes_should_match_execute_block_storage_changes() {
let (client, backend) = TestClientBuilder::new().build_with_backend();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
));
let genesis_hash = client.info().best_hash;
block_on(txpool.submit_at(genesis_hash, SOURCE, vec![extrinsic(0)])).unwrap();
block_on(
txpool.maintain(chain_event(
client
.expect_header(client.info().genesis_hash)
.expect("there should be header"),
)),
);
let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
let proposer = proposer_factory.init_with_now(
&client.header(genesis_hash).unwrap().unwrap(),
Box::new(move || time::Instant::now()),
);
let deadline = time::Duration::from_secs(9);
let proposal =
block_on(proposer.propose(Default::default(), Default::default(), deadline, None))
.unwrap();
assert_eq!(proposal.block.extrinsics().len(), 1);
// Independently execute the block and derive its storage changes.
let api = client.runtime_api();
api.execute_block(genesis_hash, proposal.block).unwrap();
let state = backend.state_at(genesis_hash).unwrap();
let storage_changes = api.into_storage_changes(&state, genesis_hash).unwrap();
assert_eq!(
proposal.storage_changes.transaction_storage_root,
storage_changes.transaction_storage_root,
);
}
// A transaction that exhausts block resources (the `huge` ones) must be
// skipped for the current block but stay in the pool, so later transactions
// from the same sender (higher nonces) remain includable in subsequent blocks.
#[test]
fn should_not_remove_invalid_transactions_from_the_same_sender_after_one_was_invalid() {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
));
let medium = |nonce| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(MEDIUM))
.nonce(nonce)
.build()
};
let huge = |nonce| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(HUGE)).nonce(nonce).build()
};
block_on(txpool.submit_at(
client.info().genesis_hash,
SOURCE,
vec![medium(0), medium(1), huge(2), medium(3), huge(4), medium(5), medium(6)],
))
.unwrap();
let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
// Propose a block on `parent_number` and assert both the block's extrinsic
// count and the pool's ready count afterwards.
let mut propose_block = |client: &TestClient,
parent_number,
expected_block_extrinsics,
expected_pool_transactions| {
let hash = client.expect_block_hash_from_id(&BlockId::Number(parent_number)).unwrap();
let proposer = proposer_factory.init_with_now(
&client.expect_header(hash).unwrap(),
Box::new(move || time::Instant::now()),
);
let deadline = time::Duration::from_secs(900);
let block =
block_on(proposer.propose(Default::default(), Default::default(), deadline, None))
.map(|r| r.block)
.unwrap();
assert_eq!(
txpool.ready().count(),
expected_pool_transactions,
"at block: {}",
block.header.number
);
assert_eq!(
block.extrinsics().len(),
expected_block_extrinsics,
"at block: {}",
block.header.number
);
block
};
// Import the proposed block and let the pool prune included transactions.
let import_and_maintain = |client: Arc<TestClient>, block: TestBlock| {
let hash = block.hash();
block_on(client.import(BlockOrigin::Own, block)).unwrap();
block_on(txpool.maintain(chain_event(
client.expect_header(hash).expect("there should be header"),
)));
};
block_on(
txpool.maintain(chain_event(
client
.expect_header(client.info().genesis_hash)
.expect("there should be header"),
)),
);
assert_eq!(txpool.ready().count(), 7);
// Each round includes what fits; skipped `huge` ones are retried later.
let block = propose_block(&client, 0, 2, 7);
import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 5);
let block = propose_block(&client, 1, 1, 5);
import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 4);
let block = propose_block(&client, 2, 1, 4);
import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 3);
let block = propose_block(&client, 3, 1, 3);
import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 2);
let block = propose_block(&client, 4, 2, 2);
import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 0);
}
// The explicit `block_size_limit` must cap how many extrinsics fit; without a
// limit all fit; and with proof recording the proof size counts towards the
// estimate.
#[test]
fn should_cease_building_block_when_block_limit_is_reached() {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
));
let genesis_hash = client.info().genesis_hash;
let genesis_header = client.expect_header(genesis_hash).expect("there should be header");
let extrinsics_num = 5;
let extrinsics = std::iter::once(
Transfer {
from: Sr25519Keyring::Alice.into(),
to: Sr25519Keyring::Bob.into(),
amount: 100,
nonce: 0,
}
.into_unchecked_extrinsic(),
)
.chain((1..extrinsics_num as u64).map(extrinsic))
.collect::<Vec<_>>();
// Limit sized to hold the header, the extrinsics vec prefix and all but
// the last extrinsic.
let block_limit = genesis_header.encoded_size() +
extrinsics
.iter()
.take(extrinsics_num - 1)
.map(Encode::encoded_size)
.sum::<usize>() +
Vec::<Extrinsic>::new().encoded_size();
block_on(txpool.submit_at(genesis_hash, SOURCE, extrinsics.clone())).unwrap();
block_on(txpool.maintain(chain_event(genesis_header.clone())));
let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
// Give it enough time so the limit, not the deadline, stops proposing.
let deadline = time::Duration::from_secs(300);
let block = block_on(proposer.propose(
Default::default(),
Default::default(),
deadline,
Some(block_limit),
))
.map(|r| r.block)
.unwrap();
// The limit leaves exactly one extrinsic out.
assert_eq!(block.extrinsics().len(), extrinsics_num - 1);
let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
let block =
block_on(proposer.propose(Default::default(), Default::default(), deadline, None))
.map(|r| r.block)
.unwrap();
// Without a limit all extrinsics fit.
assert_eq!(block.extrinsics().len(), extrinsics_num);
let mut proposer_factory = ProposerFactory::with_proof_recording(
spawner.clone(),
client.clone(),
txpool.clone(),
None,
None,
);
let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();
// With proof recording the base estimate already includes proof bytes, so
// a limit of base + one extrinsic admits exactly one extrinsic.
let block_limit = {
let builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(genesis_header.hash())
.with_parent_block_number(0)
.enable_proof_recording()
.build()
.unwrap();
builder.estimate_block_size(true) + extrinsics[0].encoded_size()
};
let block = block_on(proposer.propose(
Default::default(),
Default::default(),
deadline,
Some(block_limit),
))
.map(|r| r.block)
.unwrap();
assert_eq!(block.extrinsics().len(), 1);
}
// Even after exhausting resources more than `MAX_SKIPPED_TRANSACTIONS` times,
// the proposer keeps trying while still before the soft deadline, so the
// `tiny` transactions behind the `huge` ones still get included.
#[test]
fn should_keep_adding_transactions_after_exhausts_resources_before_soft_deadline() {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
));
let genesis_hash = client.info().genesis_hash;
let tiny = |nonce| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(TINY)).nonce(nonce).build()
};
let huge = |who| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(HUGE))
.signer(Sr25519Keyring::numeric(who))
.build()
};
block_on(
txpool.submit_at(
genesis_hash,
SOURCE,
(0..MAX_SKIPPED_TRANSACTIONS * 2)
.into_iter()
.map(huge)
.chain((0..MAX_SKIPPED_TRANSACTIONS as u64).into_iter().map(tiny))
.collect(),
),
)
.unwrap();
block_on(txpool.maintain(chain_event(
client.expect_header(genesis_hash).expect("there should be header"),
)));
assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 3);
let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
// Mock clock advancing 1s per call — slow enough to stay before the soft
// deadline for the whole run.
let cell = Mutex::new(time::Instant::now());
let proposer = proposer_factory.init_with_now(
&client.expect_header(genesis_hash).unwrap(),
Box::new(move || {
let mut value = cell.lock();
let old = *value;
*value = old + time::Duration::from_secs(1);
old
}),
);
let deadline = time::Duration::from_secs(900);
let block =
block_on(proposer.propose(Default::default(), Default::default(), deadline, None))
.map(|r| r.block)
.unwrap();
// One `huge` fills the block; all the `tiny` ones still squeeze in.
assert_eq!(block.extrinsics().len(), MAX_SKIPPED_TRANSACTIONS + 1);
}
// After the soft deadline has passed, only `MAX_SKIPPED_TRANSACTIONS` skips
// are allowed before proposing stops — the remaining `tiny` transactions are
// left out.
#[test]
fn should_only_skip_up_to_some_limit_after_soft_deadline() {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
));
let genesis_hash = client.info().genesis_hash;
let tiny = |who| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(TINY))
.signer(Sr25519Keyring::numeric(who))
.nonce(1)
.build()
};
let huge = |who| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(HUGE))
.signer(Sr25519Keyring::numeric(who))
.build()
};
block_on(
txpool.submit_at(
genesis_hash,
SOURCE,
(0..MAX_SKIPPED_TRANSACTIONS + 2)
.into_iter()
.map(huge)
.chain((0..MAX_SKIPPED_TRANSACTIONS + 2).into_iter().map(tiny))
.collect(),
),
)
.unwrap();
block_on(txpool.maintain(chain_event(
client.expect_header(genesis_hash).expect("there should be header"),
)));
assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 2 + 4);
let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
let deadline = time::Duration::from_secs(600);
// Mock clock: the second call jumps past half the deadline, putting all
// subsequent checks beyond the soft deadline. `cell2` counts the calls.
let cell = Arc::new(Mutex::new((0, time::Instant::now())));
let cell2 = cell.clone();
let proposer = proposer_factory.init_with_now(
&client.expect_header(genesis_hash).unwrap(),
Box::new(move || {
let mut value = cell.lock();
let (called, old) = *value;
let increase = if called == 1 {
deadline / 2
} else {
time::Duration::from_millis(0)
};
*value = (called + 1, old + increase);
old
}),
);
let block =
block_on(proposer.propose(Default::default(), Default::default(), deadline, None))
.map(|r| r.block)
.unwrap();
assert!(
(1..3).contains(&block.extrinsics().len()),
"Block shall contain one or two extrinsics."
);
assert!(
cell2.lock().0 > MAX_SKIPPED_TRANSACTIONS,
"Not enough calls to current time, which indicates the test might have ended because of deadline, not soft deadline"
);
}
}