1use futures::prelude::*;
23use futures_timer::Delay;
24use prometheus_endpoint::Registry;
25use sc_client_api::{
26 backend::{Backend as ClientBackend, Finalizer},
27 client::BlockchainEvents,
28};
29use sc_consensus::{
30 block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
31 import_queue::{BasicQueue, BoxBlockImport, Verifier},
32};
33use sp_blockchain::HeaderBackend;
34use sp_consensus::{Environment, Proposer, SelectChain};
35use sp_core::traits::SpawnNamed;
36use sp_inherents::CreateInherentDataProviders;
37use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
38use std::{marker::PhantomData, sync::Arc, time::Duration};
39
40mod error;
41mod finalize_block;
42mod seal_block;
43
44pub mod consensus;
45pub mod rpc;
46
47pub use self::{
48 consensus::ConsensusDataProvider,
49 error::Error,
50 finalize_block::{finalize_block, FinalizeBlockParams},
51 rpc::{CreatedBlock, EngineCommand},
52 seal_block::{seal_block, SealBlockParams, MAX_PROPOSAL_DURATION},
53};
54use sc_transaction_pool_api::TransactionPool;
55use sp_api::ProvideRuntimeApi;
56
/// Log target string for this crate.
///
/// NOTE(review): not referenced in this part of the file; presumably used by the
/// `seal_block` / `finalize_block` / `rpc` submodules — confirm.
const LOG_TARGET: &str = "manual-seal";

/// Consensus engine identifier for manual seal (the ASCII bytes `manl`).
///
/// [`run_manual_seal`] pairs this id with any justification supplied in an
/// `EngineCommand::FinalizeBlock` before handing it to [`finalize_block`].
pub const MANUAL_SEAL_ENGINE_ID: ConsensusEngineId = [b'm', b'a', b'n', b'l'];
61
/// Stateless verifier plugged into the queue built by [`import_queue`].
///
/// Its `Verifier` implementation does not inspect the block; it only resets two
/// import parameters before passing the block on.
struct ManualSealVerifier;
64
65#[async_trait::async_trait]
66impl<B: BlockT> Verifier<B> for ManualSealVerifier {
67 async fn verify(
68 &self,
69 mut block: BlockImportParams<B>,
70 ) -> Result<BlockImportParams<B>, String> {
71 block.finalized = false;
72 block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
73 Ok(block)
74 }
75}
76
77pub fn import_queue<Block>(
79 block_import: BoxBlockImport<Block>,
80 spawner: &impl sp_core::traits::SpawnEssentialNamed,
81 registry: Option<&Registry>,
82) -> BasicQueue<Block>
83where
84 Block: BlockT,
85{
86 BasicQueue::new(ManualSealVerifier, block_import, None, spawner, registry)
87}
88
/// Everything [`run_manual_seal`] needs to run the authorship loop.
pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CS, CIDP, P> {
    /// Block import handle; lent mutably to `seal_block` for every `SealNewBlock` command.
    pub block_import: BI,

    /// Proposer environment; lent mutably to `seal_block` for every `SealNewBlock` command.
    pub env: E,

    /// Shared client. A clone is given to `seal_block`, and a clone acts as the finalizer
    /// for `FinalizeBlock` commands.
    pub client: Arc<C>,

    /// Shared transaction pool; a clone is given to `seal_block`.
    pub pool: Arc<TP>,

    /// Stream of `EngineCommand`s driving the loop. The loop stops when this stream ends.
    pub commands_stream: CS,

    /// Chain selection strategy, passed by reference to `seal_block`.
    pub select_chain: SC,

    /// Optional consensus data provider; passed to `seal_block` (as a borrowed trait
    /// object) when present.
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = P>>>,

    /// Factory for inherent data providers, passed by reference to `seal_block`.
    pub create_inherent_data_providers: CIDP,
}
116
/// Parameters for [`run_instant_seal`] and [`run_instant_seal_and_finalize`].
///
/// Identical to [`ManualSealParams`] minus `commands_stream`: both functions build
/// that stream themselves from the pool's import notifications and forward every
/// other field unchanged to [`run_manual_seal`].
pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC, CIDP, P> {
    /// Block import handle, forwarded to [`run_manual_seal`].
    pub block_import: BI,

    /// Proposer environment, forwarded to [`run_manual_seal`].
    pub env: E,

    /// Shared client, forwarded to [`run_manual_seal`].
    pub client: Arc<C>,

    /// Shared transaction pool. Its `import_notification_stream` triggers block
    /// production; the pool itself is also forwarded to [`run_manual_seal`].
    pub pool: Arc<TP>,

    /// Chain selection strategy, forwarded to [`run_manual_seal`].
    pub select_chain: SC,

    /// Optional consensus data provider, forwarded to [`run_manual_seal`].
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = P>>>,

    /// Factory for inherent data providers, forwarded to [`run_manual_seal`].
    pub create_inherent_data_providers: CIDP,
}
140
/// Parameters for [`run_delayed_finalize`].
pub struct DelayedFinalizeParams<C, S> {
    /// Shared client: source of block import notifications and the finalizer handed
    /// to `finalize_block`.
    pub client: Arc<C>,

    /// Handle used to spawn one `"delayed-finalize"` task per imported block.
    pub spawn_handle: S,

    /// Delay, in seconds, between a block's import notification and its finalization.
    pub delay_sec: u64,
}
152
153pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
155 ManualSealParams {
156 mut block_import,
157 mut env,
158 client,
159 pool,
160 mut commands_stream,
161 select_chain,
162 consensus_data_provider,
163 create_inherent_data_providers,
164 }: ManualSealParams<B, BI, E, C, TP, SC, CS, CIDP, P>,
165) where
166 B: BlockT + 'static,
167 BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
168 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
169 CB: ClientBackend<B> + 'static,
170 E: Environment<B> + 'static,
171 E::Proposer: Proposer<B, Proof = P>,
172 CS: Stream<Item = EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
173 SC: SelectChain<B> + 'static,
174 TP: TransactionPool<Block = B>,
175 CIDP: CreateInherentDataProviders<B, ()>,
176 P: codec::Encode + Send + Sync + 'static,
177{
178 while let Some(command) = commands_stream.next().await {
179 match command {
180 EngineCommand::SealNewBlock { create_empty, finalize, parent_hash, sender } => {
181 seal_block(SealBlockParams {
182 sender,
183 parent_hash,
184 finalize,
185 create_empty,
186 env: &mut env,
187 select_chain: &select_chain,
188 block_import: &mut block_import,
189 consensus_data_provider: consensus_data_provider.as_deref(),
190 pool: pool.clone(),
191 client: client.clone(),
192 create_inherent_data_providers: &create_inherent_data_providers,
193 })
194 .await;
195 },
196 EngineCommand::FinalizeBlock { hash, sender, justification } => {
197 let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j));
198 finalize_block(FinalizeBlockParams {
199 hash,
200 sender,
201 justification,
202 finalizer: client.clone(),
203 _phantom: PhantomData,
204 })
205 .await
206 },
207 }
208 }
209}
210
211pub async fn run_instant_seal<B, BI, CB, E, C, TP, SC, CIDP, P>(
215 InstantSealParams {
216 block_import,
217 env,
218 client,
219 pool,
220 select_chain,
221 consensus_data_provider,
222 create_inherent_data_providers,
223 }: InstantSealParams<B, BI, E, C, TP, SC, CIDP, P>,
224) where
225 B: BlockT + 'static,
226 BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
227 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
228 CB: ClientBackend<B> + 'static,
229 E: Environment<B> + 'static,
230 E::Proposer: Proposer<B, Proof = P>,
231 SC: SelectChain<B> + 'static,
232 TP: TransactionPool<Block = B>,
233 CIDP: CreateInherentDataProviders<B, ()>,
234 P: codec::Encode + Send + Sync + 'static,
235{
236 let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock {
239 create_empty: true,
240 finalize: false,
241 parent_hash: None,
242 sender: None,
243 });
244
245 run_manual_seal(ManualSealParams {
246 block_import,
247 env,
248 client,
249 pool,
250 commands_stream,
251 select_chain,
252 consensus_data_provider,
253 create_inherent_data_providers,
254 })
255 .await
256}
257
258pub async fn run_instant_seal_and_finalize<B, BI, CB, E, C, TP, SC, CIDP, P>(
265 InstantSealParams {
266 block_import,
267 env,
268 client,
269 pool,
270 select_chain,
271 consensus_data_provider,
272 create_inherent_data_providers,
273 }: InstantSealParams<B, BI, E, C, TP, SC, CIDP, P>,
274) where
275 B: BlockT + 'static,
276 BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
277 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
278 CB: ClientBackend<B> + 'static,
279 E: Environment<B> + 'static,
280 E::Proposer: Proposer<B, Proof = P>,
281 SC: SelectChain<B> + 'static,
282 TP: TransactionPool<Block = B>,
283 CIDP: CreateInherentDataProviders<B, ()>,
284 P: codec::Encode + Send + Sync + 'static,
285{
286 let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock {
289 create_empty: false,
290 finalize: true,
291 parent_hash: None,
292 sender: None,
293 });
294
295 run_manual_seal(ManualSealParams {
296 block_import,
297 env,
298 client,
299 pool,
300 commands_stream,
301 select_chain,
302 consensus_data_provider,
303 create_inherent_data_providers,
304 })
305 .await
306}
307
308pub async fn run_delayed_finalize<B, CB, C, S>(
316 DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams<C, S>,
317) where
318 B: BlockT + 'static,
319 CB: ClientBackend<B> + 'static,
320 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
321 S: SpawnNamed,
322{
323 let mut block_import_stream = client.import_notification_stream();
324
325 while let Some(notification) = block_import_stream.next().await {
326 let delay = Delay::new(Duration::from_secs(delay_sec));
327 let cloned_client = client.clone();
328 spawn_handle.spawn(
329 "delayed-finalize",
330 None,
331 Box::pin(async move {
332 delay.await;
333 finalize_block(FinalizeBlockParams {
334 hash: notification.hash,
335 sender: None,
336 justification: None,
337 finalizer: cloned_client,
338 _phantom: PhantomData,
339 })
340 .await
341 }),
342 );
343 }
344}
345
346#[cfg(test)]
347mod tests {
348 use super::*;
349 use sc_basic_authorship::ProposerFactory;
350 use sc_consensus::ImportedAux;
351 use sc_transaction_pool::{BasicPool, FullChainApi, Options, RevalidationType};
352 use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool, TransactionSource};
353 use sp_inherents::InherentData;
354 use sp_runtime::generic::{Digest, DigestItem};
355 use substrate_test_runtime_client::{
356 DefaultTestClientBuilderExt, Sr25519Keyring::*, TestClientBuilder, TestClientBuilderExt,
357 };
358 use substrate_test_runtime_transaction_pool::{uxt, TestApi};
359
360 fn api() -> Arc<TestApi> {
361 Arc::new(TestApi::empty())
362 }
363
    /// Transaction source passed to every `submit_one` call in these tests.
    const SOURCE: TransactionSource = TransactionSource::External;
365
366 struct TestDigestProvider<C> {
367 _client: Arc<C>,
368 }
369 impl<B, C> ConsensusDataProvider<B> for TestDigestProvider<C>
370 where
371 B: BlockT,
372 C: ProvideRuntimeApi<B> + Send + Sync,
373 {
374 type Proof = ();
375
376 fn create_digest(
377 &self,
378 _parent: &B::Header,
379 _inherents: &InherentData,
380 ) -> Result<Digest, Error> {
381 Ok(Digest { logs: vec![] })
382 }
383
384 fn append_block_import(
385 &self,
386 _parent: &B::Header,
387 params: &mut BlockImportParams<B>,
388 _inherents: &InherentData,
389 _proof: Self::Proof,
390 ) -> Result<(), Error> {
391 params.post_digests.push(DigestItem::Other(vec![1]));
392 Ok(())
393 }
394 }
395
    /// Checks that a block is authored (with `finalize: true`) as soon as a
    /// transaction is imported into the pool.
    #[tokio::test]
    async fn instant_seal() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.info().genesis_hash;
        let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            pool_api,
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
        // Reply channel on which the authorship task reports the created block.
        let (sender, receiver) = futures::channel::oneshot::channel();
        // The oneshot sender is wrapped so the `map` closure below can move it out once.
        let mut sender = Arc::new(Some(sender));
        let commands_stream =
            pool.pool().validated_pool().import_notification_stream().map(move |_| {
                // The closure owns the only reference to the `Arc`, so `get_mut` succeeds.
                let mut_sender = Arc::get_mut(&mut sender).unwrap();
                // `take` leaves `None` behind: only the first command carries the sender.
                let sender = std::mem::take(mut_sender);
                EngineCommand::SealNewBlock {
                    create_empty: false,
                    finalize: true,
                    parent_hash: None,
                    sender,
                }
            });

        // Run the authorship loop in the background.
        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            create_inherent_data_providers: |_, _| async { Ok(()) },
            consensus_data_provider: None,
        }));

        // Submitting a transaction triggers the import notification and thus a seal command.
        let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
        assert!(result.is_ok());
        // Wait for the authorship task to report the block it created.
        let created_block = receiver.await.unwrap().unwrap();
        // `hash` is copied from the result itself, so this effectively checks `aux`
        // and `proof_size` only.
        assert_eq!(
            created_block,
            CreatedBlock {
                hash: created_block.hash,
                aux: ImportedAux {
                    header_only: false,
                    clear_justification_requests: false,
                    needs_justification: false,
                    bad_justification: false,
                    is_new_best: true,
                },
                proof_size: 0
            }
        );
        // The reported block must be retrievable from the client...
        assert!(client.header(created_block.hash).unwrap().is_some());
        // ...and must be block number 1.
        assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1)
    }
469
    /// Checks that a block sealed without finalization is later finalized by
    /// `run_delayed_finalize`.
    ///
    /// NOTE(review): this function has no `#[tokio::test]` attribute, so the test
    /// harness never runs it; `#[allow(unused)]` silences the resulting dead-code
    /// warning. Presumably disabled on purpose — confirm before re-enabling.
    #[allow(unused)]
    async fn instant_seal_delayed_finalize() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.info().genesis_hash;
        let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            pool_api,
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
        // Reply channel on which the authorship task reports the created block.
        let (sender, receiver) = futures::channel::oneshot::channel();
        // The oneshot sender is wrapped so the `map` closure below can move it out once.
        let mut sender = Arc::new(Some(sender));
        let commands_stream =
            pool.pool().validated_pool().import_notification_stream().map(move |_| {
                // The closure owns the only reference to the `Arc`, so `get_mut` succeeds.
                let mut_sender = Arc::get_mut(&mut sender).unwrap();
                // `take` leaves `None` behind: only the first command carries the sender.
                let sender = std::mem::take(mut_sender);
                EngineCommand::SealNewBlock {
                    create_empty: false,
                    // Finalization is left to `run_delayed_finalize` below.
                    finalize: false,
                    parent_hash: None,
                    sender,
                }
            });

        // Run the authorship loop in the background.
        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            commands_stream,
            env,
            client: client.clone(),
            pool: pool.clone(),
            select_chain,
            create_inherent_data_providers: |_, _| async { Ok(()) },
            consensus_data_provider: None,
        }));

        let delay_sec = 5;

        // Run the delayed finalizer in the background as well.
        tokio::spawn(run_delayed_finalize(DelayedFinalizeParams {
            client: client.clone(),
            delay_sec,
            spawn_handle: spawner,
        }));

        // Subscribe to finality notifications before triggering block production.
        let mut finality_stream = client.finality_notification_stream();
        let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
        assert!(result.is_ok());
        // Wait for the authorship task to report the block it created.
        let created_block = receiver.await.unwrap().unwrap();
        // `hash` and `proof_size` are copied from the result itself, so this
        // effectively checks `aux` only.
        assert_eq!(
            created_block,
            CreatedBlock {
                hash: created_block.hash,
                aux: ImportedAux {
                    header_only: false,
                    clear_justification_requests: false,
                    needs_justification: false,
                    bad_justification: false,
                    is_new_best: true,
                },
                proof_size: created_block.proof_size
            }
        );
        // The reported block must be retrievable from the client...
        assert!(client.header(created_block.hash).unwrap().is_some());
        // ...and must be block number 1.
        assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1);

        // Right after sealing, the finalized head is still genesis.
        assert_eq!(client.info().finalized_hash, client.info().genesis_hash);

        // The first finality notification must be for the block created above.
        let finalized = finality_stream.select_next_some().await;
        assert_eq!(finalized.hash, created_block.hash);
    }
562
    /// Checks that explicit `SealNewBlock` and `FinalizeBlock` commands sent over a
    /// channel are both carried out successfully.
    #[tokio::test]
    async fn manual_seal_and_finalization() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.info().genesis_hash;
        let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone()));
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            pool_api,
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
        // `sink` is the test's handle for sending engine commands to the authorship task.
        let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);

        // Run the authorship loop in the background.
        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            consensus_data_provider: None,
            create_inherent_data_providers: |_, _| async { Ok(()) },
        }));

        // Put one transaction in the pool so the block is not empty.
        let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
        assert!(result.is_ok());
        // Ask for a new block and wait for the reply.
        let (tx, rx) = futures::channel::oneshot::channel();
        sink.send(EngineCommand::SealNewBlock {
            parent_hash: None,
            sender: Some(tx),
            create_empty: false,
            finalize: false,
        })
        .await
        .unwrap();
        let created_block = rx.await.unwrap().unwrap();

        // `hash` is copied from the result itself, so this effectively checks `aux`
        // and `proof_size` only.
        assert_eq!(
            created_block,
            CreatedBlock {
                hash: created_block.hash,
                aux: ImportedAux {
                    header_only: false,
                    clear_justification_requests: false,
                    needs_justification: false,
                    bad_justification: false,
                    is_new_best: true,
                },
                proof_size: 0
            }
        );
        // The reported block must be retrievable from the client.
        let header = client.header(created_block.hash).unwrap().unwrap();
        // Ask for that block to be finalized; the double `unwrap` requires both the
        // reply to arrive and the finalization itself to succeed.
        let (tx, rx) = futures::channel::oneshot::channel();
        sink.send(EngineCommand::FinalizeBlock {
            sender: Some(tx),
            hash: header.hash(),
            justification: None,
        })
        .await
        .unwrap();
        rx.await.unwrap().unwrap();
    }
642
    /// Checks that two blocks can be sealed on the same explicit parent, i.e. that
    /// `parent_hash` can be used to build a fork.
    #[tokio::test]
    async fn manual_seal_fork_blocks() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let pool_api = Arc::new(FullChainApi::new(
            client.clone(),
            None,
            &sp_core::testing::TaskExecutor::new(),
        ));
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.info().genesis_hash;
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            pool_api.clone(),
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
        // `sink` is the test's handle for sending engine commands to the authorship task.
        let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);

        // Run the authorship loop in the background.
        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            consensus_data_provider: None,
            create_inherent_data_providers: |_, _| async { Ok(()) },
        }));

        // Put one transaction in the pool so the first block is not empty.
        let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await;
        assert!(result.is_ok());

        // Seal the first block on the best chain (no explicit parent).
        let (tx, rx) = futures::channel::oneshot::channel();
        sink.send(EngineCommand::SealNewBlock {
            parent_hash: None,
            sender: Some(tx),
            create_empty: false,
            finalize: false,
        })
        .await
        .unwrap();
        let created_block = rx.await.unwrap().unwrap();

        // `hash` is copied from the result itself, so this effectively checks `aux`
        // and `proof_size` only.
        assert_eq!(
            created_block,
            CreatedBlock {
                hash: created_block.hash,
                aux: ImportedAux {
                    header_only: false,
                    clear_justification_requests: false,
                    needs_justification: false,
                    bad_justification: false,
                    is_new_best: true
                },
                proof_size: 0
            }
        );

        // Submit a second transaction from Alice, validated at the new block.
        assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Alice, 1)).await.is_ok());

        let header = client.header(created_block.hash).expect("db error").expect("imported above");
        assert_eq!(header.number, 1);
        // Tell the pool about the new best block by hand.
        pool.maintain(sc_transaction_pool_api::ChainEvent::NewBestBlock {
            hash: header.hash(),
            tree_route: None,
        })
        .await;

        // Seal a child of `created_block`.
        let (tx1, rx1) = futures::channel::oneshot::channel();
        assert!(sink
            .send(EngineCommand::SealNewBlock {
                parent_hash: Some(created_block.hash),
                sender: Some(tx1),
                create_empty: false,
                finalize: false,
            })
            .await
            .is_ok());
        assert_matches::assert_matches!(rx1.await.expect("should be no error receiving"), Ok(_));

        // Seal a second block on the same parent (`created_block`), after adding a
        // transaction from Bob to the pool.
        assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Bob, 0)).await.is_ok());
        let (tx2, rx2) = futures::channel::oneshot::channel();
        assert!(sink
            .send(EngineCommand::SealNewBlock {
                parent_hash: Some(created_block.hash),
                sender: Some(tx2),
                create_empty: false,
                finalize: false,
            })
            .await
            .is_ok());
        let imported = rx2.await.unwrap().unwrap();
        // The second block built on that parent must be present in the client.
        assert!(client.header(imported.hash).unwrap().is_some())
    }
752
    /// Checks that the hash reported for a sealed block can be used to look the
    /// block up when a consensus data provider appends a post-digest on import.
    #[tokio::test]
    async fn manual_seal_post_hash() {
        let builder = TestClientBuilder::new();
        let (client, select_chain) = builder.build_with_longest_chain();
        let client = Arc::new(client);
        let spawner = sp_core::testing::TaskExecutor::new();
        let genesis_hash = client.header(client.info().genesis_hash).unwrap().unwrap().hash();
        let pool = Arc::new(BasicPool::with_revalidation_type(
            Options::default(),
            true.into(),
            api(),
            None,
            RevalidationType::Full,
            spawner.clone(),
            0,
            genesis_hash,
            genesis_hash,
        ));
        let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);

        // `sink` is the test's handle for sending engine commands to the authorship task.
        let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);

        // Run the authorship loop in the background with `TestDigestProvider`, which
        // pushes a post-digest onto every block import.
        tokio::spawn(run_manual_seal(ManualSealParams {
            block_import: client.clone(),
            env,
            client: client.clone(),
            pool: pool.clone(),
            commands_stream,
            select_chain,
            consensus_data_provider: Some(Box::new(TestDigestProvider { _client: client.clone() })),
            create_inherent_data_providers: |_, _| async { Ok(()) },
        }));

        // No transaction is submitted in this test, hence `create_empty: true`.
        let (tx, rx) = futures::channel::oneshot::channel();
        sink.send(EngineCommand::SealNewBlock {
            parent_hash: None,
            sender: Some(tx),
            create_empty: true,
            finalize: false,
        })
        .await
        .unwrap();
        let created_block = rx.await.unwrap().unwrap();

        // The reported hash must resolve to a stored header, and that header must be
        // block number 1.
        let header = client.header(created_block.hash).unwrap().unwrap();
        assert_eq!(header.number, 1);
    }
803}