referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
receipt_provider.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17use crate::{
18	Address, AddressOrAddresses, BlockInfoProvider, BlockNumberOrTag, Bytes, ChainMetadata,
19	ClientError, FilterTopic, ReceiptExtractor, SubxtBlockInfoProvider, SyncLabel, SyncStateKey,
20	block_sync::SyncCheckpoint,
21	client::{SubstrateBlock, SubstrateBlockNumber},
22};
23use pallet_revive::evm::{Filter, Log, ReceiptInfo, TransactionSigned};
24use sp_core::{H256, U256};
25use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool, query};
26use std::{
27	collections::{BTreeMap, HashMap},
28	sync::Arc,
29};
30use tokio::sync::Mutex;
31
/// Log target attached to every message emitted from this module.
const LOG_TARGET: &str = "eth-rpc::receipt_provider";
/// Upper bound on the number of rows a single log query may return.
const MAX_LOG_RESULTS: usize = 10_000;
34
35/// Parse a SQLite row from the `logs` table into a [`Log`].
36fn parse_log_row(row: sqlx::sqlite::SqliteRow) -> Result<Log, sqlx::Error> {
37	let block_hash: Vec<u8> = row.try_get("block_hash")?;
38	let transaction_index: i64 = row.try_get("transaction_index")?;
39	let log_index: i64 = row.try_get("log_index")?;
40	let address: Vec<u8> = row.try_get("address")?;
41	let block_number: i64 = row.try_get("block_number")?;
42	let transaction_hash: Vec<u8> = row.try_get("transaction_hash")?;
43	let topic_0: Option<Vec<u8>> = row.try_get("topic_0")?;
44	let topic_1: Option<Vec<u8>> = row.try_get("topic_1")?;
45	let topic_2: Option<Vec<u8>> = row.try_get("topic_2")?;
46	let topic_3: Option<Vec<u8>> = row.try_get("topic_3")?;
47	let data: Option<Vec<u8>> = row.try_get("data")?;
48
49	let topics = [topic_0, topic_1, topic_2, topic_3]
50		.iter()
51		.filter_map(|t| t.as_ref().map(|t| H256::from_slice(t)))
52		.collect::<Vec<_>>();
53
54	Ok(Log {
55		address: Address::from_slice(&address),
56		block_hash: H256::from_slice(&block_hash),
57		block_number: U256::from(block_number as u64),
58		data: data.map(Bytes::from),
59		log_index: U256::from(log_index as u64),
60		topics,
61		transaction_hash: H256::from_slice(&transaction_hash),
62		transaction_index: U256::from(transaction_index as u64),
63		removed: false,
64	})
65}
66
67/// SQLite connection pool with precomputed bulk-insert chunk sizes.
68#[derive(Clone)]
69pub struct DbContext {
70	pool: SqlitePool,
71	/// Max bound parameters per query.
72	max_variable_number: usize,
73	/// Chunk size for bulk INSERT into `transaction_hashes`.
74	tx_insert_chunk_size: usize,
75	/// Chunk size for bulk INSERT into `logs`.
76	log_insert_chunk_size: usize,
77}
78
79impl DbContext {
80	/// Conservative default for `SQLITE_LIMIT_VARIABLE_NUMBER`; SQLite >=3.32 uses 32766.
81	pub const DEFAULT_MAX_VARIABLE_NUMBER: usize = 999;
82	/// Columns in the `transaction_hashes` table.
83	const TX_HASH_COLUMNS: usize = 3;
84	/// Columns in the `logs` table.
85	const LOG_COLUMNS: usize = 11;
86
87	pub fn new(pool: SqlitePool, max_variable_number: usize) -> Self {
88		assert!(
89			max_variable_number >= Self::LOG_COLUMNS,
90			"SQLite max_variable_number ({max_variable_number}) must be >= {}",
91			Self::LOG_COLUMNS
92		);
93		Self {
94			pool,
95			max_variable_number,
96			tx_insert_chunk_size: max_variable_number / Self::TX_HASH_COLUMNS,
97			log_insert_chunk_size: max_variable_number / Self::LOG_COLUMNS,
98		}
99	}
100}
101
102/// ReceiptProvider stores transaction receipts and logs in a SQLite database.
103#[derive(Clone)]
104pub struct ReceiptProvider<B: BlockInfoProvider = SubxtBlockInfoProvider> {
105	/// The database pool.
106	db_ctx: DbContext,
107	/// The block provider used to fetch blocks, and reconstruct receipts.
108	block_provider: B,
109	/// A means to extract receipts from extrinsics.
110	receipt_extractor: ReceiptExtractor,
111	/// When `Some`, old blocks will be pruned.
112	keep_latest_n_blocks: Option<usize>,
113	/// A Map of the latest block numbers to block hashes.
114	block_number_to_hashes: Arc<Mutex<BTreeMap<SubstrateBlockNumber, BlockHashMap>>>,
115}
116
117/// Substrate block to Ethereum block mapping
118#[derive(Clone, Debug, PartialEq, Eq)]
119struct BlockHashMap {
120	substrate_hash: H256,
121	ethereum_hash: H256,
122}
123
124impl BlockHashMap {
125	fn new(substrate_hash: H256, ethereum_hash: H256) -> Self {
126		Self { substrate_hash, ethereum_hash }
127	}
128}
129
130/// Provides information about a block,
131/// This is an abstraction on top of [`SubstrateBlock`] that can't be mocked in tests.
132/// Can be removed once <https://github.com/paritytech/subxt/issues/1883> is fixed.
133pub trait BlockInfo {
134	/// Returns the block hash.
135	fn hash(&self) -> H256;
136	/// Returns the block number.
137	fn number(&self) -> SubstrateBlockNumber;
138}
139
140impl BlockInfo for SubstrateBlock {
141	fn hash(&self) -> H256 {
142		SubstrateBlock::hash(self)
143	}
144	fn number(&self) -> SubstrateBlockNumber {
145		SubstrateBlock::number(self)
146	}
147}
148
/// Upper bound on the number of entries held in the in-memory block-number-to-hash map
/// when no explicit `keep_latest_n_blocks` limit is configured.
pub const MAX_CACHED_BLOCKS: usize = 256;
151
/// Insert a `sync_state` row for `$label`, or update the existing row only when
/// `existing.block_number $op new.block_number` holds. `$op` must be `"<"` or `">"`.
///
/// Expands to an awaited query followed by `?`, so it must be used inside an `async`
/// function whose error type can absorb `sqlx::Error`.
macro_rules! upsert_sync_label {
	($pool:expr, $op:literal, $label:expr, $checkpoint:expr) => {{
		let key = $label.to_string();
		let number = $checkpoint.block_number as i64;
		let hash = $checkpoint.block_hash.map(|h| h.as_bytes().to_vec());
		query!(
			"INSERT INTO sync_state (label, block_number, block_hash)
			VALUES ($1, $2, $3)
			ON CONFLICT(label) DO UPDATE
				SET block_number = excluded.block_number, block_hash = excluded.block_hash
			WHERE sync_state.block_number " +
				$op + " excluded.block_number
			",
			key,
			number,
			hash
		)
		.execute($pool)
		.await?;
	}};
}
175
176async fn insert_block_mapping<'e, E: sqlx::Executor<'e, Database = Sqlite>>(
177	executor: E,
178	block_map: &BlockHashMap,
179) -> Result<sqlx::sqlite::SqliteQueryResult, sqlx::Error> {
180	let ethereum_hash_ref = block_map.ethereum_hash.as_ref();
181	let substrate_hash_ref = block_map.substrate_hash.as_ref();
182	query!(
183		r#"
184			INSERT OR REPLACE INTO eth_to_substrate_blocks (ethereum_block_hash, substrate_block_hash)
185			VALUES ($1, $2)
186			"#,
187		ethereum_hash_ref,
188		substrate_hash_ref,
189	)
190	.execute(executor)
191	.await
192}
193
194impl<B: BlockInfoProvider> ReceiptProvider<B> {
195	/// Create a new `ReceiptProvider` with the given database URL and block provider.
196	pub async fn new(
197		db_ctx: DbContext,
198		block_provider: B,
199		receipt_extractor: ReceiptExtractor,
200		keep_latest_n_blocks: Option<usize>,
201	) -> Result<Self, ClientError> {
202		sqlx::migrate!()
203			.run(&db_ctx.pool)
204			.await
205			.map_err(|e| sqlx::Error::Migrate(e.into()))?;
206
207		let provider = Self {
208			db_ctx,
209			block_provider,
210			receipt_extractor,
211			keep_latest_n_blocks,
212			block_number_to_hashes: Default::default(),
213		};
214		provider.restore_first_evm_block().await?;
215
216		Ok(provider)
217	}
218
219	/// Returns `true` if the block is before the auto-discovered `first_evm_block`.
220	pub fn is_before_earliest_block(&self, at: &BlockNumberOrTag) -> bool {
221		match at {
222			BlockNumberOrTag::U256(block_number) => {
223				if *block_number > U256::from(u32::MAX) {
224					return false;
225				}
226				self.receipt_extractor.is_before_first_evm_block(block_number.as_u32())
227			},
228			BlockNumberOrTag::BlockTag(_) => false,
229		}
230	}
231
232	/// The auto-discovered first EVM block, or `None` if not yet discovered.
233	pub fn first_evm_block(&self) -> Option<SubstrateBlockNumber> {
234		self.receipt_extractor.first_evm_block()
235	}
236
237	/// Set the auto-discovered first EVM block (in-memory + persisted to DB).
238	pub async fn set_first_evm_block(
239		&self,
240		block_number: SubstrateBlockNumber,
241	) -> Result<(), ClientError> {
242		self.receipt_extractor.set_first_evm_block(block_number);
243		self.set_sync_label(ChainMetadata::FirstEvmBlock, SyncCheckpoint::from_number(block_number))
244			.await
245	}
246
247	/// Restore `first_evm_block` from DB, clearing it if the boundary has shifted.
248	async fn restore_first_evm_block(&self) -> Result<(), ClientError> {
249		let Some(evm_first) =
250			self.get_sync_label(ChainMetadata::FirstEvmBlock).await?.map(|c| c.block_number)
251		else {
252			return Ok(());
253		};
254
255		let has_evm_hash = |block_number: SubstrateBlockNumber| async move {
256			match self.block_provider.block_by_number(block_number).await.ok().flatten() {
257				Some(block) => self
258					.receipt_extractor
259					.get_ethereum_block_hash(&block.hash(), block_number as u64)
260					.await
261					.is_some(),
262				None => false,
263			}
264		};
265
266		// Stale if evm_first no longer has an EVM hash, or its predecessor now does.
267		let current_has_evm = has_evm_hash(evm_first).await;
268		let predecessor_has_evm =
269			if evm_first > 0 { has_evm_hash(evm_first - 1).await } else { false };
270
271		if !current_has_evm || predecessor_has_evm {
272			log::warn!(target: LOG_TARGET,
273				"๐Ÿ—„๏ธ Stored first-evm-block=#{evm_first} is stale \
274				 (has_evm={current_has_evm}, predecessor_has_evm={predecessor_has_evm}), \
275				 clearing.");
276			if let Err(e) = self.delete_sync_label(ChainMetadata::FirstEvmBlock).await {
277				log::error!(target: LOG_TARGET,
278					"๐Ÿ—„๏ธ Failed to clear stale first-evm-block from DB: {e:?}");
279			}
280		} else {
281			self.receipt_extractor.set_first_evm_block(evm_first);
282		}
283		Ok(())
284	}
285
286	// Get block hash and transaction index by transaction hash
287	pub async fn find_transaction(&self, transaction_hash: &H256) -> Option<(H256, usize)> {
288		let transaction_hash_bytes = transaction_hash.as_ref();
289		let result = query!(
290			r#"
291			SELECT block_hash, transaction_index
292			FROM transaction_hashes
293			WHERE transaction_hash = $1
294			"#,
295			transaction_hash_bytes
296		)
297		.fetch_optional(&self.db_ctx.pool)
298		.await
299		.inspect_err(|err| {
300			log::trace!(target: LOG_TARGET,
301				"find_transaction: DB query failed for tx {transaction_hash:?}: {err:?}");
302		})
303		.ok()?
304		.or_else(|| {
305			log::trace!(target: LOG_TARGET,
306				"find_transaction: tx {transaction_hash:?} not found in DB");
307			None
308		})?;
309
310		let block_hash = H256::from_slice(&result.block_hash[..]);
311		let transaction_index = result.transaction_index.try_into().ok()?;
312		Some((block_hash, transaction_index))
313	}
314
315	/// Get the Substrate block hash for the given Ethereum block hash.
316	pub async fn get_substrate_hash(&self, ethereum_block_hash: &H256) -> Option<H256> {
317		let ethereum_hash = ethereum_block_hash.as_ref();
318		let result = query!(
319			r#"
320			SELECT substrate_block_hash
321			FROM eth_to_substrate_blocks
322			WHERE ethereum_block_hash = $1
323			"#,
324			ethereum_hash
325		)
326		.fetch_optional(&self.db_ctx.pool)
327		.await
328		.inspect_err(|e| {
329			log::error!(target: LOG_TARGET, "failed to get block mapping for ethereum block {ethereum_block_hash:?}, err: {e:?}");
330		})
331		.ok()?
332		.or_else(||{
333			log::trace!(target: LOG_TARGET, "No block mapping found for ethereum block: {ethereum_block_hash:?}");
334			None
335		})?;
336
337		log::trace!(target: LOG_TARGET, "Get block mapping ethereum block: {:?} -> substrate block: {ethereum_block_hash:?}", H256::from_slice(&result.substrate_block_hash[..]));
338
339		Some(H256::from_slice(&result.substrate_block_hash[..]))
340	}
341
342	/// Get the Ethereum block hash for the given Substrate block hash.
343	pub async fn get_ethereum_hash(&self, substrate_block_hash: &H256) -> Option<H256> {
344		let substrate_hash = substrate_block_hash.as_ref();
345		let result = query!(
346			r#"
347			SELECT ethereum_block_hash
348			FROM eth_to_substrate_blocks
349			WHERE substrate_block_hash = $1
350			"#,
351			substrate_hash
352		)
353		.fetch_optional(&self.db_ctx.pool)
354		.await
355		.inspect_err(|e| {
356			log::error!(target: LOG_TARGET, "failed to get block mapping for substrate block {substrate_block_hash:?}, err: {e:?}");
357		})
358		.ok()?
359		.or_else(||{
360			log::trace!(target: LOG_TARGET, "No block mapping found for substrate block: {substrate_block_hash:?}");
361			None
362		})?;
363
364		log::trace!(target: LOG_TARGET, "Get block mapping substrate block: {substrate_block_hash:?} -> ethereum block: {:?}", H256::from_slice(&result.ethereum_block_hash[..]));
365
366		Some(H256::from_slice(&result.ethereum_block_hash[..]))
367	}
368
369	/// Deletes older records from the database.
370	async fn remove(&self, block_mappings: &[BlockHashMap]) -> Result<(), ClientError> {
371		if block_mappings.is_empty() {
372			return Ok(());
373		}
374		log::debug!(target: LOG_TARGET, "Removing block hashes: {block_mappings:?}");
375
376		let mut db_tx = self.db_ctx.pool.begin().await?;
377
378		for chunk in block_mappings.chunks(self.db_ctx.max_variable_number) {
379			let placeholders = vec!["?"; chunk.len()].join(", ");
380			let sql_tx =
381				format!("DELETE FROM transaction_hashes WHERE block_hash in ({placeholders})");
382			let sql_logs = format!("DELETE FROM logs WHERE block_hash in ({placeholders})");
383			let sql_mappings = format!(
384				"DELETE FROM eth_to_substrate_blocks WHERE substrate_block_hash in ({placeholders})"
385			);
386
387			let mut delete_tx_query = sqlx::query(&sql_tx);
388			let mut delete_logs_query = sqlx::query(&sql_logs);
389			let mut delete_mappings_query = sqlx::query(&sql_mappings);
390
391			for block_map in chunk {
392				delete_tx_query = delete_tx_query.bind(block_map.substrate_hash.as_ref());
393				delete_logs_query = delete_logs_query.bind(block_map.ethereum_hash.as_ref());
394				delete_mappings_query =
395					delete_mappings_query.bind(block_map.substrate_hash.as_ref());
396			}
397
398			delete_tx_query.execute(&mut *db_tx).await?;
399			delete_logs_query.execute(&mut *db_tx).await?;
400			delete_mappings_query.execute(&mut *db_tx).await?;
401		}
402
403		db_tx.commit().await?;
404		Ok(())
405	}
406
407	/// Read a sync label entry.
408	pub async fn get_sync_label(
409		&self,
410		label: impl SyncStateKey,
411	) -> Result<Option<SyncCheckpoint>, ClientError> {
412		let label_str = label.to_string();
413		let row = query!(
414			r#"
415			SELECT block_number, block_hash
416			FROM sync_state
417			WHERE label = $1
418			"#,
419			label_str
420		)
421		.fetch_optional(&self.db_ctx.pool)
422		.await?;
423
424		match row {
425			Some(row) => {
426				let block_number: SubstrateBlockNumber =
427					row.block_number.try_into().map_err(|_| {
428						sqlx::Error::Decode(
429							format!("block_number {} overflows u32", row.block_number).into(),
430						)
431					})?;
432				Ok(Some(SyncCheckpoint {
433					block_number,
434					block_hash: row
435						.block_hash
436						.filter(|b| b.len() == 32)
437						.map(|b| H256::from_slice(&b)),
438				}))
439			},
440			None => Ok(None),
441		}
442	}
443
444	/// Upsert a sync label entry.
445	pub async fn set_sync_label(
446		&self,
447		label: impl SyncStateKey,
448		checkpoint: SyncCheckpoint,
449	) -> Result<(), ClientError> {
450		let label_str = label.to_string();
451		let block_number = checkpoint.block_number as i64;
452		let block_hash = checkpoint.block_hash.map(|h| h.as_bytes().to_vec());
453		query!(
454			r#"
455			INSERT OR REPLACE INTO sync_state (label, block_number, block_hash)
456			VALUES ($1, $2, $3)
457			"#,
458			label_str,
459			block_number,
460			block_hash,
461		)
462		.execute(&self.db_ctx.pool)
463		.await?;
464		Ok(())
465	}
466
467	/// Delete a sync label entry.
468	pub async fn delete_sync_label(&self, label: impl SyncStateKey) -> Result<(), ClientError> {
469		let label_str = label.to_string();
470		query!(
471			r#"
472			DELETE FROM sync_state WHERE label = $1
473			"#,
474			label_str,
475		)
476		.execute(&self.db_ctx.pool)
477		.await?;
478		Ok(())
479	}
480
481	/// Atomically update a sync label entry only if the new block number is strictly higher.
482	///
483	/// Inserts the row if it doesn't exist yet.
484	pub async fn advance_sync_label(
485		&self,
486		label: SyncLabel,
487		checkpoint: SyncCheckpoint,
488	) -> Result<(), ClientError> {
489		upsert_sync_label!(&self.db_ctx.pool, "<", label, checkpoint);
490		Ok(())
491	}
492
493	/// Atomically update a sync label entry only if the new block number is lower.
494	///
495	/// Inserts the row if it doesn't exist yet.
496	pub async fn recede_sync_label(
497		&self,
498		label: SyncLabel,
499		checkpoint: SyncCheckpoint,
500	) -> Result<(), ClientError> {
501		upsert_sync_label!(&self.db_ctx.pool, ">", label, checkpoint);
502		Ok(())
503	}
504
505	/// Look up the ethereum block hash for a previously processed block from the in-memory cache.
506	pub async fn get_processed_eth_block_hash(
507		&self,
508		block_number: SubstrateBlockNumber,
509		substrate_hash: H256,
510	) -> Option<H256> {
511		self.block_number_to_hashes
512			.lock()
513			.await
514			.get(&block_number)
515			.filter(|entry| entry.substrate_hash == substrate_hash)
516			.map(|entry| entry.ethereum_hash)
517	}
518
519	/// Fetch receipts from the given block, using a pre-fetched ethereum block hash.
520	pub async fn receipts_from_block(
521		&self,
522		block: &SubstrateBlock,
523		ethereum_hash: H256,
524	) -> Result<Vec<(TransactionSigned, ReceiptInfo)>, ClientError> {
525		self.receipt_extractor
526			.extract_from_block_with_eth_hash(block, ethereum_hash)
527			.await
528	}
529
530	/// Like [`Self::insert_block_receipts`] but writes only to the DB (no cache update).
531	/// Used for historic sync where fork detection is unnecessary.
532	pub async fn insert_block_receipts_past(
533		&self,
534		block: &SubstrateBlock,
535		ethereum_hash: &H256,
536	) -> Result<(), ClientError> {
537		let receipts = self
538			.receipt_extractor
539			.extract_from_block_with_eth_hash(block, *ethereum_hash)
540			.await?;
541		self.insert_into_db(block, &receipts, ethereum_hash).await?;
542		Ok(())
543	}
544
545	/// Insert pre-extracted receipts and update the block cache (with fork detection).
546	pub async fn insert_block_receipts(
547		&self,
548		block: &SubstrateBlock,
549		receipts: &[(TransactionSigned, ReceiptInfo)],
550		ethereum_hash: &H256,
551	) -> Result<(), ClientError> {
552		self.insert(block, receipts, ethereum_hash).await
553	}
554
555	/// Insert receipts into the provider, updating the in-memory block cache for fork detection.
556	///
557	/// Note: Can be merged into `insert_block_receipts` once <https://github.com/paritytech/subxt/issues/1883> is fixed and subxt let
558	/// us create Mock `SubstrateBlock`
559	async fn insert(
560		&self,
561		block: &impl BlockInfo,
562		receipts: &[(TransactionSigned, ReceiptInfo)],
563		ethereum_hash: &H256,
564	) -> Result<(), ClientError> {
565		let block_map = BlockHashMap::new(block.hash(), *ethereum_hash);
566		self.prune_blocks(block.number(), &block_map).await?;
567		self.insert_into_db(block, receipts, ethereum_hash).await?;
568		Ok(())
569	}
570
571	/// Handle fork detection (always) and DB pruning (temporary mode only).
572	async fn prune_blocks(
573		&self,
574		block_number: SubstrateBlockNumber,
575		block_map: &BlockHashMap,
576	) -> Result<(), ClientError> {
577		let mut to_remove = Vec::new();
578		let mut block_number_to_hash = self.block_number_to_hashes.lock().await;
579
580		// Fork? - If inserting the same block number with a different hash, remove the old ones.
581		match block_number_to_hash.insert(block_number, block_map.clone()) {
582			Some(old_block_map) if &old_block_map != block_map => {
583				to_remove.push(old_block_map);
584
585				// Now loop through the blocks that were building on top of the old fork and remove
586				// them.
587				let mut next_block_number = block_number.saturating_add(1);
588				while let Some(old_block_map) = block_number_to_hash.remove(&next_block_number) {
589					to_remove.push(old_block_map);
590					next_block_number = next_block_number.saturating_add(1);
591				}
592			},
593			_ => {},
594		}
595
596		if let Some(keep_latest_n_blocks) = self.keep_latest_n_blocks {
597			// If we have more blocks than we should keep, remove the oldest ones by count
598			// (not by block number range, to handle gaps correctly)
599			while block_number_to_hash.len() > keep_latest_n_blocks {
600				// Remove the block with the smallest number (first in BTreeMap)
601				if let Some((_, block_map)) = block_number_to_hash.pop_first() {
602					to_remove.push(block_map);
603				}
604			}
605		} else {
606			// Evict oldest entries to prevent unbounded growth.
607			// Forks deeper than MAX_CACHED_BLOCKS(256) are unlikely.
608			while block_number_to_hash.len() > MAX_CACHED_BLOCKS {
609				block_number_to_hash.pop_first();
610			}
611		}
612
613		// Release the lock.
614		drop(block_number_to_hash);
615
616		if !to_remove.is_empty() {
617			log::trace!(target: LOG_TARGET, "Pruning old blocks: {to_remove:?}");
618			self.remove(&to_remove).await?;
619		}
620
621		Ok(())
622	}
623
624	/// Insert receipts into the database without updating the in-memory block cache.
625	async fn insert_into_db(
626		&self,
627		block: &impl BlockInfo,
628		receipts: &[(TransactionSigned, ReceiptInfo)],
629		ethereum_hash: &H256,
630	) -> Result<(), ClientError> {
631		let block_number = block.number() as i64;
632		let substrate_block_hash = block.hash();
633		let substrate_hash_ref = substrate_block_hash.as_ref();
634		let ethereum_hash_ref = ethereum_hash.as_ref();
635
636		log::trace!(target: LOG_TARGET, "Inserting receipts for block #{block_number} ethereum: {ethereum_hash:?} substrate: {substrate_block_hash:?}");
637
638		// Check if mapping already exists (eg. added when processing best block and we are now
639		// processing finalized block)
640		let result = sqlx::query!(
641			r#"SELECT EXISTS(SELECT 1 FROM eth_to_substrate_blocks WHERE substrate_block_hash = $1) AS "exists!:bool""#, substrate_hash_ref
642		)
643		.fetch_one(&self.db_ctx.pool)
644		.await?;
645
646		// Assuming that if no mapping exists then no relevant entries in transaction_hashes and
647		// logs exist
648		if result.exists {
649			log::trace!(target: LOG_TARGET,
650				"Skipping receipt insert for block #{block_number} ({substrate_block_hash:?}): \
651				 mapping already exists. ETH hash: {ethereum_hash:?}, receipts count: {count}",
652				count = receipts.len(),
653			);
654			return Ok(());
655		}
656
657		let mut db_tx = self.db_ctx.pool.begin().await?;
658
659		for chunk in receipts.chunks(self.db_ctx.tx_insert_chunk_size) {
660			let mut query_builder = QueryBuilder::<Sqlite>::new(
661				"INSERT OR REPLACE INTO transaction_hashes (transaction_hash, block_hash, transaction_index) ",
662			);
663			query_builder.push_values(chunk, |mut row, (_, receipt)| {
664				row.push_bind(receipt.transaction_hash.as_ref() as &[u8])
665					.push_bind(substrate_hash_ref)
666					.push_bind(receipt.transaction_index.as_u32() as i32);
667			});
668			query_builder.build().execute(&mut *db_tx).await?;
669		}
670
671		let all_logs: Vec<(i32, &[u8], &Log)> = receipts
672			.iter()
673			.flat_map(|(_, receipt)| {
674				let tx_index = receipt.transaction_index.as_u32() as i32;
675				let tx_hash: &[u8] = receipt.transaction_hash.as_ref();
676				receipt.logs.iter().map(move |log| (tx_index, tx_hash, log))
677			})
678			.collect();
679
680		for chunk in all_logs.chunks(self.db_ctx.log_insert_chunk_size) {
681			let mut query_builder = QueryBuilder::<Sqlite>::new(
682				"INSERT OR REPLACE INTO logs(block_hash, transaction_index, log_index, address, block_number, transaction_hash, topic_0, topic_1, topic_2, topic_3, data) ",
683			);
684			query_builder.push_values(chunk, |mut row, (tx_index, tx_hash, log)| {
685				row.push_bind(ethereum_hash_ref)
686					.push_bind(*tx_index)
687					.push_bind(log.log_index.as_u32() as i32)
688					.push_bind(log.address.as_ref() as &[u8])
689					.push_bind(block_number)
690					.push_bind(*tx_hash)
691					.push_bind(log.topics.first().map(|v| &v[..]))
692					.push_bind(log.topics.get(1).map(|v| &v[..]))
693					.push_bind(log.topics.get(2).map(|v| &v[..]))
694					.push_bind(log.topics.get(3).map(|v| &v[..]))
695					.push_bind(log.data.as_ref().map(|v| &v.0[..]));
696			});
697			query_builder.build().execute(&mut *db_tx).await?;
698		}
699
700		let block_map = BlockHashMap::new(substrate_block_hash, *ethereum_hash);
701		insert_block_mapping(&mut *db_tx, &block_map).await?;
702
703		db_tx.commit().await?;
704		log::trace!(target: LOG_TARGET, "Inserted {} receipts for block #{block_number} ethereum: {ethereum_hash:?} substrate: {substrate_block_hash:?}", receipts.len());
705
706		Ok(())
707	}
708
709	/// Get logs that match the given filter.
710	///
711	/// `resolve_block_number` converts a [`BlockNumberOrTag`] to a concrete block number.
712	pub async fn logs(
713		&self,
714		filter: Option<Filter>,
715		resolve_block_number: impl Fn(BlockNumberOrTag) -> anyhow::Result<U256>,
716	) -> anyhow::Result<Vec<Log>> {
717		let mut qb = QueryBuilder::<Sqlite>::new("SELECT logs.* FROM logs WHERE 1=1");
718		let filter = filter.unwrap_or_default();
719
720		let from_block = filter.from_block.map(&resolve_block_number).transpose()?;
721		let to_block = filter.to_block.map(&resolve_block_number).transpose()?;
722		let latest_block = U256::from(self.block_provider.latest_block_number().await);
723
724		match (from_block, to_block, filter.block_hash) {
725			(Some(_), _, Some(_)) | (_, Some(_), Some(_)) => {
726				anyhow::bail!("block number and block hash cannot be used together");
727			},
728
729			(Some(block), _, _) | (_, Some(block), _) if block > latest_block => {
730				anyhow::bail!("block number exceeds latest block");
731			},
732			(Some(from_block), Some(to_block), None) if from_block > to_block => {
733				anyhow::bail!("invalid block range params");
734			},
735			(Some(from_block), Some(to_block), None) if from_block == to_block => {
736				qb.push(" AND block_number = ").push_bind(from_block.as_u64() as i64);
737			},
738			(Some(from_block), Some(to_block), None) => {
739				qb.push(" AND block_number BETWEEN ")
740					.push_bind(from_block.as_u64() as i64)
741					.push(" AND ")
742					.push_bind(to_block.as_u64() as i64);
743			},
744			(Some(from_block), None, None) => {
745				qb.push(" AND block_number >= ").push_bind(from_block.as_u64() as i64);
746			},
747			(None, Some(to_block), None) => {
748				qb.push(" AND block_number <= ").push_bind(to_block.as_u64() as i64);
749			},
750			(None, None, Some(hash)) => {
751				qb.push(" AND block_hash = ").push_bind(hash.0.to_vec());
752			},
753			(None, None, None) => {
754				qb.push(" AND block_number = ").push_bind(latest_block.as_u64() as i64);
755			},
756		}
757
758		if let Some(addresses) = filter.address {
759			match addresses {
760				AddressOrAddresses::Address(addr) => {
761					qb.push(" AND address = ").push_bind(addr.0.to_vec());
762				},
763				AddressOrAddresses::Addresses(addrs) => {
764					qb.push(" AND address IN (");
765					let mut separated = qb.separated(", ");
766					for addr in addrs {
767						separated.push_bind(addr.0.to_vec());
768					}
769					separated.push_unseparated(")");
770				},
771			}
772		}
773
774		if let Some(topics) = filter.topics {
775			if topics.len() > 4 {
776				return Err(anyhow::anyhow!("exceed max topics"));
777			}
778
779			for (i, topic) in topics.into_iter().enumerate() {
780				match topic {
781					FilterTopic::Single(hash) => {
782						qb.push(format_args!(" AND topic_{i} = ")).push_bind(hash.0.to_vec());
783					},
784					FilterTopic::Multiple(hashes) => {
785						qb.push(format_args!(" AND topic_{i} IN ("));
786						let mut separated = qb.separated(", ");
787						for hash in hashes {
788							separated.push_bind(hash.0.to_vec());
789						}
790						separated.push_unseparated(")");
791					},
792				}
793			}
794		}
795
796		qb.push(" LIMIT ").push_bind(MAX_LOG_RESULTS as i64);
797
798		let logs = qb.build().try_map(parse_log_row).fetch_all(&self.db_ctx.pool).await?;
799
800		if logs.len() == MAX_LOG_RESULTS {
801			log::warn!(
802				target: LOG_TARGET,
803				"Log query hit limit of {MAX_LOG_RESULTS}; results may be truncated",
804			);
805		}
806
807		Ok(logs)
808	}
809
810	/// Fetch all logs for a given block from the database.
811	pub async fn logs_by_block_number(
812		&self,
813		block_number: SubstrateBlockNumber,
814		ethereum_hash: H256,
815	) -> Result<Vec<Log>, ClientError> {
816		let mut query_builder =
817			QueryBuilder::<Sqlite>::new("SELECT logs.* FROM logs WHERE block_number = ");
818		query_builder
819			.push_bind(block_number as i64)
820			.push(" AND block_hash = ")
821			.push_bind(ethereum_hash.as_bytes().to_vec())
822			.push(" ORDER BY log_index LIMIT ")
823			.push_bind(MAX_LOG_RESULTS as i64);
824
825		let logs = query_builder
826			.build()
827			.try_map(parse_log_row)
828			.fetch_all(&self.db_ctx.pool)
829			.await?;
830
831		if logs.len() == MAX_LOG_RESULTS {
832			log::warn!(
833				target: LOG_TARGET,
834				"Log query for block {block_number} hit limit of {MAX_LOG_RESULTS}; results may be truncated",
835			);
836		}
837
838		Ok(logs)
839	}
840
841	/// Get the number of receipts per block.
842	pub async fn receipts_count_per_block(&self, block_hash: &H256) -> Option<usize> {
843		let block_hash = block_hash.as_ref();
844		let row = query!(
845			r#"
846            SELECT COUNT(*) as count
847            FROM transaction_hashes
848            WHERE block_hash = $1
849            "#,
850			block_hash
851		)
852		.fetch_one(&self.db_ctx.pool)
853		.await
854		.ok()?;
855
856		let count = row.count as usize;
857		Some(count)
858	}
859
860	/// Return all transaction hashes for the given block hash.
861	pub async fn block_transaction_hashes(
862		&self,
863		block_hash: &H256,
864	) -> Option<HashMap<usize, H256>> {
865		let block_hash = block_hash.as_ref();
866		let rows = query!(
867			r#"
868		      SELECT transaction_index, transaction_hash
869		      FROM transaction_hashes
870		      WHERE block_hash = $1
871		      "#,
872			block_hash
873		)
874		.map(|row| {
875			let transaction_index = row.transaction_index as usize;
876			let transaction_hash = H256::from_slice(&row.transaction_hash);
877			(transaction_index, transaction_hash)
878		})
879		.fetch_all(&self.db_ctx.pool)
880		.await
881		.ok()?;
882
883		Some(rows.into_iter().collect())
884	}
885
886	/// Get the receipt for the given block hash and transaction index.
887	pub async fn receipt_by_block_hash_and_index(
888		&self,
889		block_hash: &H256,
890		transaction_index: usize,
891	) -> Option<ReceiptInfo> {
892		let block = self.block_provider.block_by_hash(block_hash).await.ok()??;
893		let (_, receipt) = self
894			.receipt_extractor
895			.extract_from_transaction(&block, transaction_index)
896			.await
897			.ok()?;
898		Some(receipt)
899	}
900
901	/// Get the receipt for the given transaction hash.
902	pub async fn receipt_by_hash(&self, transaction_hash: &H256) -> Option<ReceiptInfo> {
903		let (block_hash, transaction_index) = self.find_transaction(transaction_hash).await?;
904
905		let block = match self.block_provider.block_by_hash(&block_hash).await {
906			Ok(Some(b)) => b,
907			Ok(None) => {
908				log::trace!(target: LOG_TARGET,
909					"receipt_by_hash: block {block_hash:?} not available from node (pruned?) for tx {transaction_hash:?}");
910				return None;
911			},
912			Err(err) => {
913				log::trace!(target: LOG_TARGET,
914					"receipt_by_hash: failed to fetch block {block_hash:?} for tx {transaction_hash:?}: {err:?}");
915				return None;
916			},
917		};
918
919		match self.receipt_extractor.extract_from_transaction(&block, transaction_index).await {
920			Ok((_, receipt)) => Some(receipt),
921			Err(err) => {
922				log::trace!(target: LOG_TARGET,
923					"receipt_by_hash: extraction failed for tx {transaction_hash:?} in block {block_hash:?}: {err:?}");
924				None
925			},
926		}
927	}
928
929	/// Get the signed transaction for the given transaction hash.
930	pub async fn signed_tx_by_hash(&self, transaction_hash: &H256) -> Option<TransactionSigned> {
931		let (block_hash, transaction_index) = self.find_transaction(transaction_hash).await?;
932
933		let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
934		let (signed_tx, _) = self
935			.receipt_extractor
936			.extract_from_transaction(&block, transaction_index)
937			.await
938			.inspect_err(|err| {
939				log::trace!(target: LOG_TARGET,
940					"signed_tx_by_hash: extraction failed for tx {transaction_hash:?} \
941					 in block {block_hash:?}: {err:?}");
942			})
943			.ok()?;
944		Some(signed_tx)
945	}
946}
947
948#[cfg(test)]
949mod tests {
950	use super::*;
951	use crate::test::{MockBlockInfo, MockBlockInfoProvider};
952	use pallet_revive::evm::{BlockTag, ReceiptInfo, TransactionSigned};
953	use pretty_assertions::assert_eq;
954	use sp_core::{H160, H256};
955	use sqlx::SqlitePool;
956
957	async fn count(pool: &SqlitePool, table: &str, block_hash: Option<H256>) -> usize {
958		let count: i64 = match block_hash {
959			None => {
960				sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {table}"))
961					.fetch_one(pool)
962					.await
963			},
964			Some(hash) => {
965				sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {table} WHERE block_hash = ?"))
966					.bind(hash.as_ref())
967					.fetch_one(pool)
968					.await
969			},
970		}
971		.unwrap();
972
973		count as _
974	}
975
976	fn mock_provider() -> ReceiptProvider<MockBlockInfoProvider> {
977		ReceiptProvider {
978			db_ctx: DbContext::new(
979				SqlitePool::connect_lazy("sqlite::memory:").unwrap(),
980				DbContext::DEFAULT_MAX_VARIABLE_NUMBER,
981			),
982			block_provider: MockBlockInfoProvider {},
983			receipt_extractor: ReceiptExtractor::new_mock(),
984			keep_latest_n_blocks: None,
985			block_number_to_hashes: Default::default(),
986		}
987	}
988
989	/// Test resolver that handles Latest โ†’ `latest` and Earliest โ†’ 0.
990	fn mock_resolve_block_number_with_latest(
991		latest: u64,
992	) -> impl Fn(BlockNumberOrTag) -> anyhow::Result<U256> {
993		move |block: BlockNumberOrTag| match block {
994			BlockNumberOrTag::U256(v) => Ok(v),
995			BlockNumberOrTag::BlockTag(BlockTag::Earliest) => Ok(U256::zero()),
996			BlockNumberOrTag::BlockTag(BlockTag::Latest) => Ok(U256::from(latest)),
997			BlockNumberOrTag::BlockTag(tag) => anyhow::bail!("Unsupported tag: {tag:?}"),
998		}
999	}
1000
1001	impl ReceiptProvider<MockBlockInfoProvider> {
1002		fn with_db_ctx(mut self, db_ctx: DbContext) -> Self {
1003			self.db_ctx = db_ctx;
1004			self
1005		}
1006
1007		fn with_extractor(mut self, extractor: ReceiptExtractor) -> Self {
1008			self.receipt_extractor = extractor;
1009			self
1010		}
1011
1012		fn with_keep_latest(mut self, n: Option<usize>) -> Self {
1013			self.keep_latest_n_blocks = n;
1014			self
1015		}
1016	}
1017
1018	async fn setup_sqlite_provider(pool: SqlitePool) -> ReceiptProvider<MockBlockInfoProvider> {
1019		mock_provider()
1020			.with_db_ctx(DbContext::new(pool, DbContext::DEFAULT_MAX_VARIABLE_NUMBER))
1021			.with_keep_latest(Some(10))
1022	}
1023
1024	#[sqlx::test]
1025	async fn test_insert_remove(pool: SqlitePool) -> anyhow::Result<()> {
1026		let provider = setup_sqlite_provider(pool).await;
1027		let block = MockBlockInfo { hash: H256::default(), number: 0 };
1028		let receipts = vec![(
1029			TransactionSigned::default(),
1030			ReceiptInfo {
1031				logs: vec![Log { block_hash: block.hash, ..Default::default() }],
1032				..Default::default()
1033			},
1034		)];
1035		let ethereum_hash = H256::from([1_u8; 32]);
1036		let block_map = BlockHashMap::new(block.hash(), ethereum_hash);
1037
1038		provider.insert(&block, &receipts, &ethereum_hash).await?;
1039		let row = provider.find_transaction(&receipts[0].1.transaction_hash).await;
1040		assert_eq!(row, Some((block.hash, 0)));
1041		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", Some(block.hash())).await, 1);
1042		assert_eq!(count(&provider.db_ctx.pool, "logs", Some(ethereum_hash)).await, 1);
1043
1044		provider.remove(&[block_map]).await?;
1045		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", Some(block.hash())).await, 0);
1046		assert_eq!(count(&provider.db_ctx.pool, "logs", Some(ethereum_hash)).await, 0);
1047		Ok(())
1048	}
1049
1050	#[sqlx::test]
1051	async fn test_prune(pool: SqlitePool) -> anyhow::Result<()> {
1052		let provider = setup_sqlite_provider(pool).await;
1053		let n = provider.keep_latest_n_blocks.unwrap();
1054
1055		for i in 0..2 * n {
1056			let block = MockBlockInfo { hash: H256::from([i as u8; 32]), number: i as _ };
1057			let transaction_hash = H256::from([i as u8; 32]);
1058			let receipts = vec![(
1059				TransactionSigned::default(),
1060				ReceiptInfo {
1061					transaction_hash,
1062					logs: vec![Log {
1063						block_hash: block.hash,
1064						transaction_hash,
1065						..Default::default()
1066					}],
1067					..Default::default()
1068				},
1069			)];
1070			let ethereum_hash = H256::from([(i + 1) as u8; 32]);
1071			provider.insert(&block, &receipts, &ethereum_hash).await?;
1072		}
1073		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, n);
1074		assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, n);
1075		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, n);
1076		assert_eq!(provider.block_number_to_hashes.lock().await.len(), n);
1077
1078		return Ok(());
1079	}
1080
1081	#[sqlx::test]
1082	async fn test_fork(pool: SqlitePool) -> anyhow::Result<()> {
1083		let provider = setup_sqlite_provider(pool).await;
1084
1085		let build_block = |seed, number| {
1086			let block = MockBlockInfo { hash: H256::from([seed; 32]), number };
1087			let transaction_hash = H256::from([seed; 32]);
1088			let receipts = vec![(
1089				TransactionSigned::default(),
1090				ReceiptInfo {
1091					transaction_hash,
1092					logs: vec![Log {
1093						block_hash: block.hash,
1094						transaction_hash,
1095						..Default::default()
1096					}],
1097					..Default::default()
1098				},
1099			)];
1100			let ethereum_hash = H256::from([seed + 1; 32]);
1101
1102			(block, receipts, ethereum_hash)
1103		};
1104
1105		// Build 4 blocks on consecutive heights: 0,1,2,3.
1106		let (block0, receipts, ethereum_hash_0) = build_block(0, 0);
1107		provider.insert(&block0, &receipts, &ethereum_hash_0).await?;
1108		let (block1, receipts, ethereum_hash_1) = build_block(1, 1);
1109		provider.insert(&block1, &receipts, &ethereum_hash_1).await?;
1110		let (block2, receipts, ethereum_hash_2) = build_block(2, 2);
1111		provider.insert(&block2, &receipts, &ethereum_hash_2).await?;
1112		let (block3, receipts, ethereum_hash_3) = build_block(3, 3);
1113		provider.insert(&block3, &receipts, &ethereum_hash_3).await?;
1114
1115		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 4);
1116		assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 4);
1117		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 4);
1118		assert_eq!(
1119			provider.block_number_to_hashes.lock().await.clone(),
1120			[
1121				(0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
1122				(1, BlockHashMap::new(block1.hash, ethereum_hash_1)),
1123				(2, BlockHashMap::new(block2.hash, ethereum_hash_2)),
1124				(3, BlockHashMap::new(block3.hash, ethereum_hash_3))
1125			]
1126			.into(),
1127		);
1128
1129		// Now build another block on height 1.
1130		let (fork_block, receipts, ethereum_hash_fork) = build_block(4, 1);
1131		provider.insert(&fork_block, &receipts, &ethereum_hash_fork).await?;
1132
1133		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 2);
1134		assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 2);
1135		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 2);
1136
1137		assert_eq!(
1138			provider.block_number_to_hashes.lock().await.clone(),
1139			[
1140				(0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
1141				(1, BlockHashMap::new(fork_block.hash, ethereum_hash_fork))
1142			]
1143			.into(),
1144		);
1145
1146		return Ok(());
1147	}
1148
1149	#[sqlx::test]
1150	async fn test_reorg_same_transaction_hash(pool: SqlitePool) -> anyhow::Result<()> {
1151		let provider = setup_sqlite_provider(pool).await;
1152
1153		// Build two blocks at the same height with the same transaction hash
1154		let tx_hash = H256::from([42u8; 32]);
1155
1156		// Block A at height 1
1157		let block_a = MockBlockInfo { hash: H256::from([1u8; 32]), number: 1 };
1158		let ethereum_hash_a = H256::from([2u8; 32]);
1159		let receipts_a = vec![(
1160			TransactionSigned::default(),
1161			ReceiptInfo {
1162				transaction_hash: tx_hash,
1163				transaction_index: U256::from(0),
1164				..Default::default()
1165			},
1166		)];
1167
1168		provider.insert(&block_a, &receipts_a, &ethereum_hash_a).await?;
1169
1170		// Verify transaction points to block A
1171		let (found_hash, _) = provider.find_transaction(&tx_hash).await.unwrap();
1172		assert_eq!(found_hash, block_a.hash);
1173
1174		// Clear the in-memory map to simulate server restart
1175		provider.block_number_to_hashes.lock().await.clear();
1176
1177		// Block B at same height 1 (re-org) with SAME transaction
1178		let block_b = MockBlockInfo { hash: H256::from([3u8; 32]), number: 1 };
1179		let ethereum_hash_b = H256::from([4u8; 32]);
1180		let receipts_b = vec![(
1181			TransactionSigned::default(),
1182			ReceiptInfo {
1183				transaction_hash: tx_hash, // Same tx hash!
1184				transaction_index: U256::from(0),
1185				..Default::default()
1186			},
1187		)];
1188
1189		// This should NOT fail with UNIQUE constraint violation
1190		provider.insert(&block_b, &receipts_b, &ethereum_hash_b).await?;
1191
1192		// Transaction should now point to block B
1193		let (found_hash, _) = provider.find_transaction(&tx_hash).await.unwrap();
1194		assert_eq!(found_hash, block_b.hash);
1195
1196		Ok(())
1197	}
1198
1199	#[sqlx::test]
1200	async fn test_receipts_count_per_block(pool: SqlitePool) -> anyhow::Result<()> {
1201		let provider = setup_sqlite_provider(pool).await;
1202		let block = MockBlockInfo { hash: H256::default(), number: 0 };
1203		let receipts = vec![
1204			(
1205				TransactionSigned::default(),
1206				ReceiptInfo { transaction_hash: H256::from([0u8; 32]), ..Default::default() },
1207			),
1208			(
1209				TransactionSigned::default(),
1210				ReceiptInfo { transaction_hash: H256::from([1u8; 32]), ..Default::default() },
1211			),
1212		];
1213		let ethereum_hash = H256::from([2u8; 32]);
1214
1215		provider.insert(&block, &receipts, &ethereum_hash).await?;
1216		let count = provider.receipts_count_per_block(&block.hash).await;
1217		assert_eq!(count, Some(2));
1218		Ok(())
1219	}
1220
1221	#[sqlx::test]
1222	async fn test_query_logs(pool: SqlitePool) -> anyhow::Result<()> {
1223		let provider = setup_sqlite_provider(pool).await;
1224		let block1 = MockBlockInfo { hash: H256::from([1u8; 32]), number: 1 };
1225		let block2 = MockBlockInfo { hash: H256::from([2u8; 32]), number: 2 };
1226		let ethereum_hash1 = H256::from([3u8; 32]);
1227		let ethereum_hash2 = H256::from([4u8; 32]);
1228		let log1 = Log {
1229			block_hash: ethereum_hash1,
1230			block_number: block1.number.into(),
1231			address: H160::from([1u8; 20]),
1232			topics: vec![H256::from([1u8; 32]), H256::from([2u8; 32])],
1233			data: Some(vec![0u8; 32].into()),
1234			transaction_hash: H256::default(),
1235			transaction_index: U256::from(1),
1236			log_index: U256::from(1),
1237			..Default::default()
1238		};
1239		let log2 = Log {
1240			block_hash: ethereum_hash2,
1241			block_number: block2.number.into(),
1242			address: H160::from([2u8; 20]),
1243			topics: vec![H256::from([2u8; 32]), H256::from([3u8; 32])],
1244			transaction_hash: H256::from([1u8; 32]),
1245			transaction_index: U256::from(2),
1246			log_index: U256::from(1),
1247			..Default::default()
1248		};
1249
1250		provider
1251			.insert(
1252				&block1,
1253				&vec![(
1254					TransactionSigned::default(),
1255					ReceiptInfo {
1256						logs: vec![log1.clone()],
1257						transaction_hash: log1.transaction_hash,
1258						transaction_index: log1.transaction_index,
1259						..Default::default()
1260					},
1261				)],
1262				&ethereum_hash1,
1263			)
1264			.await?;
1265		provider
1266			.insert(
1267				&block2,
1268				&vec![(
1269					TransactionSigned::default(),
1270					ReceiptInfo {
1271						logs: vec![log2.clone()],
1272						transaction_hash: log2.transaction_hash,
1273						transaction_index: log2.transaction_index,
1274						..Default::default()
1275					},
1276				)],
1277				&ethereum_hash2,
1278			)
1279			.await?;
1280
1281		let resolve_block_number = mock_resolve_block_number_with_latest(block2.number.into());
1282
1283		// Empty filter
1284		let logs = provider.logs(None, &resolve_block_number).await?;
1285		assert_eq!(logs, vec![log2.clone()]);
1286
1287		// from_block filter
1288		let logs = provider
1289			.logs(
1290				Some(Filter { from_block: Some(log2.block_number.into()), ..Default::default() }),
1291				&resolve_block_number,
1292			)
1293			.await?;
1294		assert_eq!(logs, vec![log2.clone()]);
1295
1296		// from_block filter (using latest block)
1297		let logs = provider
1298			.logs(
1299				Some(Filter { from_block: Some(BlockTag::Latest.into()), ..Default::default() }),
1300				&resolve_block_number,
1301			)
1302			.await?;
1303		assert_eq!(logs, vec![log2.clone()]);
1304
1305		// to_block filter
1306		let logs = provider
1307			.logs(
1308				Some(Filter { to_block: Some(log1.block_number.into()), ..Default::default() }),
1309				&resolve_block_number,
1310			)
1311			.await?;
1312		assert_eq!(logs, vec![log1.clone()]);
1313
1314		// block_hash filter
1315		let logs = provider
1316			.logs(
1317				Some(Filter { block_hash: Some(log1.block_hash), ..Default::default() }),
1318				&resolve_block_number,
1319			)
1320			.await?;
1321		assert_eq!(logs, vec![log1.clone()]);
1322
1323		// single address
1324		let logs = provider
1325			.logs(
1326				Some(Filter {
1327					from_block: Some(BlockTag::Earliest.into()),
1328					address: Some(log1.address.into()),
1329					..Default::default()
1330				}),
1331				&resolve_block_number,
1332			)
1333			.await?;
1334		assert_eq!(logs, vec![log1.clone()]);
1335
1336		// multiple addresses
1337		let logs = provider
1338			.logs(
1339				Some(Filter {
1340					from_block: Some(BlockTag::Earliest.into()),
1341					address: Some(vec![log1.address, log2.address].into()),
1342					..Default::default()
1343				}),
1344				&resolve_block_number,
1345			)
1346			.await?;
1347		assert_eq!(logs, vec![log1.clone(), log2.clone()]);
1348
1349		// single topic
1350		let logs = provider
1351			.logs(
1352				Some(Filter {
1353					from_block: Some(BlockTag::Earliest.into()),
1354					topics: Some(vec![FilterTopic::Single(log1.topics[0])]),
1355					..Default::default()
1356				}),
1357				&resolve_block_number,
1358			)
1359			.await?;
1360		assert_eq!(logs, vec![log1.clone()]);
1361
1362		// multiple topic
1363		let logs = provider
1364			.logs(
1365				Some(Filter {
1366					from_block: Some(BlockTag::Earliest.into()),
1367					topics: Some(vec![
1368						FilterTopic::Single(log1.topics[0]),
1369						FilterTopic::Single(log1.topics[1]),
1370					]),
1371					..Default::default()
1372				}),
1373				&resolve_block_number,
1374			)
1375			.await?;
1376		assert_eq!(logs, vec![log1.clone()]);
1377
1378		// multiple topic for topic_0
1379		let logs = provider
1380			.logs(
1381				Some(Filter {
1382					from_block: Some(BlockTag::Earliest.into()),
1383					topics: Some(vec![FilterTopic::Multiple(vec![log1.topics[0], log2.topics[0]])]),
1384					..Default::default()
1385				}),
1386				&resolve_block_number,
1387			)
1388			.await?;
1389		assert_eq!(logs, vec![log1.clone(), log2.clone()]);
1390
1391		// Altogether
1392		let logs = provider
1393			.logs(
1394				Some(Filter {
1395					from_block: Some(BlockTag::Earliest.into()),
1396					to_block: Some(BlockTag::Latest.into()),
1397					block_hash: None,
1398					address: Some(vec![log1.address, log2.address].into()),
1399					topics: Some(vec![FilterTopic::Multiple(vec![log1.topics[0], log2.topics[0]])]),
1400				}),
1401				&resolve_block_number,
1402			)
1403			.await?;
1404		assert_eq!(logs, vec![log1.clone(), log2.clone()]);
1405		Ok(())
1406	}
1407
1408	#[sqlx::test]
1409	async fn test_block_mapping_insert_get(pool: SqlitePool) -> anyhow::Result<()> {
1410		let provider = setup_sqlite_provider(pool).await;
1411		let ethereum_hash = H256::from([1u8; 32]);
1412		let substrate_hash = H256::from([2u8; 32]);
1413		let block_map = BlockHashMap::new(substrate_hash, ethereum_hash);
1414
1415		// Insert mapping
1416		insert_block_mapping(&provider.db_ctx.pool, &block_map).await?;
1417
1418		// Test forward lookup
1419		let resolved = provider.get_substrate_hash(&ethereum_hash).await;
1420		assert_eq!(resolved, Some(substrate_hash));
1421
1422		// Test reverse lookup
1423		let resolved = provider.get_ethereum_hash(&substrate_hash).await;
1424		assert_eq!(resolved, Some(ethereum_hash));
1425
1426		Ok(())
1427	}
1428
1429	#[sqlx::test]
1430	async fn test_block_mapping_remove(pool: SqlitePool) -> anyhow::Result<()> {
1431		let provider = setup_sqlite_provider(pool).await;
1432		let ethereum_hash1 = H256::from([1u8; 32]);
1433		let ethereum_hash2 = H256::from([2u8; 32]);
1434		let substrate_hash1 = H256::from([3u8; 32]);
1435		let substrate_hash2 = H256::from([4u8; 32]);
1436		let block_map1 = BlockHashMap::new(substrate_hash1, ethereum_hash1);
1437		let block_map2 = BlockHashMap::new(substrate_hash2, ethereum_hash2);
1438
1439		// Insert mappings
1440		insert_block_mapping(&provider.db_ctx.pool, &block_map1).await?;
1441		insert_block_mapping(&provider.db_ctx.pool, &block_map2).await?;
1442
1443		// Verify they exist
1444		assert_eq!(
1445			provider.get_substrate_hash(&block_map1.ethereum_hash).await,
1446			Some(block_map1.substrate_hash)
1447		);
1448		assert_eq!(
1449			provider.get_substrate_hash(&block_map2.ethereum_hash).await,
1450			Some(block_map2.substrate_hash)
1451		);
1452
1453		// Remove one mapping
1454		provider.remove(&[block_map1]).await?;
1455
1456		// Verify removal
1457		assert_eq!(provider.get_substrate_hash(&ethereum_hash1).await, None);
1458		assert_eq!(provider.get_substrate_hash(&ethereum_hash2).await, Some(substrate_hash2));
1459
1460		Ok(())
1461	}
1462
1463	#[sqlx::test]
1464	async fn test_block_mapping_pruning_integration(pool: SqlitePool) -> anyhow::Result<()> {
1465		let provider = setup_sqlite_provider(pool).await;
1466		let ethereum_hash = H256::from([1u8; 32]);
1467		let substrate_hash = H256::from([2u8; 32]);
1468		let block_map = BlockHashMap::new(substrate_hash, ethereum_hash);
1469
1470		// Insert mapping
1471		insert_block_mapping(&provider.db_ctx.pool, &block_map).await?;
1472		assert_eq!(
1473			provider.get_substrate_hash(&block_map.ethereum_hash).await,
1474			Some(block_map.substrate_hash)
1475		);
1476
1477		// Remove substrate block (this should also remove the mapping)
1478		provider.remove(&[block_map.clone()]).await?;
1479
1480		// Mapping should be gone
1481		assert_eq!(provider.get_substrate_hash(&block_map.ethereum_hash).await, None);
1482
1483		Ok(())
1484	}
1485
1486	#[sqlx::test]
1487	async fn test_logs_with_ethereum_block_hash_mapping(pool: SqlitePool) -> anyhow::Result<()> {
1488		let provider = setup_sqlite_provider(pool).await;
1489		let ethereum_hash = H256::from([1u8; 32]);
1490		let substrate_hash = H256::from([2u8; 32]);
1491		let block_number = 1u64;
1492
1493		// Create a log with ethereum hash
1494		let log = Log {
1495			block_hash: ethereum_hash,
1496			block_number: block_number.into(),
1497			address: H160::from([1u8; 20]),
1498			topics: vec![H256::from([1u8; 32])],
1499			transaction_hash: H256::from([3u8; 32]),
1500			transaction_index: U256::from(0),
1501			log_index: U256::from(0),
1502			data: Some(vec![0u8; 32].into()),
1503			..Default::default()
1504		};
1505
1506		// Insert the log
1507		let block = MockBlockInfo { hash: substrate_hash, number: block_number as u32 };
1508		let receipts = vec![(
1509			TransactionSigned::default(),
1510			ReceiptInfo {
1511				logs: vec![log.clone()],
1512				transaction_hash: log.transaction_hash,
1513				transaction_index: log.transaction_index,
1514				..Default::default()
1515			},
1516		)];
1517		provider.insert(&block, &receipts, &ethereum_hash).await?;
1518
1519		// Query logs using Ethereum block hash (should resolve to substrate hash)
1520		let logs = provider
1521			.logs(
1522				Some(Filter { block_hash: Some(ethereum_hash), ..Default::default() }),
1523				mock_resolve_block_number_with_latest(block.number.into()),
1524			)
1525			.await?;
1526		assert_eq!(logs, vec![log]);
1527
1528		Ok(())
1529	}
1530
1531	#[sqlx::test]
1532	async fn test_mapping_count(pool: SqlitePool) -> anyhow::Result<()> {
1533		let provider = setup_sqlite_provider(pool).await;
1534
1535		// Initially no mappings
1536		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 0);
1537
1538		let block_map1 = BlockHashMap::new(H256::from([1u8; 32]), H256::from([2u8; 32]));
1539		let block_map2 = BlockHashMap::new(H256::from([3u8; 32]), H256::from([4u8; 32]));
1540
1541		// Insert some mappings
1542		insert_block_mapping(&provider.db_ctx.pool, &block_map1).await?;
1543		insert_block_mapping(&provider.db_ctx.pool, &block_map2).await?;
1544
1545		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 2);
1546
1547		// Remove one
1548		provider.remove(&[block_map1]).await?;
1549		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1550
1551		Ok(())
1552	}
1553
1554	#[sqlx::test]
1555	async fn restore_first_evm_block_clears_stale(pool: SqlitePool) -> anyhow::Result<()> {
1556		let provider = setup_sqlite_provider(pool).await;
1557
1558		// Persist first_evm_block = 42.
1559		provider
1560			.set_sync_label(ChainMetadata::FirstEvmBlock, SyncCheckpoint::from_number(42))
1561			.await?;
1562
1563		// MockBlockInfoProvider returns no blocks, so has_evm_hash is always false.
1564		// This means evm_first=42 is stale (no longer has an EVM hash).
1565		provider.restore_first_evm_block().await?;
1566
1567		// The value should have been cleared (not restored to the extractor).
1568		assert_eq!(provider.first_evm_block(), None);
1569
1570		// DB row should have been deleted.
1571		let checkpoint = provider.get_sync_label(ChainMetadata::FirstEvmBlock).await?;
1572		assert!(checkpoint.is_none());
1573		Ok(())
1574	}
1575
1576	#[sqlx::test]
1577	async fn advance_sync_label_only_increases(pool: SqlitePool) -> anyhow::Result<()> {
1578		let provider = setup_sqlite_provider(pool).await;
1579		let hash_a = H256::repeat_byte(0xAA);
1580		let hash_b = H256::repeat_byte(0xBB);
1581
1582		// First insert creates the row.
1583		provider
1584			.advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(100, hash_a))
1585			.await?;
1586		let checkpoint = provider.get_sync_label(SyncLabel::Head).await?.unwrap();
1587		assert_eq!((checkpoint.block_number, checkpoint.block_hash), (100, Some(hash_a)));
1588
1589		// Higher value advances.
1590		provider
1591			.advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(200, hash_b))
1592			.await?;
1593		let checkpoint = provider.get_sync_label(SyncLabel::Head).await?.unwrap();
1594		assert_eq!((checkpoint.block_number, checkpoint.block_hash), (200, Some(hash_b)));
1595
1596		// Lower and equal values are ignored (strict >).
1597		provider
1598			.advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(50, hash_a))
1599			.await?;
1600		provider
1601			.advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(200, hash_a))
1602			.await?;
1603		let checkpoint = provider.get_sync_label(SyncLabel::Head).await?.unwrap();
1604		assert_eq!((checkpoint.block_number, checkpoint.block_hash), (200, Some(hash_b)));
1605
1606		Ok(())
1607	}
1608
1609	#[sqlx::test]
1610	async fn recede_sync_label_only_decreases(pool: SqlitePool) -> anyhow::Result<()> {
1611		let provider = setup_sqlite_provider(pool).await;
1612		let hash_a = H256::repeat_byte(0xAA);
1613		let hash_b = H256::repeat_byte(0xBB);
1614
1615		// First insert creates the row.
1616		provider
1617			.recede_sync_label(SyncLabel::Tail, SyncCheckpoint::new(100, hash_a))
1618			.await?;
1619		let checkpoint = provider.get_sync_label(SyncLabel::Tail).await?.unwrap();
1620		assert_eq!((checkpoint.block_number, checkpoint.block_hash), (100, Some(hash_a)));
1621
1622		// Lower value recedes.
1623		provider
1624			.recede_sync_label(SyncLabel::Tail, SyncCheckpoint::new(50, hash_b))
1625			.await?;
1626		let checkpoint = provider.get_sync_label(SyncLabel::Tail).await?.unwrap();
1627		assert_eq!((checkpoint.block_number, checkpoint.block_hash), (50, Some(hash_b)));
1628
1629		// Higher and equal values are ignored (strict <).
1630		provider
1631			.recede_sync_label(SyncLabel::Tail, SyncCheckpoint::new(200, hash_a))
1632			.await?;
1633		provider
1634			.recede_sync_label(SyncLabel::Tail, SyncCheckpoint::new(50, hash_a))
1635			.await?;
1636		let checkpoint = provider.get_sync_label(SyncLabel::Tail).await?.unwrap();
1637		assert_eq!((checkpoint.block_number, checkpoint.block_hash), (50, Some(hash_b)));
1638
1639		Ok(())
1640	}
1641
1642	#[tokio::test]
1643	async fn is_before_earliest_block_edge_cases() {
1644		// U256 > u32::MAX should never be considered "before floor"
1645		let extractor = ReceiptExtractor::new_mock();
1646		extractor.set_first_evm_block(10);
1647		let provider = mock_provider().with_extractor(extractor);
1648
1649		let huge = BlockNumberOrTag::U256(U256::from(u64::MAX));
1650		assert!(!provider.is_before_earliest_block(&huge));
1651
1652		let just_over = BlockNumberOrTag::U256(U256::from(u32::MAX as u64 + 1));
1653		assert!(!provider.is_before_earliest_block(&just_over));
1654
1655		// Sentinel first_evm_block (u32::MAX) is permissive โ€” no queries rejected.
1656		let provider = mock_provider();
1657		assert!(!provider.is_before_earliest_block(&BlockNumberOrTag::U256(U256::from(0u32))));
1658		assert!(
1659			!provider.is_before_earliest_block(&BlockNumberOrTag::U256(U256::from(1_000_000u32)))
1660		);
1661
1662		// Tag-based queries are never rejected.
1663		assert!(!provider.is_before_earliest_block(&BlockNumberOrTag::BlockTag(
1664			pallet_revive::evm::BlockTag::Latest
1665		)));
1666	}
1667
1668	#[sqlx::test]
1669	async fn persistent_mode_caps_in_memory_map(pool: SqlitePool) -> anyhow::Result<()> {
1670		// Persistent DB mode: keep_latest_n_blocks = None
1671		let provider = mock_provider()
1672			.with_db_ctx(DbContext::new(pool, DbContext::DEFAULT_MAX_VARIABLE_NUMBER));
1673
1674		// Insert more than MAX_CACHED_BLOCKS blocks.
1675		let start_block: u64 = 1;
1676		let n = MAX_CACHED_BLOCKS + 1;
1677		let end_block = start_block + n as u64;
1678		for i in start_block..end_block {
1679			let block = MockBlockInfo { hash: H256::from_low_u64_be(i), number: i as _ };
1680			let receipts = vec![(
1681				TransactionSigned::default(),
1682				ReceiptInfo {
1683					transaction_hash: H256::from_low_u64_be(i),
1684					logs: vec![Log {
1685						block_hash: block.hash,
1686						transaction_hash: H256::from_low_u64_be(i),
1687						..Default::default()
1688					}],
1689					..Default::default()
1690				},
1691			)];
1692			let ethereum_hash = H256::from_low_u64_be(i + 1);
1693			provider.insert(&block, &receipts, &ethereum_hash).await?;
1694		}
1695
1696		// The map is capped at MAX_CACHED_BLOCKS.
1697		let map = provider.block_number_to_hashes.lock().await;
1698		assert_eq!(map.len(), MAX_CACHED_BLOCKS);
1699
1700		// The oldest block (1) should have been evicted, keeping blocks 2..=MAX+1.
1701		assert!(!map.contains_key(&1));
1702		assert!(map.contains_key(&2));
1703		assert!(map.contains_key(&(MAX_CACHED_BLOCKS as u32 + 1)));
1704		drop(map);
1705
1706		// All blocks are still in the DB.
1707		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, n);
1708
1709		Ok(())
1710	}
1711
1712	fn make_hash(i: usize, fill: u8) -> H256 {
1713		let mut hash = [fill; 32];
1714		hash[..8].copy_from_slice(&i.to_le_bytes());
1715		H256::from(hash)
1716	}
1717
1718	fn make_receipts(
1719		tx_offset: usize,
1720		n_tx: usize,
1721		n_logs: usize,
1722	) -> Vec<(TransactionSigned, ReceiptInfo)> {
1723		let mut receipts = Vec::with_capacity(n_tx);
1724
1725		for i in 0..n_tx {
1726			let transaction_hash = make_hash(tx_offset + i, 0x00);
1727
1728			let mut logs = Vec::with_capacity(n_logs);
1729			for j in 0..n_logs {
1730				logs.push(Log { transaction_hash, log_index: U256::from(j), ..Default::default() });
1731			}
1732
1733			receipts.push((
1734				TransactionSigned::default(),
1735				ReceiptInfo {
1736					transaction_hash,
1737					transaction_index: U256::from(i),
1738					logs,
1739					..Default::default()
1740				},
1741			));
1742		}
1743
1744		receipts
1745	}
1746
1747	async fn assert_receipts_inserted(
1748		provider: &ReceiptProvider<MockBlockInfoProvider>,
1749		block: &MockBlockInfo,
1750		ethereum_hash: &H256,
1751		receipts: &[(TransactionSigned, ReceiptInfo)],
1752	) {
1753		let mut expected_logs = 0;
1754		for (_, receipt) in receipts {
1755			assert_eq!(
1756				provider.find_transaction(&receipt.transaction_hash).await,
1757				Some((block.hash(), receipt.transaction_index.as_u32() as usize))
1758			);
1759			expected_logs += receipt.logs.len();
1760		}
1761		assert_eq!(count(&provider.db_ctx.pool, "logs", Some(*ethereum_hash)).await, expected_logs);
1762	}
1763
1764	#[sqlx::test]
1765	async fn test_bulk_insert(pool: SqlitePool) -> anyhow::Result<()> {
1766		let provider = setup_sqlite_provider(pool).await.with_keep_latest(None);
1767		let tx_chunk = provider.db_ctx.tx_insert_chunk_size;
1768		let log_chunk = provider.db_ctx.log_insert_chunk_size;
1769
1770		let cases = [
1771			(tx_chunk, 1),             // exact tx chunk boundary
1772			(tx_chunk + 1, log_chunk), // crosses tx boundary; exact log chunk boundary
1773			(1000, 3),                 // multiple tx and log chunks
1774		];
1775
1776		let mut tx_offset = 0;
1777		for (i, (n_tx, n_logs)) in cases.into_iter().enumerate() {
1778			let block = MockBlockInfo { hash: make_hash(i, 0x00), number: i as u32 + 1 };
1779			let ethereum_hash = make_hash(i, 0xff);
1780			let receipts = make_receipts(tx_offset, n_tx, n_logs);
1781			tx_offset += n_tx;
1782			provider.insert(&block, &receipts, &ethereum_hash).await?;
1783			assert_receipts_inserted(&provider, &block, &ethereum_hash, &receipts).await;
1784		}
1785		Ok(())
1786	}
1787
1788	#[sqlx::test]
1789	async fn test_duplicate_insert_succeeds(pool: SqlitePool) -> anyhow::Result<()> {
1790		let provider = setup_sqlite_provider(pool).await.with_keep_latest(None);
1791		let block = MockBlockInfo { hash: make_hash(0, 0xAA), number: 1 };
1792		let ethereum_hash = make_hash(0, 0xBB);
1793		let receipts = make_receipts(0, 5, 3);
1794
1795		// First insert.
1796		provider.insert_into_db(&block, &receipts, &ethereum_hash).await?;
1797		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 5);
1798		assert_eq!(count(&provider.db_ctx.pool, "logs", Some(ethereum_hash)).await, 15);
1799		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1800
1801		// Delete only the block mapping so the EXISTS guard won't short-circuit.
1802		sqlx::query("DELETE FROM eth_to_substrate_blocks")
1803			.execute(&provider.db_ctx.pool)
1804			.await?;
1805		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 0);
1806
1807		// Second insert hits the actual INSERT OR REPLACE statements.
1808		provider.insert_into_db(&block, &receipts, &ethereum_hash).await?;
1809
1810		// Row counts unchanged โ€” no duplicates.
1811		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 5);
1812		assert_eq!(count(&provider.db_ctx.pool, "logs", Some(ethereum_hash)).await, 15);
1813		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1814
1815		Ok(())
1816	}
1817
1818	#[sqlx::test]
1819	async fn test_insert_empty_receipts(pool: SqlitePool) -> anyhow::Result<()> {
1820		let provider = setup_sqlite_provider(pool).await.with_keep_latest(None);
1821		let block = MockBlockInfo { hash: H256::from([1u8; 32]), number: 1 };
1822		let ethereum_hash = H256::from([2u8; 32]);
1823
1824		provider.insert(&block, &[], &ethereum_hash).await?;
1825
1826		// Block mapping is stored as a deduplication marker.
1827		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1828		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 0);
1829		assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 0);
1830
1831		// Second insert for the same block is a no-op, even with receipts.
1832		let receipts = make_receipts(0, 3, 2);
1833		provider.insert(&block, &receipts, &ethereum_hash).await?;
1834		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1835		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 0);
1836		assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 0);
1837
1838		Ok(())
1839	}
1840
1841	#[sqlx::test]
1842	async fn test_bulk_delete(pool: SqlitePool) -> anyhow::Result<()> {
1843		// Use the smallest valid limit to force chunked INSERTs and DELETEs.
1844		let db_ctx = DbContext::new(pool, DbContext::LOG_COLUMNS);
1845		let provider = mock_provider().with_db_ctx(db_ctx).with_keep_latest(None);
1846
1847		let n_blocks = 25;
1848		let n_tx_per_block = 5;
1849		let n_logs_per_receipt = 3;
1850		let mut block_mappings = Vec::new();
1851
1852		for i in 0..n_blocks {
1853			let block = MockBlockInfo { hash: make_hash(i, 0xAA), number: i as u32 + 1 };
1854			let ethereum_hash = make_hash(i, 0xBB);
1855			let receipts = make_receipts(i * n_tx_per_block, n_tx_per_block, n_logs_per_receipt);
1856			provider.insert_into_db(&block, &receipts, &ethereum_hash).await?;
1857			block_mappings.push(BlockHashMap::new(block.hash, ethereum_hash));
1858		}
1859
1860		assert_eq!(
1861			count(&provider.db_ctx.pool, "transaction_hashes", None).await,
1862			n_blocks * n_tx_per_block
1863		);
1864		assert_eq!(
1865			count(&provider.db_ctx.pool, "logs", None).await,
1866			n_blocks * n_tx_per_block * n_logs_per_receipt
1867		);
1868		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, n_blocks);
1869
1870		provider.remove(&block_mappings).await?;
1871
1872		assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 0);
1873		assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 0);
1874		assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 0);
1875
1876		Ok(())
1877	}
1878
1879	#[sqlx::test]
1880	async fn test_get_processed_eth_block_hash(pool: SqlitePool) -> anyhow::Result<()> {
1881		let provider = setup_sqlite_provider(pool).await;
1882		let block = MockBlockInfo { hash: H256::from([0xAA; 32]), number: 10 };
1883		let ethereum_hash = H256::from([0xBB; 32]);
1884		let receipts = vec![(TransactionSigned::default(), ReceiptInfo::default())];
1885
1886		// Not cached yet
1887		assert!(provider.get_processed_eth_block_hash(10, block.hash).await.is_none());
1888
1889		// Insert also populates the in-memory cache
1890		provider.insert(&block, &receipts, &ethereum_hash).await?;
1891		assert_eq!(
1892			provider.get_processed_eth_block_hash(10, block.hash).await,
1893			Some(ethereum_hash)
1894		);
1895
1896		// Wrong hash for same block number
1897		assert!(
1898			provider
1899				.get_processed_eth_block_hash(10, H256::from([0xCC; 32]))
1900				.await
1901				.is_none()
1902		);
1903
1904		// Wrong block number
1905		assert!(provider.get_processed_eth_block_hash(11, block.hash).await.is_none());
1906
1907		Ok(())
1908	}
1909
1910	#[sqlx::test]
1911	async fn test_logs_by_block_number(pool: SqlitePool) -> anyhow::Result<()> {
1912		let provider = setup_sqlite_provider(pool).await;
1913		let substrate_hash = H256::from([0xAA; 32]);
1914		let tx_hash = H256::from([0xBB; 32]);
1915		let block = MockBlockInfo { hash: substrate_hash, number: 42 };
1916		let ethereum_hash = H256::from([0xCC; 32]);
1917
1918		let log0 = Log {
1919			block_hash: ethereum_hash,
1920			block_number: U256::from(42),
1921			transaction_hash: tx_hash,
1922			log_index: U256::from(0),
1923			address: H160::from([0x01; 20]),
1924			..Default::default()
1925		};
1926		let log1 = Log {
1927			block_hash: ethereum_hash,
1928			block_number: U256::from(42),
1929			transaction_hash: tx_hash,
1930			log_index: U256::from(1),
1931			address: H160::from([0x02; 20]),
1932			..Default::default()
1933		};
1934
1935		let receipts = vec![(
1936			TransactionSigned::default(),
1937			ReceiptInfo {
1938				transaction_hash: tx_hash,
1939				block_hash: ethereum_hash,
1940				logs: vec![log0.clone(), log1.clone()],
1941				..Default::default()
1942			},
1943		)];
1944
1945		// No logs before insert
1946		let logs = provider.logs_by_block_number(42, ethereum_hash).await?;
1947		assert!(logs.is_empty());
1948
1949		provider.insert(&block, &receipts, &ethereum_hash).await?;
1950
1951		// Logs returned in log_index order
1952		let logs = provider.logs_by_block_number(42, ethereum_hash).await?;
1953		assert_eq!(logs.len(), 2);
1954		assert_eq!(logs[0].address, log0.address);
1955		assert_eq!(logs[1].address, log1.address);
1956		assert_eq!(logs[0].log_index, U256::from(0));
1957		assert_eq!(logs[1].log_index, U256::from(1));
1958
1959		// Different block number returns empty
1960		let logs = provider.logs_by_block_number(43, ethereum_hash).await?;
1961		assert!(logs.is_empty());
1962
1963		// Wrong ethereum hash returns empty
1964		let logs = provider.logs_by_block_number(42, H256::from([0xDD; 32])).await?;
1965		assert!(logs.is_empty());
1966
1967		Ok(())
1968	}
1969}