referrerpolicy=no-referrer-when-downgrade

pallet_revive_eth_rpc/
receipt_provider.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17use crate::{
18	client::{SubstrateBlock, SubstrateBlockNumber},
19	Address, AddressOrAddresses, BlockInfoProvider, BlockNumberOrTag, BlockTag, Bytes, ClientError,
20	FilterTopic, ReceiptExtractor, SubxtBlockInfoProvider,
21};
22use pallet_revive::evm::{Filter, Log, ReceiptInfo, TransactionSigned};
23use sp_core::{H256, U256};
24use sqlx::{query, QueryBuilder, Row, Sqlite, SqlitePool};
25use std::{
26	collections::{BTreeMap, HashMap},
27	sync::Arc,
28};
29use tokio::sync::Mutex;
30
/// Log target used by the log statements of this module.
const LOG_TARGET: &str = "eth-rpc::receipt_provider";
32
33/// ReceiptProvider stores transaction receipts and logs in a SQLite database.
34#[derive(Clone)]
35pub struct ReceiptProvider<B: BlockInfoProvider = SubxtBlockInfoProvider> {
36	/// The database pool.
37	pool: SqlitePool,
38	/// The block provider used to fetch blocks, and reconstruct receipts.
39	block_provider: B,
40	/// A means to extract receipts from extrinsics.
41	receipt_extractor: ReceiptExtractor,
42	/// When `Some`, old blocks will be pruned.
43	keep_latest_n_blocks: Option<usize>,
44	/// A Map of the latest block numbers to block hashes.
45	block_number_to_hashes: Arc<Mutex<BTreeMap<SubstrateBlockNumber, BlockHashMap>>>,
46}
47
48/// Substrate block to Ethereum block mapping
49#[derive(Clone, Debug, PartialEq, Eq)]
50struct BlockHashMap {
51	substrate_hash: H256,
52	ethereum_hash: H256,
53}
54
55impl BlockHashMap {
56	fn new(substrate_hash: H256, ethereum_hash: H256) -> Self {
57		Self { substrate_hash, ethereum_hash }
58	}
59}
60
61/// Provides information about a block,
62/// This is an abstratction on top of [`SubstrateBlock`] that can't be mocked in tests.
63/// Can be removed once <https://github.com/paritytech/subxt/issues/1883> is fixed.
64pub trait BlockInfo {
65	/// Returns the block hash.
66	fn hash(&self) -> H256;
67	/// Returns the block number.
68	fn number(&self) -> SubstrateBlockNumber;
69}
70
71impl BlockInfo for SubstrateBlock {
72	fn hash(&self) -> H256 {
73		SubstrateBlock::hash(self)
74	}
75	fn number(&self) -> SubstrateBlockNumber {
76		SubstrateBlock::number(self)
77	}
78}
79
80impl<B: BlockInfoProvider> ReceiptProvider<B> {
81	/// Create a new `ReceiptProvider` with the given database URL and block provider.
82	pub async fn new(
83		pool: SqlitePool,
84		block_provider: B,
85		receipt_extractor: ReceiptExtractor,
86		keep_latest_n_blocks: Option<usize>,
87	) -> Result<Self, sqlx::Error> {
88		sqlx::migrate!().run(&pool).await?;
89		Ok(Self {
90			pool,
91			block_provider,
92			receipt_extractor,
93			keep_latest_n_blocks,
94			block_number_to_hashes: Default::default(),
95		})
96	}
97
98	// Get block hash and  transaction index by transaction hash
99	pub async fn find_transaction(&self, transaction_hash: &H256) -> Option<(H256, usize)> {
100		let transaction_hash = transaction_hash.as_ref();
101		let result = query!(
102			r#"
103			SELECT block_hash, transaction_index
104			FROM transaction_hashes
105			WHERE transaction_hash = $1
106			"#,
107			transaction_hash
108		)
109		.fetch_optional(&self.pool)
110		.await
111		.ok()??;
112
113		let block_hash = H256::from_slice(&result.block_hash[..]);
114		let transaction_index = result.transaction_index.try_into().ok()?;
115		Some((block_hash, transaction_index))
116	}
117
118	/// Insert a block mapping from Ethereum block hash to Substrate block hash.
119	async fn insert_block_mapping(&self, block_map: &BlockHashMap) -> Result<(), ClientError> {
120		let ethereum_hash_ref = block_map.ethereum_hash.as_ref();
121		let substrate_hash_ref = block_map.substrate_hash.as_ref();
122
123		query!(
124			r#"
125			INSERT OR REPLACE INTO eth_to_substrate_blocks (ethereum_block_hash, substrate_block_hash)
126			VALUES ($1, $2)
127			"#,
128			ethereum_hash_ref,
129			substrate_hash_ref,
130		)
131		.execute(&self.pool)
132		.await?;
133
134		log::trace!(target: LOG_TARGET, "Insert block mapping ethereum block: {:?} -> substrate block: {:?}", block_map.ethereum_hash, block_map.substrate_hash);
135		Ok(())
136	}
137
138	/// Get the Substrate block hash for the given Ethereum block hash.
139	pub async fn get_substrate_hash(&self, ethereum_block_hash: &H256) -> Option<H256> {
140		let ethereum_hash = ethereum_block_hash.as_ref();
141		let result = query!(
142			r#"
143			SELECT substrate_block_hash
144			FROM eth_to_substrate_blocks
145			WHERE ethereum_block_hash = $1
146			"#,
147			ethereum_hash
148		)
149		.fetch_optional(&self.pool)
150		.await
151		.inspect_err(|e| {
152			log::error!(target: LOG_TARGET, "failed to get block mapping for ethereum block {ethereum_block_hash:?}, err: {e:?}");
153		})
154		.ok()?
155		.or_else(||{
156			log::trace!(target: LOG_TARGET, "No block mapping found for ethereum block: {ethereum_block_hash:?}");
157			None
158		})?;
159
160		log::trace!(target: LOG_TARGET, "Get block mapping ethereum block: {:?} -> substrate block: {ethereum_block_hash:?}", H256::from_slice(&result.substrate_block_hash[..]));
161
162		Some(H256::from_slice(&result.substrate_block_hash[..]))
163	}
164
165	/// Get the Ethereum block hash for the given Substrate block hash.
166	pub async fn get_ethereum_hash(&self, substrate_block_hash: &H256) -> Option<H256> {
167		let substrate_hash = substrate_block_hash.as_ref();
168		let result = query!(
169			r#"
170			SELECT ethereum_block_hash
171			FROM eth_to_substrate_blocks
172			WHERE substrate_block_hash = $1
173			"#,
174			substrate_hash
175		)
176		.fetch_optional(&self.pool)
177		.await
178		.inspect_err(|e| {
179			log::error!(target: LOG_TARGET, "failed to get block mapping for substrate block {substrate_block_hash:?}, err: {e:?}");
180		})
181		.ok()?
182		.or_else(||{
183			log::trace!(target: LOG_TARGET, "No block mapping found for substrate block: {substrate_block_hash:?}");
184			None
185		})?;
186
187		log::trace!(target: LOG_TARGET, "Get block mapping substrate block: {substrate_block_hash:?} -> ethereum block: {:?}", H256::from_slice(&result.ethereum_block_hash[..]));
188
189		Some(H256::from_slice(&result.ethereum_block_hash[..]))
190	}
191
192	/// Deletes older records from the database.
193	async fn remove(&self, block_mappings: &[BlockHashMap]) -> Result<(), ClientError> {
194		if block_mappings.is_empty() {
195			return Ok(());
196		}
197		log::debug!(target: LOG_TARGET, "Removing block hashes: {block_mappings:?}");
198
199		let placeholders = vec!["?"; block_mappings.len()].join(", ");
200		let sql = format!("DELETE FROM transaction_hashes WHERE block_hash in ({placeholders})");
201
202		let mut delete_tx_query = sqlx::query(&sql);
203		let sql = format!(
204			"DELETE FROM eth_to_substrate_blocks WHERE substrate_block_hash in ({placeholders})"
205		);
206		let mut delete_mappings_query = sqlx::query(&sql);
207
208		let sql = format!("DELETE FROM logs WHERE block_hash in ({placeholders})");
209		let mut delete_logs_query = sqlx::query(&sql);
210
211		for block_map in block_mappings {
212			delete_tx_query = delete_tx_query.bind(block_map.substrate_hash.as_ref());
213			delete_mappings_query = delete_mappings_query.bind(block_map.substrate_hash.as_ref());
214			// logs table uses  ethereum block hash
215			delete_logs_query = delete_logs_query.bind(block_map.ethereum_hash.as_ref());
216		}
217
218		let delete_transaction_hashes = delete_tx_query.execute(&self.pool);
219		let delete_logs = delete_logs_query.execute(&self.pool);
220		let delete_mappings = delete_mappings_query.execute(&self.pool);
221		tokio::try_join!(delete_transaction_hashes, delete_logs, delete_mappings)?;
222		Ok(())
223	}
224
225	/// Check if the block is before the earliest block.
226	pub fn is_before_earliest_block(&self, at: &BlockNumberOrTag) -> bool {
227		match at {
228			BlockNumberOrTag::U256(block_number) =>
229				self.receipt_extractor.is_before_earliest_block(block_number.as_u32()),
230			BlockNumberOrTag::BlockTag(_) => false,
231		}
232	}
233
234	/// Fetch receipts from the given block.
235	pub async fn receipts_from_block(
236		&self,
237		block: &SubstrateBlock,
238	) -> Result<Vec<(TransactionSigned, ReceiptInfo)>, ClientError> {
239		self.receipt_extractor.extract_from_block(block).await
240	}
241
242	/// Extract and insert receipts from the given block.
243	pub async fn insert_block_receipts(
244		&self,
245		block: &SubstrateBlock,
246		ethereum_hash: &H256,
247	) -> Result<Vec<(TransactionSigned, ReceiptInfo)>, ClientError> {
248		let receipts = self.receipts_from_block(block).await?;
249		self.insert(block, &receipts, ethereum_hash).await?;
250		Ok(receipts)
251	}
252
253	/// Prune blocks older blocks.
254	async fn prune_blocks(
255		&self,
256		block_number: SubstrateBlockNumber,
257		block_map: &BlockHashMap,
258	) -> Result<(), ClientError> {
259		let mut to_remove = Vec::new();
260		let mut block_number_to_hash = self.block_number_to_hashes.lock().await;
261
262		// Fork? - If inserting the same block number with a different hash, remove the old ones.
263		match block_number_to_hash.insert(block_number, block_map.clone()) {
264			Some(old_block_map) if &old_block_map != block_map => {
265				to_remove.push(old_block_map);
266
267				// Now loop through the blocks that were building on top of the old fork and remove
268				// them.
269				let mut next_block_number = block_number.saturating_add(1);
270				while let Some(old_block_map) = block_number_to_hash.remove(&next_block_number) {
271					to_remove.push(old_block_map);
272					next_block_number = next_block_number.saturating_add(1);
273				}
274			},
275			_ => {},
276		}
277
278		if let Some(keep_latest_n_blocks) = self.keep_latest_n_blocks {
279			// If we have more blocks than we should keep, remove the oldest ones by count
280			// (not by block number range, to handle gaps correctly)
281			while block_number_to_hash.len() > keep_latest_n_blocks {
282				// Remove the block with the smallest number (first in BTreeMap)
283				if let Some((_, block_map)) = block_number_to_hash.pop_first() {
284					to_remove.push(block_map);
285				}
286			}
287		}
288
289		// Release the lock.
290		drop(block_number_to_hash);
291
292		if !to_remove.is_empty() {
293			log::trace!(target: LOG_TARGET, "Pruning old blocks: {to_remove:?}");
294			self.remove(&to_remove).await?;
295		}
296
297		Ok(())
298	}
299
300	/// Insert receipts into the provider.
301	///
302	/// Note: Can be merged into `insert_block_receipts` once <https://github.com/paritytech/subxt/issues/1883> is fixed and subxt let
303	/// us create Mock `SubstrateBlock`
304	async fn insert(
305		&self,
306		block: &impl BlockInfo,
307		receipts: &[(TransactionSigned, ReceiptInfo)],
308		ethereum_hash: &H256,
309	) -> Result<(), ClientError> {
310		let substrate_block_hash = block.hash();
311		let substrate_hash_ref = substrate_block_hash.as_ref();
312		let block_number = block.number() as i64;
313		let ethereum_hash_ref = ethereum_hash.as_ref();
314		let block_map = BlockHashMap::new(substrate_block_hash, *ethereum_hash);
315
316		log::trace!(target: LOG_TARGET, "Insert receipts for substrate block #{block_number} {:?}", substrate_block_hash);
317
318		self.prune_blocks(block.number(), &block_map).await?;
319
320		// Check if mapping already exists (eg. added when processing best block and we are now
321		// processing finalized block)
322		let result = sqlx::query!(
323			r#"SELECT EXISTS(SELECT 1 FROM eth_to_substrate_blocks WHERE substrate_block_hash = $1) AS "exists!:bool""#, substrate_hash_ref
324		)
325		.fetch_one(&self.pool)
326		.await?;
327
328		// Assuming that if no mapping exists then no relevant entries in transaction_hashes and
329		// logs exist
330		if !result.exists {
331			for (_, receipt) in receipts {
332				let transaction_hash: &[u8] = receipt.transaction_hash.as_ref();
333				let transaction_index = receipt.transaction_index.as_u32() as i32;
334
335				query!(
336					r#"
337					INSERT INTO transaction_hashes (transaction_hash, block_hash, transaction_index)
338					VALUES ($1, $2, $3)
339					"#,
340					transaction_hash,
341					substrate_hash_ref,
342					transaction_index
343				)
344				.execute(&self.pool)
345				.await?;
346
347				for log in &receipt.logs {
348					let log_index = log.log_index.as_u32() as i32;
349					let address: &[u8] = log.address.as_ref();
350
351					let topic_0 = log.topics.first().as_ref().map(|v| &v[..]);
352					let topic_1 = log.topics.get(1).as_ref().map(|v| &v[..]);
353					let topic_2 = log.topics.get(2).as_ref().map(|v| &v[..]);
354					let topic_3 = log.topics.get(3).as_ref().map(|v| &v[..]);
355					let data = log.data.as_ref().map(|v| &v.0[..]);
356
357					query!(
358						r#"
359						INSERT INTO logs(
360							block_hash,
361							transaction_index,
362							log_index,
363							address,
364							block_number,
365							transaction_hash,
366							topic_0, topic_1, topic_2, topic_3,
367							data)
368						VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
369						"#,
370						ethereum_hash_ref,
371						transaction_index,
372						log_index,
373						address,
374						block_number,
375						transaction_hash,
376						topic_0,
377						topic_1,
378						topic_2,
379						topic_3,
380						data
381					)
382					.execute(&self.pool)
383					.await?;
384				}
385			}
386			// Insert block mapping from Ethereum to Substrate hash
387			self.insert_block_mapping(&block_map).await?;
388		}
389
390		Ok(())
391	}
392
393	/// Get logs that match the given filter.
394	pub async fn logs(&self, filter: Option<Filter>) -> anyhow::Result<Vec<Log>> {
395		let mut qb = QueryBuilder::<Sqlite>::new("SELECT logs.* FROM logs WHERE 1=1");
396		let filter = filter.unwrap_or_default();
397
398		let latest_block = U256::from(self.block_provider.latest_block_number().await);
399
400		let as_block_number = |block_param| match block_param {
401			None => Ok(None),
402			Some(BlockNumberOrTag::U256(v)) => Ok(Some(v)),
403			Some(BlockNumberOrTag::BlockTag(BlockTag::Latest)) => Ok(Some(latest_block)),
404			Some(BlockNumberOrTag::BlockTag(tag)) => anyhow::bail!("Unsupported tag: {tag:?}"),
405		};
406
407		let from_block = as_block_number(filter.from_block)?;
408		let to_block = as_block_number(filter.to_block)?;
409
410		match (from_block, to_block, filter.block_hash) {
411			(Some(_), _, Some(_)) | (_, Some(_), Some(_)) => {
412				anyhow::bail!("block number and block hash cannot be used together");
413			},
414
415			(Some(block), _, _) | (_, Some(block), _) if block > latest_block => {
416				anyhow::bail!("block number exceeds latest block");
417			},
418			(Some(from_block), Some(to_block), None) if from_block > to_block => {
419				anyhow::bail!("invalid block range params");
420			},
421			(Some(from_block), Some(to_block), None) if from_block == to_block => {
422				qb.push(" AND block_number = ").push_bind(from_block.as_u64() as i64);
423			},
424			(Some(from_block), Some(to_block), None) => {
425				qb.push(" AND block_number BETWEEN ")
426					.push_bind(from_block.as_u64() as i64)
427					.push(" AND ")
428					.push_bind(to_block.as_u64() as i64);
429			},
430			(Some(from_block), None, None) => {
431				qb.push(" AND block_number >= ").push_bind(from_block.as_u64() as i64);
432			},
433			(None, Some(to_block), None) => {
434				qb.push(" AND block_number <= ").push_bind(to_block.as_u64() as i64);
435			},
436			(None, None, Some(hash)) => {
437				qb.push(" AND block_hash = ").push_bind(hash.0.to_vec());
438			},
439			(None, None, None) => {
440				qb.push(" AND block_number = ").push_bind(latest_block.as_u64() as i64);
441			},
442		}
443
444		if let Some(addresses) = filter.address {
445			match addresses {
446				AddressOrAddresses::Address(addr) => {
447					qb.push(" AND address = ").push_bind(addr.0.to_vec());
448				},
449				AddressOrAddresses::Addresses(addrs) => {
450					qb.push(" AND address IN (");
451					let mut separated = qb.separated(", ");
452					for addr in addrs {
453						separated.push_bind(addr.0.to_vec());
454					}
455					separated.push_unseparated(")");
456				},
457			}
458		}
459
460		if let Some(topics) = filter.topics {
461			if topics.len() > 4 {
462				return Err(anyhow::anyhow!("exceed max topics"));
463			}
464
465			for (i, topic) in topics.into_iter().enumerate() {
466				match topic {
467					FilterTopic::Single(hash) => {
468						qb.push(format_args!(" AND topic_{i} = ")).push_bind(hash.0.to_vec());
469					},
470					FilterTopic::Multiple(hashes) => {
471						qb.push(format_args!(" AND topic_{i} IN ("));
472						let mut separated = qb.separated(", ");
473						for hash in hashes {
474							separated.push_bind(hash.0.to_vec());
475						}
476						separated.push_unseparated(")");
477					},
478				}
479			}
480		}
481
482		qb.push(" LIMIT 10000");
483
484		let logs = qb
485			.build()
486			.try_map(|row| {
487				let block_hash: Vec<u8> = row.try_get("block_hash")?;
488				let transaction_index: i64 = row.try_get("transaction_index")?;
489				let log_index: i64 = row.try_get("log_index")?;
490				let address: Vec<u8> = row.try_get("address")?;
491				let block_number: i64 = row.try_get("block_number")?;
492				let transaction_hash: Vec<u8> = row.try_get("transaction_hash")?;
493				let topic_0: Option<Vec<u8>> = row.try_get("topic_0")?;
494				let topic_1: Option<Vec<u8>> = row.try_get("topic_1")?;
495				let topic_2: Option<Vec<u8>> = row.try_get("topic_2")?;
496				let topic_3: Option<Vec<u8>> = row.try_get("topic_3")?;
497				let data: Option<Vec<u8>> = row.try_get("data")?;
498
499				let topics = [topic_0, topic_1, topic_2, topic_3]
500					.iter()
501					.filter_map(|t| t.as_ref().map(|t| H256::from_slice(t)))
502					.collect::<Vec<_>>();
503
504				Ok(Log {
505					address: Address::from_slice(&address),
506					block_hash: H256::from_slice(&block_hash),
507					block_number: U256::from(block_number as u64),
508					data: data.map(Bytes::from),
509					log_index: U256::from(log_index as u64),
510					topics,
511					transaction_hash: H256::from_slice(&transaction_hash),
512					transaction_index: U256::from(transaction_index as u64),
513					removed: false,
514				})
515			})
516			.fetch_all(&self.pool)
517			.await?;
518
519		Ok(logs)
520	}
521
522	/// Get the number of receipts per block.
523	pub async fn receipts_count_per_block(&self, block_hash: &H256) -> Option<usize> {
524		let block_hash = block_hash.as_ref();
525		let row = query!(
526			r#"
527            SELECT COUNT(*) as count
528            FROM transaction_hashes
529            WHERE block_hash = $1
530            "#,
531			block_hash
532		)
533		.fetch_one(&self.pool)
534		.await
535		.ok()?;
536
537		let count = row.count as usize;
538		Some(count)
539	}
540
541	/// Return all transaction hashes for the given block hash.
542	pub async fn block_transaction_hashes(
543		&self,
544		block_hash: &H256,
545	) -> Option<HashMap<usize, H256>> {
546		let block_hash = block_hash.as_ref();
547		let rows = query!(
548			r#"
549		      SELECT transaction_index, transaction_hash
550		      FROM transaction_hashes
551		      WHERE block_hash = $1
552		      "#,
553			block_hash
554		)
555		.map(|row| {
556			let transaction_index = row.transaction_index as usize;
557			let transaction_hash = H256::from_slice(&row.transaction_hash);
558			(transaction_index, transaction_hash)
559		})
560		.fetch_all(&self.pool)
561		.await
562		.ok()?;
563
564		Some(rows.into_iter().collect())
565	}
566
567	/// Get the receipt for the given block hash and transaction index.
568	pub async fn receipt_by_block_hash_and_index(
569		&self,
570		block_hash: &H256,
571		transaction_index: usize,
572	) -> Option<ReceiptInfo> {
573		let block = self.block_provider.block_by_hash(block_hash).await.ok()??;
574		let (_, receipt) = self
575			.receipt_extractor
576			.extract_from_transaction(&block, transaction_index)
577			.await
578			.ok()?;
579		Some(receipt)
580	}
581
582	/// Get the receipt for the given transaction hash.
583	pub async fn receipt_by_hash(&self, transaction_hash: &H256) -> Option<ReceiptInfo> {
584		let (block_hash, transaction_index) = self.find_transaction(transaction_hash).await?;
585
586		let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
587		let (_, receipt) = self
588			.receipt_extractor
589			.extract_from_transaction(&block, transaction_index)
590			.await
591			.ok()?;
592		Some(receipt)
593	}
594
595	/// Get the signed transaction for the given transaction hash.
596	pub async fn signed_tx_by_hash(&self, transaction_hash: &H256) -> Option<TransactionSigned> {
597		let (block_hash, transaction_index) = self.find_transaction(transaction_hash).await?;
598
599		let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
600		let (signed_tx, _) = self
601			.receipt_extractor
602			.extract_from_transaction(&block, transaction_index)
603			.await
604			.ok()?;
605		Some(signed_tx)
606	}
607}
608
609#[cfg(test)]
610mod tests {
611	use super::*;
612	use crate::test::{MockBlockInfo, MockBlockInfoProvider};
613	use pallet_revive::evm::{ReceiptInfo, TransactionSigned};
614	use pretty_assertions::assert_eq;
615	use sp_core::{H160, H256};
616	use sqlx::SqlitePool;
617
618	async fn count(pool: &SqlitePool, table: &str, block_hash: Option<H256>) -> usize {
619		let count: i64 = match block_hash {
620			None =>
621				sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {table}"))
622					.fetch_one(pool)
623					.await,
624			Some(hash) =>
625				sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {table} WHERE block_hash = ?"))
626					.bind(hash.as_ref())
627					.fetch_one(pool)
628					.await,
629		}
630		.unwrap();
631
632		count as _
633	}
634
635	async fn setup_sqlite_provider(pool: SqlitePool) -> ReceiptProvider<MockBlockInfoProvider> {
636		ReceiptProvider {
637			pool,
638			block_provider: MockBlockInfoProvider {},
639			receipt_extractor: ReceiptExtractor::new_mock(),
640			keep_latest_n_blocks: Some(10),
641			block_number_to_hashes: Default::default(),
642		}
643	}
644
645	#[sqlx::test]
646	async fn test_insert_remove(pool: SqlitePool) -> anyhow::Result<()> {
647		let provider = setup_sqlite_provider(pool).await;
648		let block = MockBlockInfo { hash: H256::default(), number: 0 };
649		let receipts = vec![(
650			TransactionSigned::default(),
651			ReceiptInfo {
652				logs: vec![Log { block_hash: block.hash, ..Default::default() }],
653				..Default::default()
654			},
655		)];
656		let ethereum_hash = H256::from([1_u8; 32]);
657		let block_map = BlockHashMap::new(block.hash(), ethereum_hash);
658
659		provider.insert(&block, &receipts, &ethereum_hash).await?;
660		let row = provider.find_transaction(&receipts[0].1.transaction_hash).await;
661		assert_eq!(row, Some((block.hash, 0)));
662
663		provider.remove(&[block_map]).await?;
664		assert_eq!(count(&provider.pool, "transaction_hashes", Some(block.hash())).await, 0);
665		assert_eq!(count(&provider.pool, "logs", Some(block.hash())).await, 0);
666		Ok(())
667	}
668
669	#[sqlx::test]
670	async fn test_prune(pool: SqlitePool) -> anyhow::Result<()> {
671		let provider = setup_sqlite_provider(pool).await;
672		let n = provider.keep_latest_n_blocks.unwrap();
673
674		for i in 0..2 * n {
675			let block = MockBlockInfo { hash: H256::from([i as u8; 32]), number: i as _ };
676			let transaction_hash = H256::from([i as u8; 32]);
677			let receipts = vec![(
678				TransactionSigned::default(),
679				ReceiptInfo {
680					transaction_hash,
681					logs: vec![Log {
682						block_hash: block.hash,
683						transaction_hash,
684						..Default::default()
685					}],
686					..Default::default()
687				},
688			)];
689			let ethereum_hash = H256::from([(i + 1) as u8; 32]);
690			provider.insert(&block, &receipts, &ethereum_hash).await?;
691		}
692		assert_eq!(count(&provider.pool, "transaction_hashes", None).await, n);
693		assert_eq!(count(&provider.pool, "logs", None).await, n);
694		assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, n);
695		assert_eq!(provider.block_number_to_hashes.lock().await.len(), n);
696
697		return Ok(());
698	}
699
700	#[sqlx::test]
701	async fn test_fork(pool: SqlitePool) -> anyhow::Result<()> {
702		let provider = setup_sqlite_provider(pool).await;
703
704		let build_block = |seed, number| {
705			let block = MockBlockInfo { hash: H256::from([seed; 32]), number };
706			let transaction_hash = H256::from([seed; 32]);
707			let receipts = vec![(
708				TransactionSigned::default(),
709				ReceiptInfo {
710					transaction_hash,
711					logs: vec![Log {
712						block_hash: block.hash,
713						transaction_hash,
714						..Default::default()
715					}],
716					..Default::default()
717				},
718			)];
719			let ethereum_hash = H256::from([seed + 1; 32]);
720
721			(block, receipts, ethereum_hash)
722		};
723
724		// Build 4 blocks on consecutive heights: 0,1,2,3.
725		let (block0, receipts, ethereum_hash_0) = build_block(0, 0);
726		provider.insert(&block0, &receipts, &ethereum_hash_0).await?;
727		let (block1, receipts, ethereum_hash_1) = build_block(1, 1);
728		provider.insert(&block1, &receipts, &ethereum_hash_1).await?;
729		let (block2, receipts, ethereum_hash_2) = build_block(2, 2);
730		provider.insert(&block2, &receipts, &ethereum_hash_2).await?;
731		let (block3, receipts, ethereum_hash_3) = build_block(3, 3);
732		provider.insert(&block3, &receipts, &ethereum_hash_3).await?;
733
734		assert_eq!(count(&provider.pool, "transaction_hashes", None).await, 4);
735		assert_eq!(count(&provider.pool, "logs", None).await, 4);
736		assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 4);
737		assert_eq!(
738			provider.block_number_to_hashes.lock().await.clone(),
739			[
740				(0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
741				(1, BlockHashMap::new(block1.hash, ethereum_hash_1)),
742				(2, BlockHashMap::new(block2.hash, ethereum_hash_2)),
743				(3, BlockHashMap::new(block3.hash, ethereum_hash_3))
744			]
745			.into(),
746		);
747
748		// Now build another block on height 1.
749		let (fork_block, receipts, ethereum_hash_fork) = build_block(4, 1);
750		provider.insert(&fork_block, &receipts, &ethereum_hash_fork).await?;
751
752		assert_eq!(count(&provider.pool, "transaction_hashes", None).await, 2);
753		assert_eq!(count(&provider.pool, "logs", None).await, 2);
754		assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 2);
755
756		assert_eq!(
757			provider.block_number_to_hashes.lock().await.clone(),
758			[
759				(0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
760				(1, BlockHashMap::new(fork_block.hash, ethereum_hash_fork))
761			]
762			.into(),
763		);
764
765		return Ok(());
766	}
767
768	#[sqlx::test]
769	async fn test_receipts_count_per_block(pool: SqlitePool) -> anyhow::Result<()> {
770		let provider = setup_sqlite_provider(pool).await;
771		let block = MockBlockInfo { hash: H256::default(), number: 0 };
772		let receipts = vec![
773			(
774				TransactionSigned::default(),
775				ReceiptInfo { transaction_hash: H256::from([0u8; 32]), ..Default::default() },
776			),
777			(
778				TransactionSigned::default(),
779				ReceiptInfo { transaction_hash: H256::from([1u8; 32]), ..Default::default() },
780			),
781		];
782		let ethereum_hash = H256::from([2u8; 32]);
783
784		provider.insert(&block, &receipts, &ethereum_hash).await?;
785		let count = provider.receipts_count_per_block(&block.hash).await;
786		assert_eq!(count, Some(2));
787		Ok(())
788	}
789
790	#[sqlx::test]
791	async fn test_query_logs(pool: SqlitePool) -> anyhow::Result<()> {
792		let provider = setup_sqlite_provider(pool).await;
793		let block1 = MockBlockInfo { hash: H256::from([1u8; 32]), number: 1 };
794		let block2 = MockBlockInfo { hash: H256::from([2u8; 32]), number: 2 };
795		let ethereum_hash1 = H256::from([3u8; 32]);
796		let ethereum_hash2 = H256::from([4u8; 32]);
797		let log1 = Log {
798			block_hash: ethereum_hash1,
799			block_number: block1.number.into(),
800			address: H160::from([1u8; 20]),
801			topics: vec![H256::from([1u8; 32]), H256::from([2u8; 32])],
802			data: Some(vec![0u8; 32].into()),
803			transaction_hash: H256::default(),
804			transaction_index: U256::from(1),
805			log_index: U256::from(1),
806			..Default::default()
807		};
808		let log2 = Log {
809			block_hash: ethereum_hash2,
810			block_number: block2.number.into(),
811			address: H160::from([2u8; 20]),
812			topics: vec![H256::from([2u8; 32]), H256::from([3u8; 32])],
813			transaction_hash: H256::from([1u8; 32]),
814			transaction_index: U256::from(2),
815			log_index: U256::from(1),
816			..Default::default()
817		};
818
819		provider
820			.insert(
821				&block1,
822				&vec![(
823					TransactionSigned::default(),
824					ReceiptInfo {
825						logs: vec![log1.clone()],
826						transaction_hash: log1.transaction_hash,
827						transaction_index: log1.transaction_index,
828						..Default::default()
829					},
830				)],
831				&ethereum_hash1,
832			)
833			.await?;
834		provider
835			.insert(
836				&block2,
837				&vec![(
838					TransactionSigned::default(),
839					ReceiptInfo {
840						logs: vec![log2.clone()],
841						transaction_hash: log2.transaction_hash,
842						transaction_index: log2.transaction_index,
843						..Default::default()
844					},
845				)],
846				&ethereum_hash2,
847			)
848			.await?;
849
850		// Empty filter
851		let logs = provider.logs(None).await?;
852		assert_eq!(logs, vec![log2.clone()]);
853
854		// from_block filter
855		let logs = provider
856			.logs(Some(Filter { from_block: Some(log2.block_number.into()), ..Default::default() }))
857			.await?;
858		assert_eq!(logs, vec![log2.clone()]);
859
860		// from_block filter (using latest block)
861		let logs = provider
862			.logs(Some(Filter { from_block: Some(BlockTag::Latest.into()), ..Default::default() }))
863			.await?;
864		assert_eq!(logs, vec![log2.clone()]);
865
866		// to_block filter
867		let logs = provider
868			.logs(Some(Filter { to_block: Some(log1.block_number.into()), ..Default::default() }))
869			.await?;
870		assert_eq!(logs, vec![log1.clone()]);
871
872		// block_hash filter
873		let logs = provider
874			.logs(Some(Filter { block_hash: Some(log1.block_hash), ..Default::default() }))
875			.await?;
876		assert_eq!(logs, vec![log1.clone()]);
877
878		// single address
879		let logs = provider
880			.logs(Some(Filter {
881				from_block: Some(U256::from(0).into()),
882				address: Some(log1.address.into()),
883				..Default::default()
884			}))
885			.await?;
886		assert_eq!(logs, vec![log1.clone()]);
887
888		// multiple addresses
889		let logs = provider
890			.logs(Some(Filter {
891				from_block: Some(U256::from(0).into()),
892				address: Some(vec![log1.address, log2.address].into()),
893				..Default::default()
894			}))
895			.await?;
896		assert_eq!(logs, vec![log1.clone(), log2.clone()]);
897
898		// single topic
899		let logs = provider
900			.logs(Some(Filter {
901				from_block: Some(U256::from(0).into()),
902				topics: Some(vec![FilterTopic::Single(log1.topics[0])]),
903				..Default::default()
904			}))
905			.await?;
906		assert_eq!(logs, vec![log1.clone()]);
907
908		// multiple topic
909		let logs = provider
910			.logs(Some(Filter {
911				from_block: Some(U256::from(0).into()),
912				topics: Some(vec![
913					FilterTopic::Single(log1.topics[0]),
914					FilterTopic::Single(log1.topics[1]),
915				]),
916				..Default::default()
917			}))
918			.await?;
919		assert_eq!(logs, vec![log1.clone()]);
920
921		// multiple topic for topic_0
922		let logs = provider
923			.logs(Some(Filter {
924				from_block: Some(U256::from(0).into()),
925				topics: Some(vec![FilterTopic::Multiple(vec![log1.topics[0], log2.topics[0]])]),
926				..Default::default()
927			}))
928			.await?;
929		assert_eq!(logs, vec![log1.clone(), log2.clone()]);
930
931		// Altogether
932		let logs = provider
933			.logs(Some(Filter {
934				from_block: Some(log1.block_number.into()),
935				to_block: Some(log2.block_number.into()),
936				block_hash: None,
937				address: Some(vec![log1.address, log2.address].into()),
938				topics: Some(vec![FilterTopic::Multiple(vec![log1.topics[0], log2.topics[0]])]),
939			}))
940			.await?;
941		assert_eq!(logs, vec![log1.clone(), log2.clone()]);
942		Ok(())
943	}
944
945	#[sqlx::test]
946	async fn test_block_mapping_insert_get(pool: SqlitePool) -> anyhow::Result<()> {
947		let provider = setup_sqlite_provider(pool).await;
948		let ethereum_hash = H256::from([1u8; 32]);
949		let substrate_hash = H256::from([2u8; 32]);
950		let block_map = BlockHashMap::new(substrate_hash, ethereum_hash);
951
952		// Insert mapping
953		provider.insert_block_mapping(&block_map).await?;
954
955		// Test forward lookup
956		let resolved = provider.get_substrate_hash(&ethereum_hash).await;
957		assert_eq!(resolved, Some(substrate_hash));
958
959		// Test reverse lookup
960		let resolved = provider.get_ethereum_hash(&substrate_hash).await;
961		assert_eq!(resolved, Some(ethereum_hash));
962
963		Ok(())
964	}
965
966	#[sqlx::test]
967	async fn test_block_mapping_remove(pool: SqlitePool) -> anyhow::Result<()> {
968		let provider = setup_sqlite_provider(pool).await;
969		let ethereum_hash1 = H256::from([1u8; 32]);
970		let ethereum_hash2 = H256::from([2u8; 32]);
971		let substrate_hash1 = H256::from([3u8; 32]);
972		let substrate_hash2 = H256::from([4u8; 32]);
973		let block_map1 = BlockHashMap::new(substrate_hash1, ethereum_hash1);
974		let block_map2 = BlockHashMap::new(substrate_hash2, ethereum_hash2);
975
976		// Insert mappings
977		provider.insert_block_mapping(&block_map1).await?;
978		provider.insert_block_mapping(&block_map2).await?;
979
980		// Verify they exist
981		assert_eq!(
982			provider.get_substrate_hash(&block_map1.ethereum_hash).await,
983			Some(block_map1.substrate_hash)
984		);
985		assert_eq!(
986			provider.get_substrate_hash(&block_map2.ethereum_hash).await,
987			Some(block_map2.substrate_hash)
988		);
989
990		// Remove one mapping
991		provider.remove(&[block_map1]).await?;
992
993		// Verify removal
994		assert_eq!(provider.get_substrate_hash(&ethereum_hash1).await, None);
995		assert_eq!(provider.get_substrate_hash(&ethereum_hash2).await, Some(substrate_hash2));
996
997		Ok(())
998	}
999
1000	#[sqlx::test]
1001	async fn test_block_mapping_pruning_integration(pool: SqlitePool) -> anyhow::Result<()> {
1002		let provider = setup_sqlite_provider(pool).await;
1003		let ethereum_hash = H256::from([1u8; 32]);
1004		let substrate_hash = H256::from([2u8; 32]);
1005		let block_map = BlockHashMap::new(substrate_hash, ethereum_hash);
1006
1007		// Insert mapping
1008		provider.insert_block_mapping(&block_map).await?;
1009		assert_eq!(
1010			provider.get_substrate_hash(&block_map.ethereum_hash).await,
1011			Some(block_map.substrate_hash)
1012		);
1013
1014		// Remove substrate block (this should also remove the mapping)
1015		provider.remove(&[block_map.clone()]).await?;
1016
1017		// Mapping should be gone
1018		assert_eq!(provider.get_substrate_hash(&block_map.ethereum_hash).await, None);
1019
1020		Ok(())
1021	}
1022
1023	#[sqlx::test]
1024	async fn test_logs_with_ethereum_block_hash_mapping(pool: SqlitePool) -> anyhow::Result<()> {
1025		let provider = setup_sqlite_provider(pool).await;
1026		let ethereum_hash = H256::from([1u8; 32]);
1027		let substrate_hash = H256::from([2u8; 32]);
1028		let block_number = 1u64;
1029
1030		// Create a log with ethereum hash
1031		let log = Log {
1032			block_hash: ethereum_hash,
1033			block_number: block_number.into(),
1034			address: H160::from([1u8; 20]),
1035			topics: vec![H256::from([1u8; 32])],
1036			transaction_hash: H256::from([3u8; 32]),
1037			transaction_index: U256::from(0),
1038			log_index: U256::from(0),
1039			data: Some(vec![0u8; 32].into()),
1040			..Default::default()
1041		};
1042
1043		// Insert the log
1044		let block = MockBlockInfo { hash: substrate_hash, number: block_number as u32 };
1045		let receipts = vec![(
1046			TransactionSigned::default(),
1047			ReceiptInfo {
1048				logs: vec![log.clone()],
1049				transaction_hash: log.transaction_hash,
1050				transaction_index: log.transaction_index,
1051				..Default::default()
1052			},
1053		)];
1054		provider.insert(&block, &receipts, &ethereum_hash).await?;
1055
1056		// Query logs using Ethereum block hash (should resolve to substrate hash)
1057		let logs = provider
1058			.logs(Some(Filter { block_hash: Some(ethereum_hash), ..Default::default() }))
1059			.await?;
1060		assert_eq!(logs, vec![log]);
1061
1062		Ok(())
1063	}
1064
1065	#[sqlx::test]
1066	async fn test_mapping_count(pool: SqlitePool) -> anyhow::Result<()> {
1067		let provider = setup_sqlite_provider(pool).await;
1068
1069		// Initially no mappings
1070		assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 0);
1071
1072		let block_map1 = BlockHashMap::new(H256::from([1u8; 32]), H256::from([2u8; 32]));
1073		let block_map2 = BlockHashMap::new(H256::from([3u8; 32]), H256::from([4u8; 32]));
1074
1075		// Insert some mappings
1076		provider.insert_block_mapping(&block_map1).await?;
1077		provider.insert_block_mapping(&block_map2).await?;
1078
1079		assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 2);
1080
1081		// Remove one
1082		provider.remove(&[block_map1]).await?;
1083		assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 1);
1084
1085		Ok(())
1086	}
1087}