cumulus_client_consensus_common/
lib.rs1use codec::Decode;
19use polkadot_primitives::{
20 Block as PBlock, Hash as PHash, Header as PHeader, PersistedValidationData, ValidationCodeHash,
21};
22
23use cumulus_primitives_core::{relay_chain, AbridgedHostConfiguration};
24use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface};
25
26use sc_client_api::Backend;
27use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult};
28use sp_consensus_slots::Slot;
29
30use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
31use sp_timestamp::Timestamp;
32
33use std::{sync::Arc, time::Duration};
34
35mod level_monitor;
36mod parachain_consensus;
37mod parent_search;
38#[cfg(test)]
39mod tests;
40
41pub use parent_search::*;
42
43pub use cumulus_relay_chain_streams::finalized_heads;
44pub use parachain_consensus::run_parachain_consensus;
45
46use level_monitor::LevelMonitor;
47pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
48
49pub mod import_queue;
50
51pub trait ValidationCodeHashProvider<Hash> {
54 fn code_hash_at(&self, at: Hash) -> Option<ValidationCodeHash>;
55}
56
57impl<F, Hash> ValidationCodeHashProvider<Hash> for F
58where
59 F: Fn(Hash) -> Option<ValidationCodeHash>,
60{
61 fn code_hash_at(&self, at: Hash) -> Option<ValidationCodeHash> {
62 (self)(at)
63 }
64}
65
66pub struct ParachainCandidate<B> {
68 pub block: B,
70 pub proof: sp_trie::StorageProof,
72}
73
74#[async_trait::async_trait]
83pub trait ParachainConsensus<B: BlockT>: Send + Sync + dyn_clone::DynClone {
84 async fn produce_candidate(
93 &mut self,
94 parent: &B::Header,
95 relay_parent: PHash,
96 validation_data: &PersistedValidationData,
97 ) -> Option<ParachainCandidate<B>>;
98}
99
100dyn_clone::clone_trait_object!(<B> ParachainConsensus<B> where B: BlockT);
101
102#[async_trait::async_trait]
103impl<B: BlockT> ParachainConsensus<B> for Box<dyn ParachainConsensus<B> + Send + Sync> {
104 async fn produce_candidate(
105 &mut self,
106 parent: &B::Header,
107 relay_parent: PHash,
108 validation_data: &PersistedValidationData,
109 ) -> Option<ParachainCandidate<B>> {
110 (*self).produce_candidate(parent, relay_parent, validation_data).await
111 }
112}
113
114pub struct ParachainBlockImport<Block: BlockT, BI, BE> {
122 inner: BI,
123 monitor: Option<SharedData<LevelMonitor<Block, BE>>>,
124 delayed_best_block: bool,
125}
126
127impl<Block: BlockT, BI, BE: Backend<Block>> ParachainBlockImport<Block, BI, BE> {
128 pub fn new(inner: BI, backend: Arc<BE>) -> Self {
132 Self::new_with_limit(inner, backend, LevelLimit::Default)
133 }
134
135 pub fn new_with_limit(inner: BI, backend: Arc<BE>, level_leaves_max: LevelLimit) -> Self {
140 let level_limit = match level_leaves_max {
141 LevelLimit::None => None,
142 LevelLimit::Some(limit) => Some(limit),
143 LevelLimit::Default => Some(MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT),
144 };
145
146 let monitor =
147 level_limit.map(|level_limit| SharedData::new(LevelMonitor::new(level_limit, backend)));
148
149 Self { inner, monitor, delayed_best_block: false }
150 }
151
152 pub fn new_with_delayed_best_block(inner: BI, backend: Arc<BE>) -> Self {
156 Self {
157 delayed_best_block: true,
158 ..Self::new_with_limit(inner, backend, LevelLimit::Default)
159 }
160 }
161}
162
163impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> {
164 fn clone(&self) -> Self {
165 ParachainBlockImport {
166 inner: self.inner.clone(),
167 monitor: self.monitor.clone(),
168 delayed_best_block: self.delayed_best_block,
169 }
170 }
171}
172
173#[async_trait::async_trait]
174impl<Block, BI, BE> BlockImport<Block> for ParachainBlockImport<Block, BI, BE>
175where
176 Block: BlockT,
177 BI: BlockImport<Block> + Send + Sync,
178 BE: Backend<Block>,
179{
180 type Error = BI::Error;
181
182 async fn check_block(
183 &self,
184 block: sc_consensus::BlockCheckParams<Block>,
185 ) -> Result<sc_consensus::ImportResult, Self::Error> {
186 self.inner.check_block(block).await
187 }
188
189 async fn import_block(
190 &self,
191 mut params: sc_consensus::BlockImportParams<Block>,
192 ) -> Result<sc_consensus::ImportResult, Self::Error> {
193 let hash = params.post_hash();
195 let number = *params.header.number();
196
197 if params.with_state() {
198 params.finalized = true;
202 }
203
204 if self.delayed_best_block {
205 params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
208 params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
209 ));
210 }
211
212 let maybe_lock = self.monitor.as_ref().map(|monitor_lock| {
213 let mut monitor = monitor_lock.shared_data_locked();
214 monitor.enforce_limit(number);
215 monitor.release_mutex()
216 });
217
218 let res = self.inner.import_block(params).await?;
219
220 if let (Some(mut monitor_lock), ImportResult::Imported(_)) = (maybe_lock, &res) {
221 let mut monitor = monitor_lock.upgrade();
222 monitor.block_imported(number, hash);
223 }
224
225 Ok(res)
226 }
227}
228
229pub trait ParachainBlockImportMarker {}
231
232impl<B: BlockT, BI, BE> ParachainBlockImportMarker for ParachainBlockImport<B, BI, BE> {}
233
234pub fn relay_slot_and_timestamp(
236 relay_parent_header: &PHeader,
237 relay_chain_slot_duration: Duration,
238) -> Option<(Slot, Timestamp)> {
239 sc_consensus_babe::find_pre_digest::<PBlock>(relay_parent_header)
240 .map(|babe_pre_digest| {
241 let slot = babe_pre_digest.slot();
242 let t = Timestamp::new(relay_chain_slot_duration.as_millis() as u64 * *slot);
243
244 (slot, t)
245 })
246 .ok()
247}
248
249pub async fn load_abridged_host_configuration(
251 relay_parent: PHash,
252 relay_client: &impl RelayChainInterface,
253) -> Result<Option<AbridgedHostConfiguration>, RelayChainError> {
254 relay_client
255 .get_storage_by_key(relay_parent, relay_chain::well_known_keys::ACTIVE_CONFIG)
256 .await?
257 .map(|bytes| {
258 AbridgedHostConfiguration::decode(&mut &bytes[..])
259 .map_err(RelayChainError::DeserializationError)
260 })
261 .transpose()
262}