cumulus_client_consensus_common/
lib.rs1use codec::Decode;
19use polkadot_primitives::{Block as PBlock, Hash as PHash, Header as PHeader, ValidationCodeHash};
20
21use cumulus_primitives_core::{relay_chain, AbridgedHostConfiguration};
22use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface};
23
24use sc_client_api::Backend;
25use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult};
26use sp_consensus_slots::Slot;
27
28use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
29use sp_timestamp::Timestamp;
30
31use std::{sync::Arc, time::Duration};
32
33mod level_monitor;
34mod parachain_consensus;
35mod parent_search;
36#[cfg(test)]
37mod tests;
38
39pub use parent_search::*;
40
41pub use cumulus_relay_chain_streams::finalized_heads;
42pub use parachain_consensus::spawn_parachain_consensus_tasks;
43
44use level_monitor::LevelMonitor;
45pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
46
47pub mod import_queue;
48
/// Target name attached to the `tracing` events emitted from this module.
49const LOG_TARGET: &str = "consensus::common";
50
/// A source of validation code hashes, queried by a value of type `Hash`.
///
/// Any `Fn(Hash) -> Option<ValidationCodeHash>` implements this trait through
/// the blanket implementation that follows in this file.
51pub trait ValidationCodeHashProvider<Hash> {
 /// Returns the validation code hash associated with `at`, or `None` when the
 /// provider has none to report (the exact conditions are up to the implementor).
54 fn code_hash_at(&self, at: Hash) -> Option<ValidationCodeHash>;
55}
56
57impl<F, Hash> ValidationCodeHashProvider<Hash> for F
58where
59 F: Fn(Hash) -> Option<ValidationCodeHash>,
60{
61 fn code_hash_at(&self, at: Hash) -> Option<ValidationCodeHash> {
62 (self)(at)
63 }
64}
65
/// A block of type `B` bundled with a storage proof.
66pub struct ParachainCandidate<B> {
 /// The block itself.
68 pub block: B,
 /// Storage proof carried alongside `block`.
 /// NOTE(review): presumably the witness recorded while building `block` — confirm
 /// against the code that constructs this type.
70 pub proof: sp_trie::StorageProof,
72}
73
/// Block import wrapper that delegates to an inner import `BI` while optionally
/// running a `LevelMonitor` and optionally overriding the fork choice.
///
/// Built through `new`, `new_with_limit` or `new_with_delayed_best_block`.
74pub struct ParachainBlockImport<Block: BlockT, BI, BE> {
 /// The wrapped block import; `check_block` and `import_block` are forwarded to it.
82 inner: BI,
 /// Shared level monitor consulted around every import. `None` when the import
 /// was constructed with `LevelLimit::None`.
83 monitor: Option<SharedData<LevelMonitor<Block, BE>>>,
 /// When `true`, `import_block` replaces the fork choice with a custom strategy
 /// that is `true` only for blocks whose origin is `NetworkInitialSync`.
84 delayed_best_block: bool,
85}
86
87impl<Block: BlockT, BI, BE: Backend<Block>> ParachainBlockImport<Block, BI, BE> {
88 pub fn new(inner: BI, backend: Arc<BE>) -> Self {
92 Self::new_with_limit(inner, backend, LevelLimit::Default)
93 }
94
95 pub fn new_with_limit(inner: BI, backend: Arc<BE>, level_leaves_max: LevelLimit) -> Self {
100 let level_limit = match level_leaves_max {
101 LevelLimit::None => None,
102 LevelLimit::Some(limit) => Some(limit),
103 LevelLimit::Default => Some(MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT),
104 };
105
106 let monitor =
107 level_limit.map(|level_limit| SharedData::new(LevelMonitor::new(level_limit, backend)));
108
109 Self { inner, monitor, delayed_best_block: false }
110 }
111
112 pub fn new_with_delayed_best_block(inner: BI, backend: Arc<BE>) -> Self {
116 Self {
117 delayed_best_block: true,
118 ..Self::new_with_limit(inner, backend, LevelLimit::Default)
119 }
120 }
121}
122
123impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> {
124 fn clone(&self) -> Self {
125 ParachainBlockImport {
126 inner: self.inner.clone(),
127 monitor: self.monitor.clone(),
128 delayed_best_block: self.delayed_best_block,
129 }
130 }
131}
132
133#[async_trait::async_trait]
134impl<Block, BI, BE> BlockImport<Block> for ParachainBlockImport<Block, BI, BE>
135where
136 Block: BlockT,
137 BI: BlockImport<Block> + Send + Sync,
138 BE: Backend<Block>,
139{
140 type Error = BI::Error;
141
142 async fn check_block(
143 &self,
144 block: sc_consensus::BlockCheckParams<Block>,
145 ) -> Result<sc_consensus::ImportResult, Self::Error> {
146 self.inner.check_block(block).await
147 }
148
149 async fn import_block(
150 &self,
151 mut params: sc_consensus::BlockImportParams<Block>,
152 ) -> Result<sc_consensus::ImportResult, Self::Error> {
153 let hash = params.post_hash();
155 let number = *params.header.number();
156
157 if params.with_state() {
158 params.finalized = true;
162 }
163
164 if self.delayed_best_block {
165 params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
168 params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
169 ));
170 }
171
172 let maybe_lock = self.monitor.as_ref().map(|monitor_lock| {
173 let mut monitor = monitor_lock.shared_data_locked();
174 monitor.enforce_limit(number);
175 monitor.release_mutex()
176 });
177
178 let res = self.inner.import_block(params).await?;
179
180 if let (Some(mut monitor_lock), ImportResult::Imported(_)) = (maybe_lock, &res) {
181 let mut monitor = monitor_lock.upgrade();
182 monitor.block_imported(number, hash);
183 }
184
185 Ok(res)
186 }
187}
188
/// Marker trait without any methods; in this file it is implemented for
/// `ParachainBlockImport`.
/// NOTE(review): presumably used as a bound to require a parachain-aware block
/// import — confirm at the use sites.
189pub trait ParachainBlockImportMarker {}
191
// Every `ParachainBlockImport` carries the marker, whatever its inner import
// `BI` and backend `BE` are (no bounds are placed on either).
192impl<B: BlockT, BI, BE> ParachainBlockImportMarker for ParachainBlockImport<B, BI, BE> {}
193
194pub fn get_relay_slot(relay_header: &PHeader) -> Option<Slot> {
196 match sc_consensus_babe::find_pre_digest::<PBlock>(relay_header) {
197 Ok(pre_digest) => Some(pre_digest.slot()),
198 Err(err) => {
199 tracing::error!(
200 target: LOG_TARGET,
201 hash = %relay_header.hash(),
202 ?err,
203 "Relay chain block does not contain a BABE pre-digest. This should never happen.",
204 );
205 None
206 },
207 }
208}
209
210pub fn get_relay_slot_and_timestamp(
212 relay_header: &PHeader,
213 relay_slot_duration: Duration,
214) -> Option<(Slot, Timestamp)> {
215 get_relay_slot(relay_header).map(|slot| {
216 let t = Timestamp::new(relay_slot_duration.as_millis() as u64 * *slot);
217 (slot, t)
218 })
219}
220
221pub async fn load_abridged_host_configuration(
223 relay_parent: PHash,
224 relay_client: &impl RelayChainInterface,
225) -> Result<Option<AbridgedHostConfiguration>, RelayChainError> {
226 relay_client
227 .get_storage_by_key(relay_parent, relay_chain::well_known_keys::ACTIVE_CONFIG)
228 .await?
229 .map(|bytes| {
230 AbridgedHostConfiguration::decode(&mut &bytes[..])
231 .map_err(RelayChainError::DeserializationError)
232 })
233 .transpose()
234}