referrerpolicy=no-referrer-when-downgrade

cumulus_client_consensus_aura/collators/slot_based/
block_import.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use futures::{stream::FusedStream, StreamExt};
19use sc_consensus::{BlockImport, StateAction};
20use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
21use sp_api::{ApiExt, CallApiAt, CallContext, Core, ProvideRuntimeApi, StorageProof};
22use sp_runtime::traits::{Block as BlockT, Header as _};
23use sp_trie::proof_size_extension::ProofSizeExt;
24use std::sync::Arc;
25
26/// Handle for receiving the block and the storage proof from the [`SlotBasedBlockImport`].
27///
28/// This handle should be passed to [`Params`](super::Params) or can also be dropped if the node is
29/// not running as collator.
30pub struct SlotBasedBlockImportHandle<Block> {
31	receiver: TracingUnboundedReceiver<(Block, StorageProof)>,
32}
33
34impl<Block> SlotBasedBlockImportHandle<Block> {
35	/// Returns the next item.
36	///
37	/// The future will never return when the internal channel is closed.
38	pub async fn next(&mut self) -> (Block, StorageProof) {
39		loop {
40			if self.receiver.is_terminated() {
41				futures::pending!()
42			} else if let Some(res) = self.receiver.next().await {
43				return res
44			}
45		}
46	}
47}
48
49/// Special block import for the slot based collator.
50pub struct SlotBasedBlockImport<Block, BI, Client> {
51	inner: BI,
52	client: Arc<Client>,
53	sender: TracingUnboundedSender<(Block, StorageProof)>,
54}
55
56impl<Block, BI, Client> SlotBasedBlockImport<Block, BI, Client> {
57	/// Create a new instance.
58	///
59	/// The returned [`SlotBasedBlockImportHandle`] needs to be passed to the
60	/// [`Params`](super::Params), so that this block import instance can communicate with the
61	/// collation task. If the node is not running as a collator, just dropping the handle is fine.
62	pub fn new(inner: BI, client: Arc<Client>) -> (Self, SlotBasedBlockImportHandle<Block>) {
63		let (sender, receiver) = tracing_unbounded("SlotBasedBlockImportChannel", 1000);
64
65		(Self { sender, client, inner }, SlotBasedBlockImportHandle { receiver })
66	}
67}
68
69impl<Block, BI: Clone, Client> Clone for SlotBasedBlockImport<Block, BI, Client> {
70	fn clone(&self) -> Self {
71		Self { inner: self.inner.clone(), client: self.client.clone(), sender: self.sender.clone() }
72	}
73}
74
75#[async_trait::async_trait]
76impl<Block, BI, Client> BlockImport<Block> for SlotBasedBlockImport<Block, BI, Client>
77where
78	Block: BlockT,
79	BI: BlockImport<Block> + Send + Sync,
80	BI::Error: Into<sp_consensus::Error>,
81	Client: ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync,
82	Client::StateBackend: Send,
83	Client::Api: Core<Block>,
84{
85	type Error = sp_consensus::Error;
86
87	async fn check_block(
88		&self,
89		block: sc_consensus::BlockCheckParams<Block>,
90	) -> Result<sc_consensus::ImportResult, Self::Error> {
91		self.inner.check_block(block).await.map_err(Into::into)
92	}
93
94	async fn import_block(
95		&self,
96		mut params: sc_consensus::BlockImportParams<Block>,
97	) -> Result<sc_consensus::ImportResult, Self::Error> {
98		// If the channel exists and it is required to execute the block, we will execute the block
99		// here. This is done to collect the storage proof and to prevent re-execution, we push
100		// downwards the state changes. `StateAction::ApplyChanges` is ignored, because it either
101		// means that the node produced the block itself or the block was imported via state sync.
102		if !self.sender.is_closed() && !matches!(params.state_action, StateAction::ApplyChanges(_))
103		{
104			let mut runtime_api = self.client.runtime_api();
105
106			runtime_api.set_call_context(CallContext::Onchain);
107
108			runtime_api.record_proof();
109			let recorder = runtime_api
110				.proof_recorder()
111				.expect("Proof recording is enabled in the line above; qed.");
112			runtime_api.register_extension(ProofSizeExt::new(recorder));
113
114			let parent_hash = *params.header.parent_hash();
115
116			let block = Block::new(params.header.clone(), params.body.clone().unwrap_or_default());
117
118			runtime_api
119				.execute_block(parent_hash, block.clone())
120				.map_err(|e| Box::new(e) as Box<_>)?;
121
122			let storage_proof =
123				runtime_api.extract_proof().expect("Proof recording was enabled above; qed");
124
125			let state = self.client.state_at(parent_hash).map_err(|e| Box::new(e) as Box<_>)?;
126			let gen_storage_changes = runtime_api
127				.into_storage_changes(&state, parent_hash)
128				.map_err(sp_consensus::Error::ChainLookup)?;
129
130			if params.header.state_root() != &gen_storage_changes.transaction_storage_root {
131				return Err(sp_consensus::Error::Other(Box::new(
132					sp_blockchain::Error::InvalidStateRoot,
133				)))
134			}
135
136			params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(
137				gen_storage_changes,
138			));
139
140			let _ = self.sender.unbounded_send((block, storage_proof));
141		}
142
143		self.inner.import_block(params).await.map_err(Into::into)
144	}
145}