cumulus_client_consensus_aura/collators/slot_based/
block_import.rs1use futures::{stream::FusedStream, StreamExt};
19use sc_consensus::{BlockImport, StateAction};
20use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
21use sp_api::{ApiExt, CallApiAt, CallContext, Core, ProvideRuntimeApi, StorageProof};
22use sp_runtime::traits::{Block as BlockT, Header as _};
23use sp_trie::proof_size_extension::ProofSizeExt;
24use std::sync::Arc;
25
26pub struct SlotBasedBlockImportHandle<Block> {
31 receiver: TracingUnboundedReceiver<(Block, StorageProof)>,
32}
33
34impl<Block> SlotBasedBlockImportHandle<Block> {
35 pub async fn next(&mut self) -> (Block, StorageProof) {
39 loop {
40 if self.receiver.is_terminated() {
41 futures::pending!()
42 } else if let Some(res) = self.receiver.next().await {
43 return res
44 }
45 }
46 }
47}
48
49pub struct SlotBasedBlockImport<Block, BI, Client> {
51 inner: BI,
52 client: Arc<Client>,
53 sender: TracingUnboundedSender<(Block, StorageProof)>,
54}
55
56impl<Block, BI, Client> SlotBasedBlockImport<Block, BI, Client> {
57 pub fn new(inner: BI, client: Arc<Client>) -> (Self, SlotBasedBlockImportHandle<Block>) {
63 let (sender, receiver) = tracing_unbounded("SlotBasedBlockImportChannel", 1000);
64
65 (Self { sender, client, inner }, SlotBasedBlockImportHandle { receiver })
66 }
67}
68
69impl<Block, BI: Clone, Client> Clone for SlotBasedBlockImport<Block, BI, Client> {
70 fn clone(&self) -> Self {
71 Self { inner: self.inner.clone(), client: self.client.clone(), sender: self.sender.clone() }
72 }
73}
74
75#[async_trait::async_trait]
76impl<Block, BI, Client> BlockImport<Block> for SlotBasedBlockImport<Block, BI, Client>
77where
78 Block: BlockT,
79 BI: BlockImport<Block> + Send + Sync,
80 BI::Error: Into<sp_consensus::Error>,
81 Client: ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync,
82 Client::StateBackend: Send,
83 Client::Api: Core<Block>,
84{
85 type Error = sp_consensus::Error;
86
87 async fn check_block(
88 &self,
89 block: sc_consensus::BlockCheckParams<Block>,
90 ) -> Result<sc_consensus::ImportResult, Self::Error> {
91 self.inner.check_block(block).await.map_err(Into::into)
92 }
93
94 async fn import_block(
95 &self,
96 mut params: sc_consensus::BlockImportParams<Block>,
97 ) -> Result<sc_consensus::ImportResult, Self::Error> {
98 if !self.sender.is_closed() && !matches!(params.state_action, StateAction::ApplyChanges(_))
103 {
104 let mut runtime_api = self.client.runtime_api();
105
106 runtime_api.set_call_context(CallContext::Onchain);
107
108 runtime_api.record_proof();
109 let recorder = runtime_api
110 .proof_recorder()
111 .expect("Proof recording is enabled in the line above; qed.");
112 runtime_api.register_extension(ProofSizeExt::new(recorder));
113
114 let parent_hash = *params.header.parent_hash();
115
116 let block = Block::new(params.header.clone(), params.body.clone().unwrap_or_default());
117
118 runtime_api
119 .execute_block(parent_hash, block.clone())
120 .map_err(|e| Box::new(e) as Box<_>)?;
121
122 let storage_proof =
123 runtime_api.extract_proof().expect("Proof recording was enabled above; qed");
124
125 let state = self.client.state_at(parent_hash).map_err(|e| Box::new(e) as Box<_>)?;
126 let gen_storage_changes = runtime_api
127 .into_storage_changes(&state, parent_hash)
128 .map_err(sp_consensus::Error::ChainLookup)?;
129
130 if params.header.state_root() != &gen_storage_changes.transaction_storage_root {
131 return Err(sp_consensus::Error::Other(Box::new(
132 sp_blockchain::Error::InvalidStateRoot,
133 )))
134 }
135
136 params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(
137 gen_storage_changes,
138 ));
139
140 let _ = self.sender.unbounded_send((block, storage_proof));
141 }
142
143 self.inner.import_block(params).await.map_err(Into::into)
144 }
145}