1use codec::Codec;
24use cumulus_client_consensus_common::ParachainBlockImportMarker;
25use cumulus_primitives_core::{CumulusDigestItem, RelayBlockIdentifier};
26use parking_lot::Mutex;
27use polkadot_primitives::Hash as RHash;
28use sc_consensus::{
29 import_queue::{BasicQueue, Verifier as VerifierT},
30 BlockImport, BlockImportParams, ForkChoiceStrategy,
31};
32use sc_consensus_aura::{standalone as aura_internal, AuthoritiesTracker, CompatibilityMode};
33use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
34use schnellru::{ByLength, LruMap};
35use sp_api::ProvideRuntimeApi;
36use sp_block_builder::BlockBuilder as BlockBuilderApi;
37use sp_blockchain::{HeaderBackend, HeaderMetadata};
38use sp_consensus::{error::Error as ConsensusError, BlockOrigin};
39use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
40use sp_core::crypto::Pair;
41use sp_inherents::CreateInherentDataProviders;
42use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
43use std::{fmt::Debug, sync::Arc};
44
/// Length limit handed to `ByLength::new` when the equivocation defender builds its LRU cache.
const LRU_WINDOW: u32 = 512;
/// Counter value at which `insert_and_check` starts reporting a cache key as over the limit,
/// i.e. the number of blocks per key that are let through before that.
const EQUIVOCATION_LIMIT: usize = 16;
47
/// Counts how many blocks have been seen for each `(slot, block number, relay parent)` key.
///
/// `N` is the block number type used in the key.
struct NaiveEquivocationDefender<N> {
    /// LRU map from `(slot as u64, block number, relay parent hash)` to the number of blocks
    /// counted for that key so far.
    cache: LruMap<(u64, N, RHash), usize>,
}
52
53impl<N: std::hash::Hash + PartialEq> Default for NaiveEquivocationDefender<N> {
54 fn default() -> Self {
55 NaiveEquivocationDefender { cache: LruMap::new(ByLength::new(LRU_WINDOW)) }
56 }
57}
58
59impl<N: std::hash::Hash + PartialEq> NaiveEquivocationDefender<N> {
60 fn insert_and_check(&mut self, slot: Slot, block_number: N, relay_chain_parent: RHash) -> bool {
62 let val = self
63 .cache
64 .get_or_insert((*slot, block_number, relay_chain_parent), || 0)
65 .expect("insertion with ByLength limiter always succeeds; qed");
66
67 if *val == EQUIVOCATION_LIMIT {
68 true
69 } else {
70 *val += 1;
71 false
72 }
73 }
74}
75
/// A block verifier that checks the Aura seal and slot of incoming blocks and rejects blocks
/// once too many have been seen for the same slot, block number and relay parent.
pub struct Verifier<P: Pair, Client, Block: BlockT, CIDP> {
    /// Client used to query the runtime API (slot duration) and to check inherents.
    client: Arc<Client>,
    /// Factory for the inherent data providers used when checking a block's inherents.
    create_inherent_data_providers: CIDP,
    /// Per-key block counter guarding against excessive equivocations.
    defender: Mutex<NaiveEquivocationDefender<NumberFor<Block>>>,
    /// Optional telemetry handle used to report verification events.
    telemetry: Option<TelemetryHandle>,
    /// Tracker used to fetch the authorities for a header and to import verified headers.
    authorities_tracker: AuthoritiesTracker<P, Block, Client>,
}
84
85impl<P, Client, Block, CIDP> Verifier<P, Client, Block, CIDP>
86where
87 P: Pair,
88 P::Signature: Codec,
89 P::Public: Codec + Debug,
90 Block: BlockT,
91 Client: ProvideRuntimeApi<Block> + Send + Sync,
92 <Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block> + AuraApi<Block, P::Public>,
93
94 CIDP: CreateInherentDataProviders<Block, ()>,
95{
96 pub fn new(
99 client: Arc<Client>,
100 inherent_data_provider: CIDP,
101 telemetry: Option<TelemetryHandle>,
102 ) -> Self {
103 Self {
104 client: client.clone(),
105 create_inherent_data_providers: inherent_data_provider,
106 defender: Mutex::new(NaiveEquivocationDefender::default()),
107 telemetry,
108 authorities_tracker: AuthoritiesTracker::new(client),
109 }
110 }
111}
112
113#[async_trait::async_trait]
114impl<P, Client, Block, CIDP> VerifierT<Block> for Verifier<P, Client, Block, CIDP>
115where
116 P: Pair,
117 P::Signature: Codec,
118 P::Public: Codec + Debug,
119 Block: BlockT,
120 Client: HeaderBackend<Block>
121 + HeaderMetadata<Block, Error = sp_blockchain::Error>
122 + ProvideRuntimeApi<Block>
123 + Send
124 + Sync,
125 <Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block> + AuraApi<Block, P::Public>,
126
127 CIDP: CreateInherentDataProviders<Block, ()>,
128{
129 async fn verify(
130 &self,
131 mut block_params: BlockImportParams<Block>,
132 ) -> Result<BlockImportParams<Block>, String> {
133 if block_params.state_action.skip_execution_checks() || block_params.with_state() {
138 block_params.fork_choice = Some(ForkChoiceStrategy::Custom(block_params.with_state()));
139 return Ok(block_params)
140 }
141
142 let post_hash = block_params.header.hash();
143 let parent_hash = *block_params.header.parent_hash();
144
145 {
147 let authorities = self
148 .authorities_tracker
149 .fetch_or_update(&block_params.header, &CompatibilityMode::None)
150 .map_err(|e| format!("Could not fetch authorities: {}", e))?;
151
152 let slot_duration = self
153 .client
154 .runtime_api()
155 .slot_duration(parent_hash)
156 .map_err(|e| e.to_string())?;
157
158 let slot_now = slot_now(slot_duration);
159 let res = aura_internal::check_header_slot_and_seal::<Block, P>(
160 slot_now,
161 block_params.header,
162 &authorities,
163 );
164
165 match res {
166 Ok((pre_header, slot, seal_digest)) => {
167 telemetry!(
168 self.telemetry;
169 CONSENSUS_TRACE;
170 "aura.checked_and_importing";
171 "pre_header" => ?pre_header,
172 );
173
174 let relay_parent =
177 match CumulusDigestItem::find_relay_block_identifier(pre_header.digest()) {
178 None => Default::default(),
179 Some(RelayBlockIdentifier::ByHash(h)) |
180 Some(RelayBlockIdentifier::ByStorageRoot {
181 storage_root: h, ..
182 }) => h,
183 };
184
185 block_params.header = pre_header;
186 block_params.post_digests.push(seal_digest);
187 block_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
188 block_params.post_hash = Some(post_hash);
189
190 if self.defender.lock().insert_and_check(
195 slot,
196 *block_params.header.number(),
197 relay_parent,
198 ) && !matches!(block_params.origin, BlockOrigin::ConsensusBroadcast)
199 {
200 return Err(format!(
201 "Rejecting block {:?} due to excessive equivocations at slot",
202 post_hash,
203 ))
204 }
205
206 self.authorities_tracker.import(&block_params.header).map_err(|e| {
207 format!(
208 "Could not import authorities for block {:?} at number {}: {e}",
209 block_params.header.hash(),
210 block_params.header.number(),
211 )
212 })?;
213 },
214 Err(aura_internal::SealVerificationError::Deferred(hdr, slot)) => {
215 telemetry!(
216 self.telemetry;
217 CONSENSUS_DEBUG;
218 "aura.header_too_far_in_future";
219 "hash" => ?post_hash,
220 "a" => ?hdr,
221 "b" => ?slot,
222 );
223
224 return Err(format!(
225 "Rejecting block ({:?}) from future slot {:?}",
226 post_hash, slot
227 ))
228 },
229 Err(e) =>
230 return Err(format!(
231 "Rejecting block ({:?}) with invalid seal ({:?})",
232 post_hash, e
233 )),
234 }
235 }
236
237 if let Some(body) = block_params.body.clone() {
239 let block = Block::new(block_params.header.clone(), body);
240 let create_inherent_data_providers = self
241 .create_inherent_data_providers
242 .create_inherent_data_providers(parent_hash, ())
243 .await
244 .map_err(|e| format!("Could not create inherent data {:?}", e))?;
245
246 sp_block_builder::check_inherents(
247 self.client.clone(),
248 parent_hash,
249 block,
250 &create_inherent_data_providers,
251 )
252 .await
253 .map_err(|e| format!("Error checking block inherents {:?}", e))?;
254 }
255
256 Ok(block_params)
257 }
258}
259
260fn slot_now(slot_duration: SlotDuration) -> Slot {
261 let timestamp = sp_timestamp::InherentDataProvider::from_system_time().timestamp();
262 Slot::from_timestamp(timestamp, slot_duration)
263}
264
265pub fn fully_verifying_import_queue<P, Client, Block: BlockT, I, CIDP>(
275 client: Arc<Client>,
276 block_import: I,
277 create_inherent_data_providers: CIDP,
278 spawner: &impl sp_core::traits::SpawnEssentialNamed,
279 registry: Option<&prometheus_endpoint::Registry>,
280 telemetry: Option<TelemetryHandle>,
281) -> BasicQueue<Block>
282where
283 P: Pair + 'static,
284 P::Signature: Codec,
285 P::Public: Codec + Debug,
286 I: BlockImport<Block, Error = ConsensusError>
287 + ParachainBlockImportMarker
288 + Send
289 + Sync
290 + 'static,
291 Client: HeaderBackend<Block>
292 + HeaderMetadata<Block, Error = sp_blockchain::Error>
293 + ProvideRuntimeApi<Block>
294 + Send
295 + Sync
296 + 'static,
297 <Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block> + AuraApi<Block, P::Public>,
298 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
299{
300 let verifier = Verifier::<P, _, _, _> {
301 client: client.clone(),
302 create_inherent_data_providers,
303 defender: Mutex::new(NaiveEquivocationDefender::default()),
304 telemetry,
305 authorities_tracker: AuthoritiesTracker::new(client.clone()),
306 };
307
308 BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)
309}
310
#[cfg(test)]
mod test {
    use super::*;
    use codec::Encode;
    use cumulus_test_client::{
        runtime::Block, seal_block, Client, InitBlockBuilder, TestClientBuilder,
        TestClientBuilderExt,
    };
    use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
    use futures::FutureExt;
    use polkadot_primitives::{HeadData, PersistedValidationData};
    use sc_client_api::HeaderBackend;
    use sp_consensus_aura::sr25519;
    use sp_tracing::try_init_simple;
    use std::{collections::HashSet, sync::Arc};

    /// Blocks beyond the equivocation limit are rejected for `NetworkBroadcast` but still
    /// accepted when they arrive with the `ConsensusBroadcast` origin.
    #[test]
    fn import_equivocated_blocks_from_recovery() {
        try_init_simple();

        let client = Arc::new(TestClientBuilder::default().build());

        let verifier = Verifier::<sr25519::AuthorityPair, Client, Block, _> {
            client: client.clone(),
            create_inherent_data_providers: |_, _| async move {
                Ok(sp_timestamp::InherentDataProvider::from_system_time())
            },
            defender: Mutex::new(NaiveEquivocationDefender::default()),
            telemetry: None,
            authorities_tracker: AuthoritiesTracker::new(client.clone()),
        };

        let genesis = client.info().best_hash;
        let encoded_genesis_head = client.header(genesis).unwrap().encode();

        let mut sproof = RelayStateSproofBuilder::default();
        sproof.included_para_head = Some(HeadData(encoded_genesis_head.clone()));
        sproof.para_id = cumulus_test_client::runtime::PARACHAIN_ID.into();

        let validation_data = PersistedValidationData {
            relay_parent_number: 1,
            parent_head: encoded_genesis_head.into(),
            ..Default::default()
        };

        let block_builder = client.init_block_builder(Some(validation_data), sproof);
        let block = block_builder.block_builder.build().unwrap();

        // Seal the very same block one time more than the limit allows.
        let blocks: Vec<_> = (0..=EQUIVOCATION_LIMIT)
            .map(|_| seal_block(block.block.clone(), &client))
            .collect();

        // Every sealed copy has to be a distinct block, otherwise nothing is equivocating.
        let distinct_hashes: HashSet<_> = blocks.iter().map(|b| b.hash()).collect();
        assert_eq!(distinct_hashes.len(), blocks.len());

        // Runs one block (header plus body) through the verifier with the given origin.
        let verify_with_origin = |origin: BlockOrigin, block: &Block| {
            let mut params = BlockImportParams::new(origin, block.header().clone());
            params.body = Some(block.extrinsics().to_vec());
            verifier.verify(params).now_or_never().unwrap()
        };

        // Everything up to the limit passes verification.
        for block in &blocks[..EQUIVOCATION_LIMIT] {
            verify_with_origin(BlockOrigin::NetworkBroadcast, block).unwrap();
        }

        // Both an already verified block and the one beyond the limit are now over the limit.
        for block in [&blocks[EQUIVOCATION_LIMIT / 2], blocks.last().unwrap()] {
            let rejection = verify_with_origin(BlockOrigin::NetworkBroadcast, block)
                .map(drop)
                .unwrap_err();
            assert!(rejection.contains("excessive equivocations at slot"));

            // The equivocation verdict is ignored for `ConsensusBroadcast`.
            assert!(verify_with_origin(BlockOrigin::ConsensusBroadcast, block).is_ok());
        }
    }
}