1use futures::prelude::*;
23use futures_timer::Delay;
24use prometheus_endpoint::Registry;
25use sc_client_api::{
26 backend::{Backend as ClientBackend, Finalizer},
27 client::BlockchainEvents,
28};
29use sc_consensus::{
30 block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
31 import_queue::{BasicQueue, BoxBlockImport, Verifier},
32};
33use sp_blockchain::HeaderBackend;
34use sp_consensus::{Environment, Proposer, SelectChain};
35use sp_core::traits::SpawnNamed;
36use sp_inherents::CreateInherentDataProviders;
37use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
38use std::{marker::PhantomData, sync::Arc, time::Duration};
39
40mod error;
41mod finalize_block;
42mod seal_block;
43
44pub mod consensus;
45pub mod rpc;
46
47pub use self::{
48 consensus::ConsensusDataProvider,
49 error::Error,
50 finalize_block::{finalize_block, FinalizeBlockParams},
51 rpc::{CreatedBlock, EngineCommand},
52 seal_block::{seal_block, SealBlockParams, MAX_PROPOSAL_DURATION},
53};
54use sc_transaction_pool_api::TransactionPool;
55use sp_api::ProvideRuntimeApi;
56
57const LOG_TARGET: &str = "manual-seal";
58
59pub const MANUAL_SEAL_ENGINE_ID: ConsensusEngineId = [b'm', b'a', b'n', b'l'];
61
62struct ManualSealVerifier;
64
65#[async_trait::async_trait]
66impl<B: BlockT> Verifier<B> for ManualSealVerifier {
67 async fn verify(
68 &self,
69 mut block: BlockImportParams<B>,
70 ) -> Result<BlockImportParams<B>, String> {
71 block.finalized = false;
72 block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
73 Ok(block)
74 }
75}
76
77pub fn import_queue<Block>(
79 block_import: BoxBlockImport<Block>,
80 spawner: &impl sp_core::traits::SpawnEssentialNamed,
81 registry: Option<&Registry>,
82) -> BasicQueue<Block>
83where
84 Block: BlockT,
85{
86 BasicQueue::new(ManualSealVerifier, block_import, None, spawner, registry)
87}
88
89pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CS, CIDP> {
91 pub block_import: BI,
93
94 pub env: E,
96
97 pub client: Arc<C>,
99
100 pub pool: Arc<TP>,
102
103 pub commands_stream: CS,
106
107 pub select_chain: SC,
109
110 pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B>>>,
112
113 pub create_inherent_data_providers: CIDP,
115}
116
117pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CIDP> {
119 pub block_import: BI,
121
122 pub env: E,
124
125 pub client: Arc<C>,
127
128 pub pool: Arc<TP>,
130
131 pub select_chain: SC,
133
134 pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B>>>,
136
137 pub create_inherent_data_providers: CIDP,
139}
140
/// Everything [`run_delayed_finalize`] needs to run.
pub struct DelayedFinalizeParams<C, S> {
    /// Shared client: source of block import notifications and finalizer of the blocks.
    pub client: Arc<C>,

    /// Handle on which one finalization task is spawned per imported block.
    pub spawn_handle: S,

    /// Number of seconds to wait after an import notification before finalizing the block.
    pub delay_sec: u64,
}
152
153pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP>(
155 ManualSealParams {
156 mut block_import,
157 mut env,
158 client,
159 pool,
160 mut commands_stream,
161 select_chain,
162 consensus_data_provider,
163 create_inherent_data_providers,
164 }: ManualSealParams<B, BI, E, C, TP, SC, CS, CIDP>,
165) where
166 B: BlockT + 'static,
167 BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
168 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
169 CB: ClientBackend<B> + 'static,
170 E: Environment<B> + 'static,
171 E::Proposer: Proposer<B>,
172 CS: Stream<Item = EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
173 SC: SelectChain<B> + 'static,
174 TP: TransactionPool<Block = B>,
175 CIDP: CreateInherentDataProviders<B, ()>,
176{
177 while let Some(command) = commands_stream.next().await {
178 match command {
179 EngineCommand::SealNewBlock { create_empty, finalize, parent_hash, sender } => {
180 seal_block(SealBlockParams {
181 sender,
182 parent_hash,
183 finalize,
184 create_empty,
185 env: &mut env,
186 select_chain: &select_chain,
187 block_import: &mut block_import,
188 consensus_data_provider: consensus_data_provider.as_deref(),
189 pool: pool.clone(),
190 client: client.clone(),
191 create_inherent_data_providers: &create_inherent_data_providers,
192 })
193 .await;
194 },
195 EngineCommand::FinalizeBlock { hash, sender, justification } => {
196 let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j));
197 finalize_block(FinalizeBlockParams {
198 hash,
199 sender,
200 justification,
201 finalizer: client.clone(),
202 _phantom: PhantomData,
203 })
204 .await
205 },
206 }
207 }
208}
209
210pub async fn run_instant_seal<B, BI, CB, E, C, TP, SC, CIDP>(
214 InstantSealParams {
215 block_import,
216 env,
217 client,
218 pool,
219 select_chain,
220 consensus_data_provider,
221 create_inherent_data_providers,
222 }: InstantSealParams<B, BI, E, C, TP, SC, CIDP>,
223) where
224 B: BlockT + 'static,
225 BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
226 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
227 CB: ClientBackend<B> + 'static,
228 E: Environment<B> + 'static,
229 E::Proposer: Proposer<B>,
230 SC: SelectChain<B> + 'static,
231 TP: TransactionPool<Block = B>,
232 CIDP: CreateInherentDataProviders<B, ()>,
233{
234 let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock {
237 create_empty: true,
238 finalize: false,
239 parent_hash: None,
240 sender: None,
241 });
242
243 run_manual_seal(ManualSealParams {
244 block_import,
245 env,
246 client,
247 pool,
248 commands_stream,
249 select_chain,
250 consensus_data_provider,
251 create_inherent_data_providers,
252 })
253 .await
254}
255
256pub async fn run_instant_seal_and_finalize<B, BI, CB, E, C, TP, SC, CIDP>(
263 InstantSealParams {
264 block_import,
265 env,
266 client,
267 pool,
268 select_chain,
269 consensus_data_provider,
270 create_inherent_data_providers,
271 }: InstantSealParams<B, BI, E, C, TP, SC, CIDP>,
272) where
273 B: BlockT + 'static,
274 BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
275 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
276 CB: ClientBackend<B> + 'static,
277 E: Environment<B> + 'static,
278 E::Proposer: Proposer<B>,
279 SC: SelectChain<B> + 'static,
280 TP: TransactionPool<Block = B>,
281 CIDP: CreateInherentDataProviders<B, ()>,
282{
283 let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock {
286 create_empty: false,
287 finalize: true,
288 parent_hash: None,
289 sender: None,
290 });
291
292 run_manual_seal(ManualSealParams {
293 block_import,
294 env,
295 client,
296 pool,
297 commands_stream,
298 select_chain,
299 consensus_data_provider,
300 create_inherent_data_providers,
301 })
302 .await
303}
304
305pub async fn run_delayed_finalize<B, CB, C, S>(
313 DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams<C, S>,
314) where
315 B: BlockT + 'static,
316 CB: ClientBackend<B> + 'static,
317 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
318 S: SpawnNamed,
319{
320 let mut block_import_stream = client.import_notification_stream();
321
322 while let Some(notification) = block_import_stream.next().await {
323 let delay = Delay::new(Duration::from_secs(delay_sec));
324 let cloned_client = client.clone();
325 spawn_handle.spawn(
326 "delayed-finalize",
327 None,
328 Box::pin(async move {
329 delay.await;
330 finalize_block(FinalizeBlockParams {
331 hash: notification.hash,
332 sender: None,
333 justification: None,
334 finalizer: cloned_client,
335 _phantom: PhantomData,
336 })
337 .await
338 }),
339 );
340 }
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use sc_basic_authorship::ProposerFactory;
    use sc_consensus::ImportedAux;
    use sc_transaction_pool::{BasicPool, FullChainApi, Options, RevalidationType};
    use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool, TransactionSource};
    use sp_api::StorageProof;
    use sp_inherents::InherentData;
    use sp_runtime::generic::{Digest, DigestItem};
    use substrate_test_runtime_client::{
        DefaultTestClientBuilderExt, Sr25519Keyring::*, TestClientBuilder, TestClientBuilderExt,
    };
    use substrate_test_runtime_transaction_pool::{uxt, TestApi};

    /// Returns a shared, empty [`TestApi`].
    fn api() -> Arc<TestApi> {
        let empty = TestApi::empty();
        Arc::new(empty)
    }

    /// Transaction source used for every submission in these tests.
    const SOURCE: TransactionSource = TransactionSource::External;

    /// Consensus data provider that produces an empty pre-digest and appends a single
    /// `DigestItem::Other(vec![1])` post-digest to the import parameters.
    struct TestDigestProvider<C> {
        _client: Arc<C>,
    }

    impl<B, C> ConsensusDataProvider<B> for TestDigestProvider<C>
    where
        B: BlockT,
        C: ProvideRuntimeApi<B> + Send + Sync,
    {
        /// Always yields a digest without logs.
        fn create_digest(
            &self,
            _parent: &B::Header,
            _inherents: &InherentData,
        ) -> Result<Digest, Error> {
            let logs = Vec::new();
            Ok(Digest { logs })
        }

        /// Pushes one post-digest onto `params`, so the block gets a post-hash.
        fn append_block_import(
            &self,
            _parent: &B::Header,
            params: &mut BlockImportParams<B>,
            _inherents: &InherentData,
            _proof: StorageProof,
        ) -> Result<(), Error> {
            let marker = DigestItem::Other(vec![1]);
            params.post_digests.push(marker);
            Ok(())
        }
    }

    /// A transaction entering the pool triggers a sealed block that extends genesis.
    #[tokio::test]
    async fn instant_seal() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.info().genesis_hash;
        let chain_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            chain_api,
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);

        // The reply channel goes to the first command only; later commands carry `None`.
        let (reply_tx, reply_rx) = futures::channel::oneshot::channel();
        let mut reply_tx = Some(reply_tx);
        let commands_stream = pool
            .pool()
            .validated_pool()
            .import_notification_stream()
            .map(move |_| EngineCommand::SealNewBlock {
                create_empty: false,
                finalize: true,
                parent_hash: None,
                sender: reply_tx.take(),
            });

        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            consensus_data_provider: None,
            create_inherent_data_providers: |_, _| async { Ok(()) },
        }));

        // Submitting a transaction makes the pool emit an import notification.
        assert!(pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await.is_ok());

        let created_block = reply_rx.await.unwrap().unwrap();
        assert_matches!(
            created_block,
            CreatedBlock {
                hash: _,
                aux: ImportedAux {
                    header_only: false,
                    clear_justification_requests: false,
                    needs_justification: false,
                    bad_justification: false,
                    is_new_best: true,
                },
                proof_size: _
            }
        );

        // The block must be known to the client and sit directly on top of genesis.
        let stored_header = client.header(created_block.hash).unwrap();
        assert!(stored_header.is_some());
        assert_eq!(stored_header.unwrap().number, 1)
    }

    // This function carries no `#[tokio::test]` attribute, so it is never run as a test and
    // would otherwise be reported as dead code. Why it is disabled is not visible here.
    #[allow(unused)]
    async fn instant_seal_delayed_finalize() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.info().genesis_hash;
        let chain_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            chain_api,
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);

        // The reply channel goes to the first command only; later commands carry `None`.
        let (reply_tx, reply_rx) = futures::channel::oneshot::channel();
        let mut reply_tx = Some(reply_tx);
        let commands_stream = pool
            .pool()
            .validated_pool()
            .import_notification_stream()
            .map(move |_| EngineCommand::SealNewBlock {
                create_empty: false,
                // Finalization is left to the delayed-finalize task below.
                finalize: false,
                parent_hash: None,
                sender: reply_tx.take(),
            });

        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            consensus_data_provider: None,
            create_inherent_data_providers: |_, _| async { Ok(()) },
        }));

        let delay_sec = 5;

        tokio::spawn(run_delayed_finalize(DelayedFinalizeParams {
            client: client.clone(),
            spawn_handle: spawner,
            delay_sec,
        }));

        let mut finality_stream = client.finality_notification_stream();
        assert!(pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await.is_ok());

        let created_block = reply_rx.await.unwrap().unwrap();
        assert_eq!(
            created_block,
            CreatedBlock {
                hash: created_block.hash,
                aux: ImportedAux {
                    header_only: false,
                    clear_justification_requests: false,
                    needs_justification: false,
                    bad_justification: false,
                    is_new_best: true,
                },
                proof_size: created_block.proof_size
            }
        );

        // The block must be known to the client and sit directly on top of genesis.
        let stored_header = client.header(created_block.hash).unwrap();
        assert!(stored_header.is_some());
        assert_eq!(stored_header.unwrap().number, 1);

        // Right after import nothing but genesis is finalized.
        assert_eq!(client.info().finalized_hash, client.info().genesis_hash);

        // The next finality notification must be for the freshly created block.
        let finalized = finality_stream.select_next_some().await;
        assert_eq!(finalized.hash, created_block.hash);
    }

    /// A block is sealed on command and afterwards finalized on command.
    #[tokio::test]
    async fn manual_seal_and_finalization() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.info().genesis_hash;
        let chain_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            chain_api,
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
        let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);

        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            create_inherent_data_providers: |_, _| async { Ok(()) },
            consensus_data_provider: None,
        }));

        assert!(pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await.is_ok());

        // Ask the engine to seal a block and wait for its answer.
        let (seal_tx, seal_rx) = futures::channel::oneshot::channel();
        let seal_command = EngineCommand::SealNewBlock {
            parent_hash: None,
            sender: Some(seal_tx),
            create_empty: false,
            finalize: false,
        };
        sink.send(seal_command).await.unwrap();
        let created_block = seal_rx.await.unwrap().unwrap();

        assert_matches!(
            created_block,
            CreatedBlock {
                hash: _,
                aux: ImportedAux {
                    header_only: false,
                    clear_justification_requests: false,
                    needs_justification: false,
                    bad_justification: false,
                    is_new_best: true,
                },
                proof_size: _
            }
        );

        // Ask the engine to finalize the block it just created; the reply must be `Ok`.
        let header = client.header(created_block.hash).unwrap().unwrap();
        let (finalize_tx, finalize_rx) = futures::channel::oneshot::channel();
        let finalize_command = EngineCommand::FinalizeBlock {
            sender: Some(finalize_tx),
            hash: header.hash(),
            justification: None,
        };
        sink.send(finalize_command).await.unwrap();
        finalize_rx.await.unwrap().unwrap();
    }

    /// Two blocks can be sealed on top of the same explicitly given parent.
    #[tokio::test]
    async fn manual_seal_fork_blocks() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let pool_api = Arc::new(FullChainApi::new(
            client.clone(),
            None,
            &sp_core::testing::TaskExecutor::new(),
        ));
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.info().genesis_hash;
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            pool_api.clone(),
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
        let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);

        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            create_inherent_data_providers: |_, _| async { Ok(()) },
            consensus_data_provider: None,
        }));

        assert!(pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await.is_ok());

        // First block: sealed on whatever the engine picks as parent.
        let (first_tx, first_rx) = futures::channel::oneshot::channel();
        let first_command = EngineCommand::SealNewBlock {
            parent_hash: None,
            sender: Some(first_tx),
            create_empty: false,
            finalize: false,
        };
        sink.send(first_command).await.unwrap();
        let created_block = first_rx.await.unwrap().unwrap();

        assert_matches!(
            created_block,
            CreatedBlock {
                hash: _,
                aux: ImportedAux {
                    header_only: false,
                    clear_justification_requests: false,
                    needs_justification: false,
                    bad_justification: false,
                    is_new_best: true
                },
                proof_size: _
            }
        );

        assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Alice, 1)).await.is_ok());

        let header = client.header(created_block.hash).expect("db error").expect("imported above");
        assert_eq!(header.number, 1);

        // Tell the pool about the new best block.
        let new_best = sc_transaction_pool_api::ChainEvent::NewBestBlock {
            hash: header.hash(),
            tree_route: None,
        };
        pool.maintain(new_best).await;

        // Second block: explicitly built on the first one.
        let (tx1, rx1) = futures::channel::oneshot::channel();
        let second_command = EngineCommand::SealNewBlock {
            parent_hash: Some(created_block.hash),
            sender: Some(tx1),
            create_empty: false,
            finalize: false,
        };
        assert!(sink.send(second_command).await.is_ok());
        assert_matches!(rx1.await.expect("should be no error receiving"), Ok(_));

        // Third block: built on the same parent as the second one.
        assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Bob, 0)).await.is_ok());
        let (tx2, rx2) = futures::channel::oneshot::channel();
        let third_command = EngineCommand::SealNewBlock {
            parent_hash: Some(created_block.hash),
            sender: Some(tx2),
            create_empty: false,
            finalize: false,
        };
        assert!(sink.send(third_command).await.is_ok());
        let imported = rx2.await.unwrap().unwrap();
        assert!(client.header(imported.hash).unwrap().is_some())
    }

    /// The hash reported for a block with post-digests can be used to look up its header.
    #[tokio::test]
    async fn manual_seal_post_hash() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.header(client.info().genesis_hash).unwrap().unwrap().hash();
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            api(),
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);

        let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);

        // The digest provider adds a post-digest to every block that is sealed.
        let digest_provider = TestDigestProvider { _client: client.clone() };
        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            consensus_data_provider: Some(Box::new(digest_provider)),
            create_inherent_data_providers: |_, _| async { Ok(()) },
        }));

        let (seal_tx, seal_rx) = futures::channel::oneshot::channel();
        let seal_command = EngineCommand::SealNewBlock {
            parent_hash: None,
            sender: Some(seal_tx),
            create_empty: true,
            finalize: false,
        };
        sink.send(seal_command).await.unwrap();
        let created_block = seal_rx.await.unwrap().unwrap();

        // The reported hash must resolve to the stored header of block 1.
        let header = client.header(created_block.hash).unwrap().unwrap();
        assert_eq!(header.number, 1);
    }
}