1use crate::{
18 Address, AddressOrAddresses, BlockInfoProvider, BlockNumberOrTag, Bytes, ChainMetadata,
19 ClientError, FilterTopic, ReceiptExtractor, SubxtBlockInfoProvider, SyncLabel, SyncStateKey,
20 block_sync::SyncCheckpoint,
21 client::{SubstrateBlock, SubstrateBlockNumber},
22};
23use pallet_revive::evm::{Filter, Log, ReceiptInfo, TransactionSigned};
24use sp_core::{H256, U256};
25use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool, query};
26use std::{
27 collections::{BTreeMap, HashMap},
28 sync::Arc,
29};
30use tokio::sync::Mutex;
31
/// Log target passed to every `log::*!` call in this module.
const LOG_TARGET: &str = "eth-rpc::receipt_provider";
/// Row cap appended as `LIMIT` to the log queries; reaching it triggers a truncation warning.
const MAX_LOG_RESULTS: usize = 10_000;
34
35fn parse_log_row(row: sqlx::sqlite::SqliteRow) -> Result<Log, sqlx::Error> {
37 let block_hash: Vec<u8> = row.try_get("block_hash")?;
38 let transaction_index: i64 = row.try_get("transaction_index")?;
39 let log_index: i64 = row.try_get("log_index")?;
40 let address: Vec<u8> = row.try_get("address")?;
41 let block_number: i64 = row.try_get("block_number")?;
42 let transaction_hash: Vec<u8> = row.try_get("transaction_hash")?;
43 let topic_0: Option<Vec<u8>> = row.try_get("topic_0")?;
44 let topic_1: Option<Vec<u8>> = row.try_get("topic_1")?;
45 let topic_2: Option<Vec<u8>> = row.try_get("topic_2")?;
46 let topic_3: Option<Vec<u8>> = row.try_get("topic_3")?;
47 let data: Option<Vec<u8>> = row.try_get("data")?;
48
49 let topics = [topic_0, topic_1, topic_2, topic_3]
50 .iter()
51 .filter_map(|t| t.as_ref().map(|t| H256::from_slice(t)))
52 .collect::<Vec<_>>();
53
54 Ok(Log {
55 address: Address::from_slice(&address),
56 block_hash: H256::from_slice(&block_hash),
57 block_number: U256::from(block_number as u64),
58 data: data.map(Bytes::from),
59 log_index: U256::from(log_index as u64),
60 topics,
61 transaction_hash: H256::from_slice(&transaction_hash),
62 transaction_index: U256::from(transaction_index as u64),
63 removed: false,
64 })
65}
66
67#[derive(Clone)]
69pub struct DbContext {
70 pool: SqlitePool,
71 max_variable_number: usize,
73 tx_insert_chunk_size: usize,
75 log_insert_chunk_size: usize,
77}
78
79impl DbContext {
80 pub const DEFAULT_MAX_VARIABLE_NUMBER: usize = 999;
82 const TX_HASH_COLUMNS: usize = 3;
84 const LOG_COLUMNS: usize = 11;
86
87 pub fn new(pool: SqlitePool, max_variable_number: usize) -> Self {
88 assert!(
89 max_variable_number >= Self::LOG_COLUMNS,
90 "SQLite max_variable_number ({max_variable_number}) must be >= {}",
91 Self::LOG_COLUMNS
92 );
93 Self {
94 pool,
95 max_variable_number,
96 tx_insert_chunk_size: max_variable_number / Self::TX_HASH_COLUMNS,
97 log_insert_chunk_size: max_variable_number / Self::LOG_COLUMNS,
98 }
99 }
100}
101
/// Stores and serves transaction-hash, log and block-hash-mapping data, backed by SQLite, a
/// block provider and a receipt extractor.
#[derive(Clone)]
pub struct ReceiptProvider<B: BlockInfoProvider = SubxtBlockInfoProvider> {
    /// Database pool and batching limits.
    db_ctx: DbContext,
    /// Source of blocks (by number or hash) and of the latest block number.
    block_provider: B,
    /// Extracts transactions and receipts from blocks and holds the first-EVM-block marker.
    receipt_extractor: ReceiptExtractor,
    /// With `Some(n)`, blocks beyond the newest `n` tracked in memory are deleted from the
    /// database when pruning; with `None`, only the in-memory map is capped.
    keep_latest_n_blocks: Option<usize>,
    /// In-memory map from block number to that block's substrate / ethereum hash pair.
    block_number_to_hashes: Arc<Mutex<BTreeMap<SubstrateBlockNumber, BlockHashMap>>>,
}
116
117#[derive(Clone, Debug, PartialEq, Eq)]
119struct BlockHashMap {
120 substrate_hash: H256,
121 ethereum_hash: H256,
122}
123
124impl BlockHashMap {
125 fn new(substrate_hash: H256, ethereum_hash: H256) -> Self {
126 Self { substrate_hash, ethereum_hash }
127 }
128}
129
/// Minimal view of a block needed by the insert path: its hash and its number.
pub trait BlockInfo {
    /// Returns the block hash.
    fn hash(&self) -> H256;
    /// Returns the block number.
    fn number(&self) -> SubstrateBlockNumber;
}

impl BlockInfo for SubstrateBlock {
    fn hash(&self) -> H256 {
        // Fully qualified so the call targets `SubstrateBlock`'s own `hash`, not this trait method.
        SubstrateBlock::hash(self)
    }
    fn number(&self) -> SubstrateBlockNumber {
        // Fully qualified for the same reason as `hash` above.
        SubstrateBlock::number(self)
    }
}
148
/// Cap on the number of entries kept in the in-memory block-number map when no
/// `keep_latest_n_blocks` limit is configured.
pub const MAX_CACHED_BLOCKS: usize = 256;
151
/// Conditionally upserts a row of `sync_state`.
///
/// Inserts `($label, $checkpoint.block_number, $checkpoint.block_hash)`. When a row with the
/// same label already exists it is overwritten only if
/// `sync_state.block_number $op excluded.block_number` holds, where `$op` is a string literal
/// spliced into the SQL (for example `"<"` or `">"`).
///
/// The expansion ends in `.await?`, so it must be used inside an `async` function whose error
/// type can be built from the query error.
macro_rules! upsert_sync_label {
    ($pool:expr, $op:literal, $label:expr, $checkpoint:expr) => {{
        let label_str = $label.to_string();
        let block_number = $checkpoint.block_number as i64;
        // The hash is optional; it is bound as a nullable blob.
        let block_hash = $checkpoint.block_hash.map(|h| h.as_bytes().to_vec());
        query!(
            "INSERT INTO sync_state (label, block_number, block_hash)
VALUES ($1, $2, $3)
ON CONFLICT(label) DO UPDATE
SET block_number = excluded.block_number, block_hash = excluded.block_hash
WHERE sync_state.block_number " +
                $op + " excluded.block_number
",
            label_str,
            block_number,
            block_hash
        )
        .execute($pool)
        .await?;
    }};
}
175
176async fn insert_block_mapping<'e, E: sqlx::Executor<'e, Database = Sqlite>>(
177 executor: E,
178 block_map: &BlockHashMap,
179) -> Result<sqlx::sqlite::SqliteQueryResult, sqlx::Error> {
180 let ethereum_hash_ref = block_map.ethereum_hash.as_ref();
181 let substrate_hash_ref = block_map.substrate_hash.as_ref();
182 query!(
183 r#"
184 INSERT OR REPLACE INTO eth_to_substrate_blocks (ethereum_block_hash, substrate_block_hash)
185 VALUES ($1, $2)
186 "#,
187 ethereum_hash_ref,
188 substrate_hash_ref,
189 )
190 .execute(executor)
191 .await
192}
193
194impl<B: BlockInfoProvider> ReceiptProvider<B> {
195 pub async fn new(
197 db_ctx: DbContext,
198 block_provider: B,
199 receipt_extractor: ReceiptExtractor,
200 keep_latest_n_blocks: Option<usize>,
201 ) -> Result<Self, ClientError> {
202 sqlx::migrate!()
203 .run(&db_ctx.pool)
204 .await
205 .map_err(|e| sqlx::Error::Migrate(e.into()))?;
206
207 let provider = Self {
208 db_ctx,
209 block_provider,
210 receipt_extractor,
211 keep_latest_n_blocks,
212 block_number_to_hashes: Default::default(),
213 };
214 provider.restore_first_evm_block().await?;
215
216 Ok(provider)
217 }
218
219 pub fn is_before_earliest_block(&self, at: &BlockNumberOrTag) -> bool {
221 match at {
222 BlockNumberOrTag::U256(block_number) => {
223 if *block_number > U256::from(u32::MAX) {
224 return false;
225 }
226 self.receipt_extractor.is_before_first_evm_block(block_number.as_u32())
227 },
228 BlockNumberOrTag::BlockTag(_) => false,
229 }
230 }
231
    /// Returns the first-EVM-block number currently held by the receipt extractor, if any.
    pub fn first_evm_block(&self) -> Option<SubstrateBlockNumber> {
        self.receipt_extractor.first_evm_block()
    }
236
237 pub async fn set_first_evm_block(
239 &self,
240 block_number: SubstrateBlockNumber,
241 ) -> Result<(), ClientError> {
242 self.receipt_extractor.set_first_evm_block(block_number);
243 self.set_sync_label(ChainMetadata::FirstEvmBlock, SyncCheckpoint::from_number(block_number))
244 .await
245 }
246
247 async fn restore_first_evm_block(&self) -> Result<(), ClientError> {
249 let Some(evm_first) =
250 self.get_sync_label(ChainMetadata::FirstEvmBlock).await?.map(|c| c.block_number)
251 else {
252 return Ok(());
253 };
254
255 let has_evm_hash = |block_number: SubstrateBlockNumber| async move {
256 match self.block_provider.block_by_number(block_number).await.ok().flatten() {
257 Some(block) => self
258 .receipt_extractor
259 .get_ethereum_block_hash(&block.hash(), block_number as u64)
260 .await
261 .is_some(),
262 None => false,
263 }
264 };
265
266 let current_has_evm = has_evm_hash(evm_first).await;
268 let predecessor_has_evm =
269 if evm_first > 0 { has_evm_hash(evm_first - 1).await } else { false };
270
271 if !current_has_evm || predecessor_has_evm {
272 log::warn!(target: LOG_TARGET,
273 "๐๏ธ Stored first-evm-block=#{evm_first} is stale \
274 (has_evm={current_has_evm}, predecessor_has_evm={predecessor_has_evm}), \
275 clearing.");
276 if let Err(e) = self.delete_sync_label(ChainMetadata::FirstEvmBlock).await {
277 log::error!(target: LOG_TARGET,
278 "๐๏ธ Failed to clear stale first-evm-block from DB: {e:?}");
279 }
280 } else {
281 self.receipt_extractor.set_first_evm_block(evm_first);
282 }
283 Ok(())
284 }
285
286 pub async fn find_transaction(&self, transaction_hash: &H256) -> Option<(H256, usize)> {
288 let transaction_hash_bytes = transaction_hash.as_ref();
289 let result = query!(
290 r#"
291 SELECT block_hash, transaction_index
292 FROM transaction_hashes
293 WHERE transaction_hash = $1
294 "#,
295 transaction_hash_bytes
296 )
297 .fetch_optional(&self.db_ctx.pool)
298 .await
299 .inspect_err(|err| {
300 log::trace!(target: LOG_TARGET,
301 "find_transaction: DB query failed for tx {transaction_hash:?}: {err:?}");
302 })
303 .ok()?
304 .or_else(|| {
305 log::trace!(target: LOG_TARGET,
306 "find_transaction: tx {transaction_hash:?} not found in DB");
307 None
308 })?;
309
310 let block_hash = H256::from_slice(&result.block_hash[..]);
311 let transaction_index = result.transaction_index.try_into().ok()?;
312 Some((block_hash, transaction_index))
313 }
314
315 pub async fn get_substrate_hash(&self, ethereum_block_hash: &H256) -> Option<H256> {
317 let ethereum_hash = ethereum_block_hash.as_ref();
318 let result = query!(
319 r#"
320 SELECT substrate_block_hash
321 FROM eth_to_substrate_blocks
322 WHERE ethereum_block_hash = $1
323 "#,
324 ethereum_hash
325 )
326 .fetch_optional(&self.db_ctx.pool)
327 .await
328 .inspect_err(|e| {
329 log::error!(target: LOG_TARGET, "failed to get block mapping for ethereum block {ethereum_block_hash:?}, err: {e:?}");
330 })
331 .ok()?
332 .or_else(||{
333 log::trace!(target: LOG_TARGET, "No block mapping found for ethereum block: {ethereum_block_hash:?}");
334 None
335 })?;
336
337 log::trace!(target: LOG_TARGET, "Get block mapping ethereum block: {:?} -> substrate block: {ethereum_block_hash:?}", H256::from_slice(&result.substrate_block_hash[..]));
338
339 Some(H256::from_slice(&result.substrate_block_hash[..]))
340 }
341
342 pub async fn get_ethereum_hash(&self, substrate_block_hash: &H256) -> Option<H256> {
344 let substrate_hash = substrate_block_hash.as_ref();
345 let result = query!(
346 r#"
347 SELECT ethereum_block_hash
348 FROM eth_to_substrate_blocks
349 WHERE substrate_block_hash = $1
350 "#,
351 substrate_hash
352 )
353 .fetch_optional(&self.db_ctx.pool)
354 .await
355 .inspect_err(|e| {
356 log::error!(target: LOG_TARGET, "failed to get block mapping for substrate block {substrate_block_hash:?}, err: {e:?}");
357 })
358 .ok()?
359 .or_else(||{
360 log::trace!(target: LOG_TARGET, "No block mapping found for substrate block: {substrate_block_hash:?}");
361 None
362 })?;
363
364 log::trace!(target: LOG_TARGET, "Get block mapping substrate block: {substrate_block_hash:?} -> ethereum block: {:?}", H256::from_slice(&result.ethereum_block_hash[..]));
365
366 Some(H256::from_slice(&result.ethereum_block_hash[..]))
367 }
368
369 async fn remove(&self, block_mappings: &[BlockHashMap]) -> Result<(), ClientError> {
371 if block_mappings.is_empty() {
372 return Ok(());
373 }
374 log::debug!(target: LOG_TARGET, "Removing block hashes: {block_mappings:?}");
375
376 let mut db_tx = self.db_ctx.pool.begin().await?;
377
378 for chunk in block_mappings.chunks(self.db_ctx.max_variable_number) {
379 let placeholders = vec!["?"; chunk.len()].join(", ");
380 let sql_tx =
381 format!("DELETE FROM transaction_hashes WHERE block_hash in ({placeholders})");
382 let sql_logs = format!("DELETE FROM logs WHERE block_hash in ({placeholders})");
383 let sql_mappings = format!(
384 "DELETE FROM eth_to_substrate_blocks WHERE substrate_block_hash in ({placeholders})"
385 );
386
387 let mut delete_tx_query = sqlx::query(&sql_tx);
388 let mut delete_logs_query = sqlx::query(&sql_logs);
389 let mut delete_mappings_query = sqlx::query(&sql_mappings);
390
391 for block_map in chunk {
392 delete_tx_query = delete_tx_query.bind(block_map.substrate_hash.as_ref());
393 delete_logs_query = delete_logs_query.bind(block_map.ethereum_hash.as_ref());
394 delete_mappings_query =
395 delete_mappings_query.bind(block_map.substrate_hash.as_ref());
396 }
397
398 delete_tx_query.execute(&mut *db_tx).await?;
399 delete_logs_query.execute(&mut *db_tx).await?;
400 delete_mappings_query.execute(&mut *db_tx).await?;
401 }
402
403 db_tx.commit().await?;
404 Ok(())
405 }
406
407 pub async fn get_sync_label(
409 &self,
410 label: impl SyncStateKey,
411 ) -> Result<Option<SyncCheckpoint>, ClientError> {
412 let label_str = label.to_string();
413 let row = query!(
414 r#"
415 SELECT block_number, block_hash
416 FROM sync_state
417 WHERE label = $1
418 "#,
419 label_str
420 )
421 .fetch_optional(&self.db_ctx.pool)
422 .await?;
423
424 match row {
425 Some(row) => {
426 let block_number: SubstrateBlockNumber =
427 row.block_number.try_into().map_err(|_| {
428 sqlx::Error::Decode(
429 format!("block_number {} overflows u32", row.block_number).into(),
430 )
431 })?;
432 Ok(Some(SyncCheckpoint {
433 block_number,
434 block_hash: row
435 .block_hash
436 .filter(|b| b.len() == 32)
437 .map(|b| H256::from_slice(&b)),
438 }))
439 },
440 None => Ok(None),
441 }
442 }
443
444 pub async fn set_sync_label(
446 &self,
447 label: impl SyncStateKey,
448 checkpoint: SyncCheckpoint,
449 ) -> Result<(), ClientError> {
450 let label_str = label.to_string();
451 let block_number = checkpoint.block_number as i64;
452 let block_hash = checkpoint.block_hash.map(|h| h.as_bytes().to_vec());
453 query!(
454 r#"
455 INSERT OR REPLACE INTO sync_state (label, block_number, block_hash)
456 VALUES ($1, $2, $3)
457 "#,
458 label_str,
459 block_number,
460 block_hash,
461 )
462 .execute(&self.db_ctx.pool)
463 .await?;
464 Ok(())
465 }
466
467 pub async fn delete_sync_label(&self, label: impl SyncStateKey) -> Result<(), ClientError> {
469 let label_str = label.to_string();
470 query!(
471 r#"
472 DELETE FROM sync_state WHERE label = $1
473 "#,
474 label_str,
475 )
476 .execute(&self.db_ctx.pool)
477 .await?;
478 Ok(())
479 }
480
    /// Moves `label` forward to `checkpoint`.
    ///
    /// Inserts the row when it is missing; an existing row is overwritten only when its stored
    /// block number is lower than the checkpoint's (the `"<"` comparison passed to
    /// `upsert_sync_label!`).
    pub async fn advance_sync_label(
        &self,
        label: SyncLabel,
        checkpoint: SyncCheckpoint,
    ) -> Result<(), ClientError> {
        upsert_sync_label!(&self.db_ctx.pool, "<", label, checkpoint);
        Ok(())
    }
492
    /// Moves `label` backward to `checkpoint`.
    ///
    /// Inserts the row when it is missing; an existing row is overwritten only when its stored
    /// block number is higher than the checkpoint's (the `">"` comparison passed to
    /// `upsert_sync_label!`).
    pub async fn recede_sync_label(
        &self,
        label: SyncLabel,
        checkpoint: SyncCheckpoint,
    ) -> Result<(), ClientError> {
        upsert_sync_label!(&self.db_ctx.pool, ">", label, checkpoint);
        Ok(())
    }
504
505 pub async fn get_processed_eth_block_hash(
507 &self,
508 block_number: SubstrateBlockNumber,
509 substrate_hash: H256,
510 ) -> Option<H256> {
511 self.block_number_to_hashes
512 .lock()
513 .await
514 .get(&block_number)
515 .filter(|entry| entry.substrate_hash == substrate_hash)
516 .map(|entry| entry.ethereum_hash)
517 }
518
    /// Extracts the signed transactions and receipts of `block` through the receipt extractor,
    /// passing along the supplied `ethereum_hash`. Nothing is written to the database.
    pub async fn receipts_from_block(
        &self,
        block: &SubstrateBlock,
        ethereum_hash: H256,
    ) -> Result<Vec<(TransactionSigned, ReceiptInfo)>, ClientError> {
        self.receipt_extractor
            .extract_from_block_with_eth_hash(block, ethereum_hash)
            .await
    }
529
530 pub async fn insert_block_receipts_past(
533 &self,
534 block: &SubstrateBlock,
535 ethereum_hash: &H256,
536 ) -> Result<(), ClientError> {
537 let receipts = self
538 .receipt_extractor
539 .extract_from_block_with_eth_hash(block, *ethereum_hash)
540 .await?;
541 self.insert_into_db(block, &receipts, ethereum_hash).await?;
542 Ok(())
543 }
544
    /// Records already-extracted `receipts` for `block`: public entry point that forwards to
    /// the private `insert`, which prunes before writing to the database.
    pub async fn insert_block_receipts(
        &self,
        block: &SubstrateBlock,
        receipts: &[(TransactionSigned, ReceiptInfo)],
        ethereum_hash: &H256,
    ) -> Result<(), ClientError> {
        self.insert(block, receipts, ethereum_hash).await
    }
554
555 async fn insert(
560 &self,
561 block: &impl BlockInfo,
562 receipts: &[(TransactionSigned, ReceiptInfo)],
563 ethereum_hash: &H256,
564 ) -> Result<(), ClientError> {
565 let block_map = BlockHashMap::new(block.hash(), *ethereum_hash);
566 self.prune_blocks(block.number(), &block_map).await?;
567 self.insert_into_db(block, receipts, ethereum_hash).await?;
568 Ok(())
569 }
570
571 async fn prune_blocks(
573 &self,
574 block_number: SubstrateBlockNumber,
575 block_map: &BlockHashMap,
576 ) -> Result<(), ClientError> {
577 let mut to_remove = Vec::new();
578 let mut block_number_to_hash = self.block_number_to_hashes.lock().await;
579
580 match block_number_to_hash.insert(block_number, block_map.clone()) {
582 Some(old_block_map) if &old_block_map != block_map => {
583 to_remove.push(old_block_map);
584
585 let mut next_block_number = block_number.saturating_add(1);
588 while let Some(old_block_map) = block_number_to_hash.remove(&next_block_number) {
589 to_remove.push(old_block_map);
590 next_block_number = next_block_number.saturating_add(1);
591 }
592 },
593 _ => {},
594 }
595
596 if let Some(keep_latest_n_blocks) = self.keep_latest_n_blocks {
597 while block_number_to_hash.len() > keep_latest_n_blocks {
600 if let Some((_, block_map)) = block_number_to_hash.pop_first() {
602 to_remove.push(block_map);
603 }
604 }
605 } else {
606 while block_number_to_hash.len() > MAX_CACHED_BLOCKS {
609 block_number_to_hash.pop_first();
610 }
611 }
612
613 drop(block_number_to_hash);
615
616 if !to_remove.is_empty() {
617 log::trace!(target: LOG_TARGET, "Pruning old blocks: {to_remove:?}");
618 self.remove(&to_remove).await?;
619 }
620
621 Ok(())
622 }
623
624 async fn insert_into_db(
626 &self,
627 block: &impl BlockInfo,
628 receipts: &[(TransactionSigned, ReceiptInfo)],
629 ethereum_hash: &H256,
630 ) -> Result<(), ClientError> {
631 let block_number = block.number() as i64;
632 let substrate_block_hash = block.hash();
633 let substrate_hash_ref = substrate_block_hash.as_ref();
634 let ethereum_hash_ref = ethereum_hash.as_ref();
635
636 log::trace!(target: LOG_TARGET, "Inserting receipts for block #{block_number} ethereum: {ethereum_hash:?} substrate: {substrate_block_hash:?}");
637
638 let result = sqlx::query!(
641 r#"SELECT EXISTS(SELECT 1 FROM eth_to_substrate_blocks WHERE substrate_block_hash = $1) AS "exists!:bool""#, substrate_hash_ref
642 )
643 .fetch_one(&self.db_ctx.pool)
644 .await?;
645
646 if result.exists {
649 log::trace!(target: LOG_TARGET,
650 "Skipping receipt insert for block #{block_number} ({substrate_block_hash:?}): \
651 mapping already exists. ETH hash: {ethereum_hash:?}, receipts count: {count}",
652 count = receipts.len(),
653 );
654 return Ok(());
655 }
656
657 let mut db_tx = self.db_ctx.pool.begin().await?;
658
659 for chunk in receipts.chunks(self.db_ctx.tx_insert_chunk_size) {
660 let mut query_builder = QueryBuilder::<Sqlite>::new(
661 "INSERT OR REPLACE INTO transaction_hashes (transaction_hash, block_hash, transaction_index) ",
662 );
663 query_builder.push_values(chunk, |mut row, (_, receipt)| {
664 row.push_bind(receipt.transaction_hash.as_ref() as &[u8])
665 .push_bind(substrate_hash_ref)
666 .push_bind(receipt.transaction_index.as_u32() as i32);
667 });
668 query_builder.build().execute(&mut *db_tx).await?;
669 }
670
671 let all_logs: Vec<(i32, &[u8], &Log)> = receipts
672 .iter()
673 .flat_map(|(_, receipt)| {
674 let tx_index = receipt.transaction_index.as_u32() as i32;
675 let tx_hash: &[u8] = receipt.transaction_hash.as_ref();
676 receipt.logs.iter().map(move |log| (tx_index, tx_hash, log))
677 })
678 .collect();
679
680 for chunk in all_logs.chunks(self.db_ctx.log_insert_chunk_size) {
681 let mut query_builder = QueryBuilder::<Sqlite>::new(
682 "INSERT OR REPLACE INTO logs(block_hash, transaction_index, log_index, address, block_number, transaction_hash, topic_0, topic_1, topic_2, topic_3, data) ",
683 );
684 query_builder.push_values(chunk, |mut row, (tx_index, tx_hash, log)| {
685 row.push_bind(ethereum_hash_ref)
686 .push_bind(*tx_index)
687 .push_bind(log.log_index.as_u32() as i32)
688 .push_bind(log.address.as_ref() as &[u8])
689 .push_bind(block_number)
690 .push_bind(*tx_hash)
691 .push_bind(log.topics.first().map(|v| &v[..]))
692 .push_bind(log.topics.get(1).map(|v| &v[..]))
693 .push_bind(log.topics.get(2).map(|v| &v[..]))
694 .push_bind(log.topics.get(3).map(|v| &v[..]))
695 .push_bind(log.data.as_ref().map(|v| &v.0[..]));
696 });
697 query_builder.build().execute(&mut *db_tx).await?;
698 }
699
700 let block_map = BlockHashMap::new(substrate_block_hash, *ethereum_hash);
701 insert_block_mapping(&mut *db_tx, &block_map).await?;
702
703 db_tx.commit().await?;
704 log::trace!(target: LOG_TARGET, "Inserted {} receipts for block #{block_number} ethereum: {ethereum_hash:?} substrate: {substrate_block_hash:?}", receipts.len());
705
706 Ok(())
707 }
708
709 pub async fn logs(
713 &self,
714 filter: Option<Filter>,
715 resolve_block_number: impl Fn(BlockNumberOrTag) -> anyhow::Result<U256>,
716 ) -> anyhow::Result<Vec<Log>> {
717 let mut qb = QueryBuilder::<Sqlite>::new("SELECT logs.* FROM logs WHERE 1=1");
718 let filter = filter.unwrap_or_default();
719
720 let from_block = filter.from_block.map(&resolve_block_number).transpose()?;
721 let to_block = filter.to_block.map(&resolve_block_number).transpose()?;
722 let latest_block = U256::from(self.block_provider.latest_block_number().await);
723
724 match (from_block, to_block, filter.block_hash) {
725 (Some(_), _, Some(_)) | (_, Some(_), Some(_)) => {
726 anyhow::bail!("block number and block hash cannot be used together");
727 },
728
729 (Some(block), _, _) | (_, Some(block), _) if block > latest_block => {
730 anyhow::bail!("block number exceeds latest block");
731 },
732 (Some(from_block), Some(to_block), None) if from_block > to_block => {
733 anyhow::bail!("invalid block range params");
734 },
735 (Some(from_block), Some(to_block), None) if from_block == to_block => {
736 qb.push(" AND block_number = ").push_bind(from_block.as_u64() as i64);
737 },
738 (Some(from_block), Some(to_block), None) => {
739 qb.push(" AND block_number BETWEEN ")
740 .push_bind(from_block.as_u64() as i64)
741 .push(" AND ")
742 .push_bind(to_block.as_u64() as i64);
743 },
744 (Some(from_block), None, None) => {
745 qb.push(" AND block_number >= ").push_bind(from_block.as_u64() as i64);
746 },
747 (None, Some(to_block), None) => {
748 qb.push(" AND block_number <= ").push_bind(to_block.as_u64() as i64);
749 },
750 (None, None, Some(hash)) => {
751 qb.push(" AND block_hash = ").push_bind(hash.0.to_vec());
752 },
753 (None, None, None) => {
754 qb.push(" AND block_number = ").push_bind(latest_block.as_u64() as i64);
755 },
756 }
757
758 if let Some(addresses) = filter.address {
759 match addresses {
760 AddressOrAddresses::Address(addr) => {
761 qb.push(" AND address = ").push_bind(addr.0.to_vec());
762 },
763 AddressOrAddresses::Addresses(addrs) => {
764 qb.push(" AND address IN (");
765 let mut separated = qb.separated(", ");
766 for addr in addrs {
767 separated.push_bind(addr.0.to_vec());
768 }
769 separated.push_unseparated(")");
770 },
771 }
772 }
773
774 if let Some(topics) = filter.topics {
775 if topics.len() > 4 {
776 return Err(anyhow::anyhow!("exceed max topics"));
777 }
778
779 for (i, topic) in topics.into_iter().enumerate() {
780 match topic {
781 FilterTopic::Single(hash) => {
782 qb.push(format_args!(" AND topic_{i} = ")).push_bind(hash.0.to_vec());
783 },
784 FilterTopic::Multiple(hashes) => {
785 qb.push(format_args!(" AND topic_{i} IN ("));
786 let mut separated = qb.separated(", ");
787 for hash in hashes {
788 separated.push_bind(hash.0.to_vec());
789 }
790 separated.push_unseparated(")");
791 },
792 }
793 }
794 }
795
796 qb.push(" LIMIT ").push_bind(MAX_LOG_RESULTS as i64);
797
798 let logs = qb.build().try_map(parse_log_row).fetch_all(&self.db_ctx.pool).await?;
799
800 if logs.len() == MAX_LOG_RESULTS {
801 log::warn!(
802 target: LOG_TARGET,
803 "Log query hit limit of {MAX_LOG_RESULTS}; results may be truncated",
804 );
805 }
806
807 Ok(logs)
808 }
809
810 pub async fn logs_by_block_number(
812 &self,
813 block_number: SubstrateBlockNumber,
814 ethereum_hash: H256,
815 ) -> Result<Vec<Log>, ClientError> {
816 let mut query_builder =
817 QueryBuilder::<Sqlite>::new("SELECT logs.* FROM logs WHERE block_number = ");
818 query_builder
819 .push_bind(block_number as i64)
820 .push(" AND block_hash = ")
821 .push_bind(ethereum_hash.as_bytes().to_vec())
822 .push(" ORDER BY log_index LIMIT ")
823 .push_bind(MAX_LOG_RESULTS as i64);
824
825 let logs = query_builder
826 .build()
827 .try_map(parse_log_row)
828 .fetch_all(&self.db_ctx.pool)
829 .await?;
830
831 if logs.len() == MAX_LOG_RESULTS {
832 log::warn!(
833 target: LOG_TARGET,
834 "Log query for block {block_number} hit limit of {MAX_LOG_RESULTS}; results may be truncated",
835 );
836 }
837
838 Ok(logs)
839 }
840
841 pub async fn receipts_count_per_block(&self, block_hash: &H256) -> Option<usize> {
843 let block_hash = block_hash.as_ref();
844 let row = query!(
845 r#"
846 SELECT COUNT(*) as count
847 FROM transaction_hashes
848 WHERE block_hash = $1
849 "#,
850 block_hash
851 )
852 .fetch_one(&self.db_ctx.pool)
853 .await
854 .ok()?;
855
856 let count = row.count as usize;
857 Some(count)
858 }
859
860 pub async fn block_transaction_hashes(
862 &self,
863 block_hash: &H256,
864 ) -> Option<HashMap<usize, H256>> {
865 let block_hash = block_hash.as_ref();
866 let rows = query!(
867 r#"
868 SELECT transaction_index, transaction_hash
869 FROM transaction_hashes
870 WHERE block_hash = $1
871 "#,
872 block_hash
873 )
874 .map(|row| {
875 let transaction_index = row.transaction_index as usize;
876 let transaction_hash = H256::from_slice(&row.transaction_hash);
877 (transaction_index, transaction_hash)
878 })
879 .fetch_all(&self.db_ctx.pool)
880 .await
881 .ok()?;
882
883 Some(rows.into_iter().collect())
884 }
885
886 pub async fn receipt_by_block_hash_and_index(
888 &self,
889 block_hash: &H256,
890 transaction_index: usize,
891 ) -> Option<ReceiptInfo> {
892 let block = self.block_provider.block_by_hash(block_hash).await.ok()??;
893 let (_, receipt) = self
894 .receipt_extractor
895 .extract_from_transaction(&block, transaction_index)
896 .await
897 .ok()?;
898 Some(receipt)
899 }
900
901 pub async fn receipt_by_hash(&self, transaction_hash: &H256) -> Option<ReceiptInfo> {
903 let (block_hash, transaction_index) = self.find_transaction(transaction_hash).await?;
904
905 let block = match self.block_provider.block_by_hash(&block_hash).await {
906 Ok(Some(b)) => b,
907 Ok(None) => {
908 log::trace!(target: LOG_TARGET,
909 "receipt_by_hash: block {block_hash:?} not available from node (pruned?) for tx {transaction_hash:?}");
910 return None;
911 },
912 Err(err) => {
913 log::trace!(target: LOG_TARGET,
914 "receipt_by_hash: failed to fetch block {block_hash:?} for tx {transaction_hash:?}: {err:?}");
915 return None;
916 },
917 };
918
919 match self.receipt_extractor.extract_from_transaction(&block, transaction_index).await {
920 Ok((_, receipt)) => Some(receipt),
921 Err(err) => {
922 log::trace!(target: LOG_TARGET,
923 "receipt_by_hash: extraction failed for tx {transaction_hash:?} in block {block_hash:?}: {err:?}");
924 None
925 },
926 }
927 }
928
929 pub async fn signed_tx_by_hash(&self, transaction_hash: &H256) -> Option<TransactionSigned> {
931 let (block_hash, transaction_index) = self.find_transaction(transaction_hash).await?;
932
933 let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
934 let (signed_tx, _) = self
935 .receipt_extractor
936 .extract_from_transaction(&block, transaction_index)
937 .await
938 .inspect_err(|err| {
939 log::trace!(target: LOG_TARGET,
940 "signed_tx_by_hash: extraction failed for tx {transaction_hash:?} \
941 in block {block_hash:?}: {err:?}");
942 })
943 .ok()?;
944 Some(signed_tx)
945 }
946}
947
948#[cfg(test)]
949mod tests {
950 use super::*;
951 use crate::test::{MockBlockInfo, MockBlockInfoProvider};
952 use pallet_revive::evm::{BlockTag, ReceiptInfo, TransactionSigned};
953 use pretty_assertions::assert_eq;
954 use sp_core::{H160, H256};
955 use sqlx::SqlitePool;
956
957 async fn count(pool: &SqlitePool, table: &str, block_hash: Option<H256>) -> usize {
958 let count: i64 = match block_hash {
959 None => {
960 sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {table}"))
961 .fetch_one(pool)
962 .await
963 },
964 Some(hash) => {
965 sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {table} WHERE block_hash = ?"))
966 .bind(hash.as_ref())
967 .fetch_one(pool)
968 .await
969 },
970 }
971 .unwrap();
972
973 count as _
974 }
975
976 fn mock_provider() -> ReceiptProvider<MockBlockInfoProvider> {
977 ReceiptProvider {
978 db_ctx: DbContext::new(
979 SqlitePool::connect_lazy("sqlite::memory:").unwrap(),
980 DbContext::DEFAULT_MAX_VARIABLE_NUMBER,
981 ),
982 block_provider: MockBlockInfoProvider {},
983 receipt_extractor: ReceiptExtractor::new_mock(),
984 keep_latest_n_blocks: None,
985 block_number_to_hashes: Default::default(),
986 }
987 }
988
989 fn mock_resolve_block_number_with_latest(
991 latest: u64,
992 ) -> impl Fn(BlockNumberOrTag) -> anyhow::Result<U256> {
993 move |block: BlockNumberOrTag| match block {
994 BlockNumberOrTag::U256(v) => Ok(v),
995 BlockNumberOrTag::BlockTag(BlockTag::Earliest) => Ok(U256::zero()),
996 BlockNumberOrTag::BlockTag(BlockTag::Latest) => Ok(U256::from(latest)),
997 BlockNumberOrTag::BlockTag(tag) => anyhow::bail!("Unsupported tag: {tag:?}"),
998 }
999 }
1000
1001 impl ReceiptProvider<MockBlockInfoProvider> {
1002 fn with_db_ctx(mut self, db_ctx: DbContext) -> Self {
1003 self.db_ctx = db_ctx;
1004 self
1005 }
1006
1007 fn with_extractor(mut self, extractor: ReceiptExtractor) -> Self {
1008 self.receipt_extractor = extractor;
1009 self
1010 }
1011
1012 fn with_keep_latest(mut self, n: Option<usize>) -> Self {
1013 self.keep_latest_n_blocks = n;
1014 self
1015 }
1016 }
1017
1018 async fn setup_sqlite_provider(pool: SqlitePool) -> ReceiptProvider<MockBlockInfoProvider> {
1019 mock_provider()
1020 .with_db_ctx(DbContext::new(pool, DbContext::DEFAULT_MAX_VARIABLE_NUMBER))
1021 .with_keep_latest(Some(10))
1022 }
1023
1024 #[sqlx::test]
1025 async fn test_insert_remove(pool: SqlitePool) -> anyhow::Result<()> {
1026 let provider = setup_sqlite_provider(pool).await;
1027 let block = MockBlockInfo { hash: H256::default(), number: 0 };
1028 let receipts = vec![(
1029 TransactionSigned::default(),
1030 ReceiptInfo {
1031 logs: vec![Log { block_hash: block.hash, ..Default::default() }],
1032 ..Default::default()
1033 },
1034 )];
1035 let ethereum_hash = H256::from([1_u8; 32]);
1036 let block_map = BlockHashMap::new(block.hash(), ethereum_hash);
1037
1038 provider.insert(&block, &receipts, ðereum_hash).await?;
1039 let row = provider.find_transaction(&receipts[0].1.transaction_hash).await;
1040 assert_eq!(row, Some((block.hash, 0)));
1041 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", Some(block.hash())).await, 1);
1042 assert_eq!(count(&provider.db_ctx.pool, "logs", Some(ethereum_hash)).await, 1);
1043
1044 provider.remove(&[block_map]).await?;
1045 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", Some(block.hash())).await, 0);
1046 assert_eq!(count(&provider.db_ctx.pool, "logs", Some(ethereum_hash)).await, 0);
1047 Ok(())
1048 }
1049
1050 #[sqlx::test]
1051 async fn test_prune(pool: SqlitePool) -> anyhow::Result<()> {
1052 let provider = setup_sqlite_provider(pool).await;
1053 let n = provider.keep_latest_n_blocks.unwrap();
1054
1055 for i in 0..2 * n {
1056 let block = MockBlockInfo { hash: H256::from([i as u8; 32]), number: i as _ };
1057 let transaction_hash = H256::from([i as u8; 32]);
1058 let receipts = vec![(
1059 TransactionSigned::default(),
1060 ReceiptInfo {
1061 transaction_hash,
1062 logs: vec![Log {
1063 block_hash: block.hash,
1064 transaction_hash,
1065 ..Default::default()
1066 }],
1067 ..Default::default()
1068 },
1069 )];
1070 let ethereum_hash = H256::from([(i + 1) as u8; 32]);
1071 provider.insert(&block, &receipts, ðereum_hash).await?;
1072 }
1073 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, n);
1074 assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, n);
1075 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, n);
1076 assert_eq!(provider.block_number_to_hashes.lock().await.len(), n);
1077
1078 return Ok(());
1079 }
1080
1081 #[sqlx::test]
1082 async fn test_fork(pool: SqlitePool) -> anyhow::Result<()> {
1083 let provider = setup_sqlite_provider(pool).await;
1084
1085 let build_block = |seed, number| {
1086 let block = MockBlockInfo { hash: H256::from([seed; 32]), number };
1087 let transaction_hash = H256::from([seed; 32]);
1088 let receipts = vec![(
1089 TransactionSigned::default(),
1090 ReceiptInfo {
1091 transaction_hash,
1092 logs: vec![Log {
1093 block_hash: block.hash,
1094 transaction_hash,
1095 ..Default::default()
1096 }],
1097 ..Default::default()
1098 },
1099 )];
1100 let ethereum_hash = H256::from([seed + 1; 32]);
1101
1102 (block, receipts, ethereum_hash)
1103 };
1104
1105 let (block0, receipts, ethereum_hash_0) = build_block(0, 0);
1107 provider.insert(&block0, &receipts, ðereum_hash_0).await?;
1108 let (block1, receipts, ethereum_hash_1) = build_block(1, 1);
1109 provider.insert(&block1, &receipts, ðereum_hash_1).await?;
1110 let (block2, receipts, ethereum_hash_2) = build_block(2, 2);
1111 provider.insert(&block2, &receipts, ðereum_hash_2).await?;
1112 let (block3, receipts, ethereum_hash_3) = build_block(3, 3);
1113 provider.insert(&block3, &receipts, ðereum_hash_3).await?;
1114
1115 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 4);
1116 assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 4);
1117 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 4);
1118 assert_eq!(
1119 provider.block_number_to_hashes.lock().await.clone(),
1120 [
1121 (0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
1122 (1, BlockHashMap::new(block1.hash, ethereum_hash_1)),
1123 (2, BlockHashMap::new(block2.hash, ethereum_hash_2)),
1124 (3, BlockHashMap::new(block3.hash, ethereum_hash_3))
1125 ]
1126 .into(),
1127 );
1128
1129 let (fork_block, receipts, ethereum_hash_fork) = build_block(4, 1);
1131 provider.insert(&fork_block, &receipts, ðereum_hash_fork).await?;
1132
1133 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 2);
1134 assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 2);
1135 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 2);
1136
1137 assert_eq!(
1138 provider.block_number_to_hashes.lock().await.clone(),
1139 [
1140 (0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
1141 (1, BlockHashMap::new(fork_block.hash, ethereum_hash_fork))
1142 ]
1143 .into(),
1144 );
1145
1146 return Ok(());
1147 }
1148
1149 #[sqlx::test]
1150 async fn test_reorg_same_transaction_hash(pool: SqlitePool) -> anyhow::Result<()> {
1151 let provider = setup_sqlite_provider(pool).await;
1152
1153 let tx_hash = H256::from([42u8; 32]);
1155
1156 let block_a = MockBlockInfo { hash: H256::from([1u8; 32]), number: 1 };
1158 let ethereum_hash_a = H256::from([2u8; 32]);
1159 let receipts_a = vec![(
1160 TransactionSigned::default(),
1161 ReceiptInfo {
1162 transaction_hash: tx_hash,
1163 transaction_index: U256::from(0),
1164 ..Default::default()
1165 },
1166 )];
1167
1168 provider.insert(&block_a, &receipts_a, ðereum_hash_a).await?;
1169
1170 let (found_hash, _) = provider.find_transaction(&tx_hash).await.unwrap();
1172 assert_eq!(found_hash, block_a.hash);
1173
1174 provider.block_number_to_hashes.lock().await.clear();
1176
1177 let block_b = MockBlockInfo { hash: H256::from([3u8; 32]), number: 1 };
1179 let ethereum_hash_b = H256::from([4u8; 32]);
1180 let receipts_b = vec![(
1181 TransactionSigned::default(),
1182 ReceiptInfo {
1183 transaction_hash: tx_hash, transaction_index: U256::from(0),
1185 ..Default::default()
1186 },
1187 )];
1188
1189 provider.insert(&block_b, &receipts_b, ðereum_hash_b).await?;
1191
1192 let (found_hash, _) = provider.find_transaction(&tx_hash).await.unwrap();
1194 assert_eq!(found_hash, block_b.hash);
1195
1196 Ok(())
1197 }
1198
1199 #[sqlx::test]
1200 async fn test_receipts_count_per_block(pool: SqlitePool) -> anyhow::Result<()> {
1201 let provider = setup_sqlite_provider(pool).await;
1202 let block = MockBlockInfo { hash: H256::default(), number: 0 };
1203 let receipts = vec![
1204 (
1205 TransactionSigned::default(),
1206 ReceiptInfo { transaction_hash: H256::from([0u8; 32]), ..Default::default() },
1207 ),
1208 (
1209 TransactionSigned::default(),
1210 ReceiptInfo { transaction_hash: H256::from([1u8; 32]), ..Default::default() },
1211 ),
1212 ];
1213 let ethereum_hash = H256::from([2u8; 32]);
1214
1215 provider.insert(&block, &receipts, ðereum_hash).await?;
1216 let count = provider.receipts_count_per_block(&block.hash).await;
1217 assert_eq!(count, Some(2));
1218 Ok(())
1219 }
1220
1221 #[sqlx::test]
1222 async fn test_query_logs(pool: SqlitePool) -> anyhow::Result<()> {
1223 let provider = setup_sqlite_provider(pool).await;
1224 let block1 = MockBlockInfo { hash: H256::from([1u8; 32]), number: 1 };
1225 let block2 = MockBlockInfo { hash: H256::from([2u8; 32]), number: 2 };
1226 let ethereum_hash1 = H256::from([3u8; 32]);
1227 let ethereum_hash2 = H256::from([4u8; 32]);
1228 let log1 = Log {
1229 block_hash: ethereum_hash1,
1230 block_number: block1.number.into(),
1231 address: H160::from([1u8; 20]),
1232 topics: vec![H256::from([1u8; 32]), H256::from([2u8; 32])],
1233 data: Some(vec![0u8; 32].into()),
1234 transaction_hash: H256::default(),
1235 transaction_index: U256::from(1),
1236 log_index: U256::from(1),
1237 ..Default::default()
1238 };
1239 let log2 = Log {
1240 block_hash: ethereum_hash2,
1241 block_number: block2.number.into(),
1242 address: H160::from([2u8; 20]),
1243 topics: vec![H256::from([2u8; 32]), H256::from([3u8; 32])],
1244 transaction_hash: H256::from([1u8; 32]),
1245 transaction_index: U256::from(2),
1246 log_index: U256::from(1),
1247 ..Default::default()
1248 };
1249
1250 provider
1251 .insert(
1252 &block1,
1253 &vec![(
1254 TransactionSigned::default(),
1255 ReceiptInfo {
1256 logs: vec![log1.clone()],
1257 transaction_hash: log1.transaction_hash,
1258 transaction_index: log1.transaction_index,
1259 ..Default::default()
1260 },
1261 )],
1262 ðereum_hash1,
1263 )
1264 .await?;
1265 provider
1266 .insert(
1267 &block2,
1268 &vec![(
1269 TransactionSigned::default(),
1270 ReceiptInfo {
1271 logs: vec![log2.clone()],
1272 transaction_hash: log2.transaction_hash,
1273 transaction_index: log2.transaction_index,
1274 ..Default::default()
1275 },
1276 )],
1277 ðereum_hash2,
1278 )
1279 .await?;
1280
1281 let resolve_block_number = mock_resolve_block_number_with_latest(block2.number.into());
1282
1283 let logs = provider.logs(None, &resolve_block_number).await?;
1285 assert_eq!(logs, vec![log2.clone()]);
1286
1287 let logs = provider
1289 .logs(
1290 Some(Filter { from_block: Some(log2.block_number.into()), ..Default::default() }),
1291 &resolve_block_number,
1292 )
1293 .await?;
1294 assert_eq!(logs, vec![log2.clone()]);
1295
1296 let logs = provider
1298 .logs(
1299 Some(Filter { from_block: Some(BlockTag::Latest.into()), ..Default::default() }),
1300 &resolve_block_number,
1301 )
1302 .await?;
1303 assert_eq!(logs, vec![log2.clone()]);
1304
1305 let logs = provider
1307 .logs(
1308 Some(Filter { to_block: Some(log1.block_number.into()), ..Default::default() }),
1309 &resolve_block_number,
1310 )
1311 .await?;
1312 assert_eq!(logs, vec![log1.clone()]);
1313
1314 let logs = provider
1316 .logs(
1317 Some(Filter { block_hash: Some(log1.block_hash), ..Default::default() }),
1318 &resolve_block_number,
1319 )
1320 .await?;
1321 assert_eq!(logs, vec![log1.clone()]);
1322
1323 let logs = provider
1325 .logs(
1326 Some(Filter {
1327 from_block: Some(BlockTag::Earliest.into()),
1328 address: Some(log1.address.into()),
1329 ..Default::default()
1330 }),
1331 &resolve_block_number,
1332 )
1333 .await?;
1334 assert_eq!(logs, vec![log1.clone()]);
1335
1336 let logs = provider
1338 .logs(
1339 Some(Filter {
1340 from_block: Some(BlockTag::Earliest.into()),
1341 address: Some(vec![log1.address, log2.address].into()),
1342 ..Default::default()
1343 }),
1344 &resolve_block_number,
1345 )
1346 .await?;
1347 assert_eq!(logs, vec![log1.clone(), log2.clone()]);
1348
1349 let logs = provider
1351 .logs(
1352 Some(Filter {
1353 from_block: Some(BlockTag::Earliest.into()),
1354 topics: Some(vec![FilterTopic::Single(log1.topics[0])]),
1355 ..Default::default()
1356 }),
1357 &resolve_block_number,
1358 )
1359 .await?;
1360 assert_eq!(logs, vec![log1.clone()]);
1361
1362 let logs = provider
1364 .logs(
1365 Some(Filter {
1366 from_block: Some(BlockTag::Earliest.into()),
1367 topics: Some(vec![
1368 FilterTopic::Single(log1.topics[0]),
1369 FilterTopic::Single(log1.topics[1]),
1370 ]),
1371 ..Default::default()
1372 }),
1373 &resolve_block_number,
1374 )
1375 .await?;
1376 assert_eq!(logs, vec![log1.clone()]);
1377
1378 let logs = provider
1380 .logs(
1381 Some(Filter {
1382 from_block: Some(BlockTag::Earliest.into()),
1383 topics: Some(vec![FilterTopic::Multiple(vec![log1.topics[0], log2.topics[0]])]),
1384 ..Default::default()
1385 }),
1386 &resolve_block_number,
1387 )
1388 .await?;
1389 assert_eq!(logs, vec![log1.clone(), log2.clone()]);
1390
1391 let logs = provider
1393 .logs(
1394 Some(Filter {
1395 from_block: Some(BlockTag::Earliest.into()),
1396 to_block: Some(BlockTag::Latest.into()),
1397 block_hash: None,
1398 address: Some(vec![log1.address, log2.address].into()),
1399 topics: Some(vec![FilterTopic::Multiple(vec![log1.topics[0], log2.topics[0]])]),
1400 }),
1401 &resolve_block_number,
1402 )
1403 .await?;
1404 assert_eq!(logs, vec![log1.clone(), log2.clone()]);
1405 Ok(())
1406 }
1407
1408 #[sqlx::test]
1409 async fn test_block_mapping_insert_get(pool: SqlitePool) -> anyhow::Result<()> {
1410 let provider = setup_sqlite_provider(pool).await;
1411 let ethereum_hash = H256::from([1u8; 32]);
1412 let substrate_hash = H256::from([2u8; 32]);
1413 let block_map = BlockHashMap::new(substrate_hash, ethereum_hash);
1414
1415 insert_block_mapping(&provider.db_ctx.pool, &block_map).await?;
1417
1418 let resolved = provider.get_substrate_hash(ðereum_hash).await;
1420 assert_eq!(resolved, Some(substrate_hash));
1421
1422 let resolved = provider.get_ethereum_hash(&substrate_hash).await;
1424 assert_eq!(resolved, Some(ethereum_hash));
1425
1426 Ok(())
1427 }
1428
1429 #[sqlx::test]
1430 async fn test_block_mapping_remove(pool: SqlitePool) -> anyhow::Result<()> {
1431 let provider = setup_sqlite_provider(pool).await;
1432 let ethereum_hash1 = H256::from([1u8; 32]);
1433 let ethereum_hash2 = H256::from([2u8; 32]);
1434 let substrate_hash1 = H256::from([3u8; 32]);
1435 let substrate_hash2 = H256::from([4u8; 32]);
1436 let block_map1 = BlockHashMap::new(substrate_hash1, ethereum_hash1);
1437 let block_map2 = BlockHashMap::new(substrate_hash2, ethereum_hash2);
1438
1439 insert_block_mapping(&provider.db_ctx.pool, &block_map1).await?;
1441 insert_block_mapping(&provider.db_ctx.pool, &block_map2).await?;
1442
1443 assert_eq!(
1445 provider.get_substrate_hash(&block_map1.ethereum_hash).await,
1446 Some(block_map1.substrate_hash)
1447 );
1448 assert_eq!(
1449 provider.get_substrate_hash(&block_map2.ethereum_hash).await,
1450 Some(block_map2.substrate_hash)
1451 );
1452
1453 provider.remove(&[block_map1]).await?;
1455
1456 assert_eq!(provider.get_substrate_hash(ðereum_hash1).await, None);
1458 assert_eq!(provider.get_substrate_hash(ðereum_hash2).await, Some(substrate_hash2));
1459
1460 Ok(())
1461 }
1462
1463 #[sqlx::test]
1464 async fn test_block_mapping_pruning_integration(pool: SqlitePool) -> anyhow::Result<()> {
1465 let provider = setup_sqlite_provider(pool).await;
1466 let ethereum_hash = H256::from([1u8; 32]);
1467 let substrate_hash = H256::from([2u8; 32]);
1468 let block_map = BlockHashMap::new(substrate_hash, ethereum_hash);
1469
1470 insert_block_mapping(&provider.db_ctx.pool, &block_map).await?;
1472 assert_eq!(
1473 provider.get_substrate_hash(&block_map.ethereum_hash).await,
1474 Some(block_map.substrate_hash)
1475 );
1476
1477 provider.remove(&[block_map.clone()]).await?;
1479
1480 assert_eq!(provider.get_substrate_hash(&block_map.ethereum_hash).await, None);
1482
1483 Ok(())
1484 }
1485
1486 #[sqlx::test]
1487 async fn test_logs_with_ethereum_block_hash_mapping(pool: SqlitePool) -> anyhow::Result<()> {
1488 let provider = setup_sqlite_provider(pool).await;
1489 let ethereum_hash = H256::from([1u8; 32]);
1490 let substrate_hash = H256::from([2u8; 32]);
1491 let block_number = 1u64;
1492
1493 let log = Log {
1495 block_hash: ethereum_hash,
1496 block_number: block_number.into(),
1497 address: H160::from([1u8; 20]),
1498 topics: vec![H256::from([1u8; 32])],
1499 transaction_hash: H256::from([3u8; 32]),
1500 transaction_index: U256::from(0),
1501 log_index: U256::from(0),
1502 data: Some(vec![0u8; 32].into()),
1503 ..Default::default()
1504 };
1505
1506 let block = MockBlockInfo { hash: substrate_hash, number: block_number as u32 };
1508 let receipts = vec![(
1509 TransactionSigned::default(),
1510 ReceiptInfo {
1511 logs: vec![log.clone()],
1512 transaction_hash: log.transaction_hash,
1513 transaction_index: log.transaction_index,
1514 ..Default::default()
1515 },
1516 )];
1517 provider.insert(&block, &receipts, ðereum_hash).await?;
1518
1519 let logs = provider
1521 .logs(
1522 Some(Filter { block_hash: Some(ethereum_hash), ..Default::default() }),
1523 mock_resolve_block_number_with_latest(block.number.into()),
1524 )
1525 .await?;
1526 assert_eq!(logs, vec![log]);
1527
1528 Ok(())
1529 }
1530
1531 #[sqlx::test]
1532 async fn test_mapping_count(pool: SqlitePool) -> anyhow::Result<()> {
1533 let provider = setup_sqlite_provider(pool).await;
1534
1535 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 0);
1537
1538 let block_map1 = BlockHashMap::new(H256::from([1u8; 32]), H256::from([2u8; 32]));
1539 let block_map2 = BlockHashMap::new(H256::from([3u8; 32]), H256::from([4u8; 32]));
1540
1541 insert_block_mapping(&provider.db_ctx.pool, &block_map1).await?;
1543 insert_block_mapping(&provider.db_ctx.pool, &block_map2).await?;
1544
1545 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 2);
1546
1547 provider.remove(&[block_map1]).await?;
1549 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1550
1551 Ok(())
1552 }
1553
1554 #[sqlx::test]
1555 async fn restore_first_evm_block_clears_stale(pool: SqlitePool) -> anyhow::Result<()> {
1556 let provider = setup_sqlite_provider(pool).await;
1557
1558 provider
1560 .set_sync_label(ChainMetadata::FirstEvmBlock, SyncCheckpoint::from_number(42))
1561 .await?;
1562
1563 provider.restore_first_evm_block().await?;
1566
1567 assert_eq!(provider.first_evm_block(), None);
1569
1570 let checkpoint = provider.get_sync_label(ChainMetadata::FirstEvmBlock).await?;
1572 assert!(checkpoint.is_none());
1573 Ok(())
1574 }
1575
1576 #[sqlx::test]
1577 async fn advance_sync_label_only_increases(pool: SqlitePool) -> anyhow::Result<()> {
1578 let provider = setup_sqlite_provider(pool).await;
1579 let hash_a = H256::repeat_byte(0xAA);
1580 let hash_b = H256::repeat_byte(0xBB);
1581
1582 provider
1584 .advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(100, hash_a))
1585 .await?;
1586 let checkpoint = provider.get_sync_label(SyncLabel::Head).await?.unwrap();
1587 assert_eq!((checkpoint.block_number, checkpoint.block_hash), (100, Some(hash_a)));
1588
1589 provider
1591 .advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(200, hash_b))
1592 .await?;
1593 let checkpoint = provider.get_sync_label(SyncLabel::Head).await?.unwrap();
1594 assert_eq!((checkpoint.block_number, checkpoint.block_hash), (200, Some(hash_b)));
1595
1596 provider
1598 .advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(50, hash_a))
1599 .await?;
1600 provider
1601 .advance_sync_label(SyncLabel::Head, SyncCheckpoint::new(200, hash_a))
1602 .await?;
1603 let checkpoint = provider.get_sync_label(SyncLabel::Head).await?.unwrap();
1604 assert_eq!((checkpoint.block_number, checkpoint.block_hash), (200, Some(hash_b)));
1605
1606 Ok(())
1607 }
1608
1609 #[sqlx::test]
1610 async fn recede_sync_label_only_decreases(pool: SqlitePool) -> anyhow::Result<()> {
1611 let provider = setup_sqlite_provider(pool).await;
1612 let hash_a = H256::repeat_byte(0xAA);
1613 let hash_b = H256::repeat_byte(0xBB);
1614
1615 provider
1617 .recede_sync_label(SyncLabel::Tail, SyncCheckpoint::new(100, hash_a))
1618 .await?;
1619 let checkpoint = provider.get_sync_label(SyncLabel::Tail).await?.unwrap();
1620 assert_eq!((checkpoint.block_number, checkpoint.block_hash), (100, Some(hash_a)));
1621
1622 provider
1624 .recede_sync_label(SyncLabel::Tail, SyncCheckpoint::new(50, hash_b))
1625 .await?;
1626 let checkpoint = provider.get_sync_label(SyncLabel::Tail).await?.unwrap();
1627 assert_eq!((checkpoint.block_number, checkpoint.block_hash), (50, Some(hash_b)));
1628
1629 provider
1631 .recede_sync_label(SyncLabel::Tail, SyncCheckpoint::new(200, hash_a))
1632 .await?;
1633 provider
1634 .recede_sync_label(SyncLabel::Tail, SyncCheckpoint::new(50, hash_a))
1635 .await?;
1636 let checkpoint = provider.get_sync_label(SyncLabel::Tail).await?.unwrap();
1637 assert_eq!((checkpoint.block_number, checkpoint.block_hash), (50, Some(hash_b)));
1638
1639 Ok(())
1640 }
1641
1642 #[tokio::test]
1643 async fn is_before_earliest_block_edge_cases() {
1644 let extractor = ReceiptExtractor::new_mock();
1646 extractor.set_first_evm_block(10);
1647 let provider = mock_provider().with_extractor(extractor);
1648
1649 let huge = BlockNumberOrTag::U256(U256::from(u64::MAX));
1650 assert!(!provider.is_before_earliest_block(&huge));
1651
1652 let just_over = BlockNumberOrTag::U256(U256::from(u32::MAX as u64 + 1));
1653 assert!(!provider.is_before_earliest_block(&just_over));
1654
1655 let provider = mock_provider();
1657 assert!(!provider.is_before_earliest_block(&BlockNumberOrTag::U256(U256::from(0u32))));
1658 assert!(
1659 !provider.is_before_earliest_block(&BlockNumberOrTag::U256(U256::from(1_000_000u32)))
1660 );
1661
1662 assert!(!provider.is_before_earliest_block(&BlockNumberOrTag::BlockTag(
1664 pallet_revive::evm::BlockTag::Latest
1665 )));
1666 }
1667
1668 #[sqlx::test]
1669 async fn persistent_mode_caps_in_memory_map(pool: SqlitePool) -> anyhow::Result<()> {
1670 let provider = mock_provider()
1672 .with_db_ctx(DbContext::new(pool, DbContext::DEFAULT_MAX_VARIABLE_NUMBER));
1673
1674 let start_block: u64 = 1;
1676 let n = MAX_CACHED_BLOCKS + 1;
1677 let end_block = start_block + n as u64;
1678 for i in start_block..end_block {
1679 let block = MockBlockInfo { hash: H256::from_low_u64_be(i), number: i as _ };
1680 let receipts = vec![(
1681 TransactionSigned::default(),
1682 ReceiptInfo {
1683 transaction_hash: H256::from_low_u64_be(i),
1684 logs: vec![Log {
1685 block_hash: block.hash,
1686 transaction_hash: H256::from_low_u64_be(i),
1687 ..Default::default()
1688 }],
1689 ..Default::default()
1690 },
1691 )];
1692 let ethereum_hash = H256::from_low_u64_be(i + 1);
1693 provider.insert(&block, &receipts, ðereum_hash).await?;
1694 }
1695
1696 let map = provider.block_number_to_hashes.lock().await;
1698 assert_eq!(map.len(), MAX_CACHED_BLOCKS);
1699
1700 assert!(!map.contains_key(&1));
1702 assert!(map.contains_key(&2));
1703 assert!(map.contains_key(&(MAX_CACHED_BLOCKS as u32 + 1)));
1704 drop(map);
1705
1706 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, n);
1708
1709 Ok(())
1710 }
1711
1712 fn make_hash(i: usize, fill: u8) -> H256 {
1713 let mut hash = [fill; 32];
1714 hash[..8].copy_from_slice(&i.to_le_bytes());
1715 H256::from(hash)
1716 }
1717
1718 fn make_receipts(
1719 tx_offset: usize,
1720 n_tx: usize,
1721 n_logs: usize,
1722 ) -> Vec<(TransactionSigned, ReceiptInfo)> {
1723 let mut receipts = Vec::with_capacity(n_tx);
1724
1725 for i in 0..n_tx {
1726 let transaction_hash = make_hash(tx_offset + i, 0x00);
1727
1728 let mut logs = Vec::with_capacity(n_logs);
1729 for j in 0..n_logs {
1730 logs.push(Log { transaction_hash, log_index: U256::from(j), ..Default::default() });
1731 }
1732
1733 receipts.push((
1734 TransactionSigned::default(),
1735 ReceiptInfo {
1736 transaction_hash,
1737 transaction_index: U256::from(i),
1738 logs,
1739 ..Default::default()
1740 },
1741 ));
1742 }
1743
1744 receipts
1745 }
1746
1747 async fn assert_receipts_inserted(
1748 provider: &ReceiptProvider<MockBlockInfoProvider>,
1749 block: &MockBlockInfo,
1750 ethereum_hash: &H256,
1751 receipts: &[(TransactionSigned, ReceiptInfo)],
1752 ) {
1753 let mut expected_logs = 0;
1754 for (_, receipt) in receipts {
1755 assert_eq!(
1756 provider.find_transaction(&receipt.transaction_hash).await,
1757 Some((block.hash(), receipt.transaction_index.as_u32() as usize))
1758 );
1759 expected_logs += receipt.logs.len();
1760 }
1761 assert_eq!(count(&provider.db_ctx.pool, "logs", Some(*ethereum_hash)).await, expected_logs);
1762 }
1763
1764 #[sqlx::test]
1765 async fn test_bulk_insert(pool: SqlitePool) -> anyhow::Result<()> {
1766 let provider = setup_sqlite_provider(pool).await.with_keep_latest(None);
1767 let tx_chunk = provider.db_ctx.tx_insert_chunk_size;
1768 let log_chunk = provider.db_ctx.log_insert_chunk_size;
1769
1770 let cases = [
1771 (tx_chunk, 1), (tx_chunk + 1, log_chunk), (1000, 3), ];
1775
1776 let mut tx_offset = 0;
1777 for (i, (n_tx, n_logs)) in cases.into_iter().enumerate() {
1778 let block = MockBlockInfo { hash: make_hash(i, 0x00), number: i as u32 + 1 };
1779 let ethereum_hash = make_hash(i, 0xff);
1780 let receipts = make_receipts(tx_offset, n_tx, n_logs);
1781 tx_offset += n_tx;
1782 provider.insert(&block, &receipts, ðereum_hash).await?;
1783 assert_receipts_inserted(&provider, &block, ðereum_hash, &receipts).await;
1784 }
1785 Ok(())
1786 }
1787
1788 #[sqlx::test]
1789 async fn test_duplicate_insert_succeeds(pool: SqlitePool) -> anyhow::Result<()> {
1790 let provider = setup_sqlite_provider(pool).await.with_keep_latest(None);
1791 let block = MockBlockInfo { hash: make_hash(0, 0xAA), number: 1 };
1792 let ethereum_hash = make_hash(0, 0xBB);
1793 let receipts = make_receipts(0, 5, 3);
1794
1795 provider.insert_into_db(&block, &receipts, ðereum_hash).await?;
1797 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 5);
1798 assert_eq!(count(&provider.db_ctx.pool, "logs", Some(ethereum_hash)).await, 15);
1799 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1800
1801 sqlx::query("DELETE FROM eth_to_substrate_blocks")
1803 .execute(&provider.db_ctx.pool)
1804 .await?;
1805 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 0);
1806
1807 provider.insert_into_db(&block, &receipts, ðereum_hash).await?;
1809
1810 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 5);
1812 assert_eq!(count(&provider.db_ctx.pool, "logs", Some(ethereum_hash)).await, 15);
1813 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1814
1815 Ok(())
1816 }
1817
1818 #[sqlx::test]
1819 async fn test_insert_empty_receipts(pool: SqlitePool) -> anyhow::Result<()> {
1820 let provider = setup_sqlite_provider(pool).await.with_keep_latest(None);
1821 let block = MockBlockInfo { hash: H256::from([1u8; 32]), number: 1 };
1822 let ethereum_hash = H256::from([2u8; 32]);
1823
1824 provider.insert(&block, &[], ðereum_hash).await?;
1825
1826 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1828 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 0);
1829 assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 0);
1830
1831 let receipts = make_receipts(0, 3, 2);
1833 provider.insert(&block, &receipts, ðereum_hash).await?;
1834 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 1);
1835 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 0);
1836 assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 0);
1837
1838 Ok(())
1839 }
1840
1841 #[sqlx::test]
1842 async fn test_bulk_delete(pool: SqlitePool) -> anyhow::Result<()> {
1843 let db_ctx = DbContext::new(pool, DbContext::LOG_COLUMNS);
1845 let provider = mock_provider().with_db_ctx(db_ctx).with_keep_latest(None);
1846
1847 let n_blocks = 25;
1848 let n_tx_per_block = 5;
1849 let n_logs_per_receipt = 3;
1850 let mut block_mappings = Vec::new();
1851
1852 for i in 0..n_blocks {
1853 let block = MockBlockInfo { hash: make_hash(i, 0xAA), number: i as u32 + 1 };
1854 let ethereum_hash = make_hash(i, 0xBB);
1855 let receipts = make_receipts(i * n_tx_per_block, n_tx_per_block, n_logs_per_receipt);
1856 provider.insert_into_db(&block, &receipts, ðereum_hash).await?;
1857 block_mappings.push(BlockHashMap::new(block.hash, ethereum_hash));
1858 }
1859
1860 assert_eq!(
1861 count(&provider.db_ctx.pool, "transaction_hashes", None).await,
1862 n_blocks * n_tx_per_block
1863 );
1864 assert_eq!(
1865 count(&provider.db_ctx.pool, "logs", None).await,
1866 n_blocks * n_tx_per_block * n_logs_per_receipt
1867 );
1868 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, n_blocks);
1869
1870 provider.remove(&block_mappings).await?;
1871
1872 assert_eq!(count(&provider.db_ctx.pool, "transaction_hashes", None).await, 0);
1873 assert_eq!(count(&provider.db_ctx.pool, "logs", None).await, 0);
1874 assert_eq!(count(&provider.db_ctx.pool, "eth_to_substrate_blocks", None).await, 0);
1875
1876 Ok(())
1877 }
1878
1879 #[sqlx::test]
1880 async fn test_get_processed_eth_block_hash(pool: SqlitePool) -> anyhow::Result<()> {
1881 let provider = setup_sqlite_provider(pool).await;
1882 let block = MockBlockInfo { hash: H256::from([0xAA; 32]), number: 10 };
1883 let ethereum_hash = H256::from([0xBB; 32]);
1884 let receipts = vec![(TransactionSigned::default(), ReceiptInfo::default())];
1885
1886 assert!(provider.get_processed_eth_block_hash(10, block.hash).await.is_none());
1888
1889 provider.insert(&block, &receipts, ðereum_hash).await?;
1891 assert_eq!(
1892 provider.get_processed_eth_block_hash(10, block.hash).await,
1893 Some(ethereum_hash)
1894 );
1895
1896 assert!(
1898 provider
1899 .get_processed_eth_block_hash(10, H256::from([0xCC; 32]))
1900 .await
1901 .is_none()
1902 );
1903
1904 assert!(provider.get_processed_eth_block_hash(11, block.hash).await.is_none());
1906
1907 Ok(())
1908 }
1909
1910 #[sqlx::test]
1911 async fn test_logs_by_block_number(pool: SqlitePool) -> anyhow::Result<()> {
1912 let provider = setup_sqlite_provider(pool).await;
1913 let substrate_hash = H256::from([0xAA; 32]);
1914 let tx_hash = H256::from([0xBB; 32]);
1915 let block = MockBlockInfo { hash: substrate_hash, number: 42 };
1916 let ethereum_hash = H256::from([0xCC; 32]);
1917
1918 let log0 = Log {
1919 block_hash: ethereum_hash,
1920 block_number: U256::from(42),
1921 transaction_hash: tx_hash,
1922 log_index: U256::from(0),
1923 address: H160::from([0x01; 20]),
1924 ..Default::default()
1925 };
1926 let log1 = Log {
1927 block_hash: ethereum_hash,
1928 block_number: U256::from(42),
1929 transaction_hash: tx_hash,
1930 log_index: U256::from(1),
1931 address: H160::from([0x02; 20]),
1932 ..Default::default()
1933 };
1934
1935 let receipts = vec![(
1936 TransactionSigned::default(),
1937 ReceiptInfo {
1938 transaction_hash: tx_hash,
1939 block_hash: ethereum_hash,
1940 logs: vec![log0.clone(), log1.clone()],
1941 ..Default::default()
1942 },
1943 )];
1944
1945 let logs = provider.logs_by_block_number(42, ethereum_hash).await?;
1947 assert!(logs.is_empty());
1948
1949 provider.insert(&block, &receipts, ðereum_hash).await?;
1950
1951 let logs = provider.logs_by_block_number(42, ethereum_hash).await?;
1953 assert_eq!(logs.len(), 2);
1954 assert_eq!(logs[0].address, log0.address);
1955 assert_eq!(logs[1].address, log1.address);
1956 assert_eq!(logs[0].log_index, U256::from(0));
1957 assert_eq!(logs[1].log_index, U256::from(1));
1958
1959 let logs = provider.logs_by_block_number(43, ethereum_hash).await?;
1961 assert!(logs.is_empty());
1962
1963 let logs = provider.logs_by_block_number(42, H256::from([0xDD; 32])).await?;
1965 assert!(logs.is_empty());
1966
1967 Ok(())
1968 }
1969}