1use crate::{
18 client::{SubstrateBlock, SubstrateBlockNumber},
19 Address, AddressOrAddresses, BlockInfoProvider, BlockNumberOrTag, BlockTag, Bytes, ClientError,
20 FilterTopic, ReceiptExtractor, SubxtBlockInfoProvider,
21};
22use pallet_revive::evm::{Filter, Log, ReceiptInfo, TransactionSigned};
23use sp_core::{H256, U256};
24use sqlx::{query, QueryBuilder, Row, Sqlite, SqlitePool};
25use std::{
26 collections::{BTreeMap, HashMap},
27 sync::Arc,
28};
29use tokio::sync::Mutex;
30
/// Log target attached to every log statement emitted from this module.
const LOG_TARGET: &str = "eth-rpc::receipt_provider";
32
33#[derive(Clone)]
35pub struct ReceiptProvider<B: BlockInfoProvider = SubxtBlockInfoProvider> {
36 pool: SqlitePool,
38 block_provider: B,
40 receipt_extractor: ReceiptExtractor,
42 keep_latest_n_blocks: Option<usize>,
44 block_number_to_hashes: Arc<Mutex<BTreeMap<SubstrateBlockNumber, BlockHashMap>>>,
46}
47
48#[derive(Clone, Debug, PartialEq, Eq)]
50struct BlockHashMap {
51 substrate_hash: H256,
52 ethereum_hash: H256,
53}
54
55impl BlockHashMap {
56 fn new(substrate_hash: H256, ethereum_hash: H256) -> Self {
57 Self { substrate_hash, ethereum_hash }
58 }
59}
60
61pub trait BlockInfo {
65 fn hash(&self) -> H256;
67 fn number(&self) -> SubstrateBlockNumber;
69}
70
71impl BlockInfo for SubstrateBlock {
72 fn hash(&self) -> H256 {
73 SubstrateBlock::hash(self)
74 }
75 fn number(&self) -> SubstrateBlockNumber {
76 SubstrateBlock::number(self)
77 }
78}
79
80impl<B: BlockInfoProvider> ReceiptProvider<B> {
81 pub async fn new(
83 pool: SqlitePool,
84 block_provider: B,
85 receipt_extractor: ReceiptExtractor,
86 keep_latest_n_blocks: Option<usize>,
87 ) -> Result<Self, sqlx::Error> {
88 sqlx::migrate!().run(&pool).await?;
89 Ok(Self {
90 pool,
91 block_provider,
92 receipt_extractor,
93 keep_latest_n_blocks,
94 block_number_to_hashes: Default::default(),
95 })
96 }
97
98 pub async fn find_transaction(&self, transaction_hash: &H256) -> Option<(H256, usize)> {
100 let transaction_hash = transaction_hash.as_ref();
101 let result = query!(
102 r#"
103 SELECT block_hash, transaction_index
104 FROM transaction_hashes
105 WHERE transaction_hash = $1
106 "#,
107 transaction_hash
108 )
109 .fetch_optional(&self.pool)
110 .await
111 .ok()??;
112
113 let block_hash = H256::from_slice(&result.block_hash[..]);
114 let transaction_index = result.transaction_index.try_into().ok()?;
115 Some((block_hash, transaction_index))
116 }
117
118 async fn insert_block_mapping(&self, block_map: &BlockHashMap) -> Result<(), ClientError> {
120 let ethereum_hash_ref = block_map.ethereum_hash.as_ref();
121 let substrate_hash_ref = block_map.substrate_hash.as_ref();
122
123 query!(
124 r#"
125 INSERT OR REPLACE INTO eth_to_substrate_blocks (ethereum_block_hash, substrate_block_hash)
126 VALUES ($1, $2)
127 "#,
128 ethereum_hash_ref,
129 substrate_hash_ref,
130 )
131 .execute(&self.pool)
132 .await?;
133
134 log::trace!(target: LOG_TARGET, "Insert block mapping ethereum block: {:?} -> substrate block: {:?}", block_map.ethereum_hash, block_map.substrate_hash);
135 Ok(())
136 }
137
138 pub async fn get_substrate_hash(&self, ethereum_block_hash: &H256) -> Option<H256> {
140 let ethereum_hash = ethereum_block_hash.as_ref();
141 let result = query!(
142 r#"
143 SELECT substrate_block_hash
144 FROM eth_to_substrate_blocks
145 WHERE ethereum_block_hash = $1
146 "#,
147 ethereum_hash
148 )
149 .fetch_optional(&self.pool)
150 .await
151 .inspect_err(|e| {
152 log::error!(target: LOG_TARGET, "failed to get block mapping for ethereum block {ethereum_block_hash:?}, err: {e:?}");
153 })
154 .ok()?
155 .or_else(||{
156 log::trace!(target: LOG_TARGET, "No block mapping found for ethereum block: {ethereum_block_hash:?}");
157 None
158 })?;
159
160 log::trace!(target: LOG_TARGET, "Get block mapping ethereum block: {:?} -> substrate block: {ethereum_block_hash:?}", H256::from_slice(&result.substrate_block_hash[..]));
161
162 Some(H256::from_slice(&result.substrate_block_hash[..]))
163 }
164
165 pub async fn get_ethereum_hash(&self, substrate_block_hash: &H256) -> Option<H256> {
167 let substrate_hash = substrate_block_hash.as_ref();
168 let result = query!(
169 r#"
170 SELECT ethereum_block_hash
171 FROM eth_to_substrate_blocks
172 WHERE substrate_block_hash = $1
173 "#,
174 substrate_hash
175 )
176 .fetch_optional(&self.pool)
177 .await
178 .inspect_err(|e| {
179 log::error!(target: LOG_TARGET, "failed to get block mapping for substrate block {substrate_block_hash:?}, err: {e:?}");
180 })
181 .ok()?
182 .or_else(||{
183 log::trace!(target: LOG_TARGET, "No block mapping found for substrate block: {substrate_block_hash:?}");
184 None
185 })?;
186
187 log::trace!(target: LOG_TARGET, "Get block mapping substrate block: {substrate_block_hash:?} -> ethereum block: {:?}", H256::from_slice(&result.ethereum_block_hash[..]));
188
189 Some(H256::from_slice(&result.ethereum_block_hash[..]))
190 }
191
192 async fn remove(&self, block_mappings: &[BlockHashMap]) -> Result<(), ClientError> {
194 if block_mappings.is_empty() {
195 return Ok(());
196 }
197 log::debug!(target: LOG_TARGET, "Removing block hashes: {block_mappings:?}");
198
199 let placeholders = vec!["?"; block_mappings.len()].join(", ");
200 let sql = format!("DELETE FROM transaction_hashes WHERE block_hash in ({placeholders})");
201
202 let mut delete_tx_query = sqlx::query(&sql);
203 let sql = format!(
204 "DELETE FROM eth_to_substrate_blocks WHERE substrate_block_hash in ({placeholders})"
205 );
206 let mut delete_mappings_query = sqlx::query(&sql);
207
208 let sql = format!("DELETE FROM logs WHERE block_hash in ({placeholders})");
209 let mut delete_logs_query = sqlx::query(&sql);
210
211 for block_map in block_mappings {
212 delete_tx_query = delete_tx_query.bind(block_map.substrate_hash.as_ref());
213 delete_mappings_query = delete_mappings_query.bind(block_map.substrate_hash.as_ref());
214 delete_logs_query = delete_logs_query.bind(block_map.ethereum_hash.as_ref());
216 }
217
218 let delete_transaction_hashes = delete_tx_query.execute(&self.pool);
219 let delete_logs = delete_logs_query.execute(&self.pool);
220 let delete_mappings = delete_mappings_query.execute(&self.pool);
221 tokio::try_join!(delete_transaction_hashes, delete_logs, delete_mappings)?;
222 Ok(())
223 }
224
225 pub fn is_before_earliest_block(&self, at: &BlockNumberOrTag) -> bool {
227 match at {
228 BlockNumberOrTag::U256(block_number) =>
229 self.receipt_extractor.is_before_earliest_block(block_number.as_u32()),
230 BlockNumberOrTag::BlockTag(_) => false,
231 }
232 }
233
234 pub async fn receipts_from_block(
236 &self,
237 block: &SubstrateBlock,
238 ) -> Result<Vec<(TransactionSigned, ReceiptInfo)>, ClientError> {
239 self.receipt_extractor.extract_from_block(block).await
240 }
241
242 pub async fn insert_block_receipts(
244 &self,
245 block: &SubstrateBlock,
246 ethereum_hash: &H256,
247 ) -> Result<Vec<(TransactionSigned, ReceiptInfo)>, ClientError> {
248 let receipts = self.receipts_from_block(block).await?;
249 self.insert(block, &receipts, ethereum_hash).await?;
250 Ok(receipts)
251 }
252
253 async fn prune_blocks(
255 &self,
256 block_number: SubstrateBlockNumber,
257 block_map: &BlockHashMap,
258 ) -> Result<(), ClientError> {
259 let mut to_remove = Vec::new();
260 let mut block_number_to_hash = self.block_number_to_hashes.lock().await;
261
262 match block_number_to_hash.insert(block_number, block_map.clone()) {
264 Some(old_block_map) if &old_block_map != block_map => {
265 to_remove.push(old_block_map);
266
267 let mut next_block_number = block_number.saturating_add(1);
270 while let Some(old_block_map) = block_number_to_hash.remove(&next_block_number) {
271 to_remove.push(old_block_map);
272 next_block_number = next_block_number.saturating_add(1);
273 }
274 },
275 _ => {},
276 }
277
278 if let Some(keep_latest_n_blocks) = self.keep_latest_n_blocks {
279 while block_number_to_hash.len() > keep_latest_n_blocks {
282 if let Some((_, block_map)) = block_number_to_hash.pop_first() {
284 to_remove.push(block_map);
285 }
286 }
287 }
288
289 drop(block_number_to_hash);
291
292 if !to_remove.is_empty() {
293 log::trace!(target: LOG_TARGET, "Pruning old blocks: {to_remove:?}");
294 self.remove(&to_remove).await?;
295 }
296
297 Ok(())
298 }
299
300 async fn insert(
305 &self,
306 block: &impl BlockInfo,
307 receipts: &[(TransactionSigned, ReceiptInfo)],
308 ethereum_hash: &H256,
309 ) -> Result<(), ClientError> {
310 let substrate_block_hash = block.hash();
311 let substrate_hash_ref = substrate_block_hash.as_ref();
312 let block_number = block.number() as i64;
313 let ethereum_hash_ref = ethereum_hash.as_ref();
314 let block_map = BlockHashMap::new(substrate_block_hash, *ethereum_hash);
315
316 log::trace!(target: LOG_TARGET, "Insert receipts for substrate block #{block_number} {:?}", substrate_block_hash);
317
318 self.prune_blocks(block.number(), &block_map).await?;
319
320 let result = sqlx::query!(
323 r#"SELECT EXISTS(SELECT 1 FROM eth_to_substrate_blocks WHERE substrate_block_hash = $1) AS "exists!:bool""#, substrate_hash_ref
324 )
325 .fetch_one(&self.pool)
326 .await?;
327
328 if !result.exists {
331 for (_, receipt) in receipts {
332 let transaction_hash: &[u8] = receipt.transaction_hash.as_ref();
333 let transaction_index = receipt.transaction_index.as_u32() as i32;
334
335 query!(
336 r#"
337 INSERT INTO transaction_hashes (transaction_hash, block_hash, transaction_index)
338 VALUES ($1, $2, $3)
339 "#,
340 transaction_hash,
341 substrate_hash_ref,
342 transaction_index
343 )
344 .execute(&self.pool)
345 .await?;
346
347 for log in &receipt.logs {
348 let log_index = log.log_index.as_u32() as i32;
349 let address: &[u8] = log.address.as_ref();
350
351 let topic_0 = log.topics.first().as_ref().map(|v| &v[..]);
352 let topic_1 = log.topics.get(1).as_ref().map(|v| &v[..]);
353 let topic_2 = log.topics.get(2).as_ref().map(|v| &v[..]);
354 let topic_3 = log.topics.get(3).as_ref().map(|v| &v[..]);
355 let data = log.data.as_ref().map(|v| &v.0[..]);
356
357 query!(
358 r#"
359 INSERT INTO logs(
360 block_hash,
361 transaction_index,
362 log_index,
363 address,
364 block_number,
365 transaction_hash,
366 topic_0, topic_1, topic_2, topic_3,
367 data)
368 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
369 "#,
370 ethereum_hash_ref,
371 transaction_index,
372 log_index,
373 address,
374 block_number,
375 transaction_hash,
376 topic_0,
377 topic_1,
378 topic_2,
379 topic_3,
380 data
381 )
382 .execute(&self.pool)
383 .await?;
384 }
385 }
386 self.insert_block_mapping(&block_map).await?;
388 }
389
390 Ok(())
391 }
392
393 pub async fn logs(&self, filter: Option<Filter>) -> anyhow::Result<Vec<Log>> {
395 let mut qb = QueryBuilder::<Sqlite>::new("SELECT logs.* FROM logs WHERE 1=1");
396 let filter = filter.unwrap_or_default();
397
398 let latest_block = U256::from(self.block_provider.latest_block_number().await);
399
400 let as_block_number = |block_param| match block_param {
401 None => Ok(None),
402 Some(BlockNumberOrTag::U256(v)) => Ok(Some(v)),
403 Some(BlockNumberOrTag::BlockTag(BlockTag::Latest)) => Ok(Some(latest_block)),
404 Some(BlockNumberOrTag::BlockTag(tag)) => anyhow::bail!("Unsupported tag: {tag:?}"),
405 };
406
407 let from_block = as_block_number(filter.from_block)?;
408 let to_block = as_block_number(filter.to_block)?;
409
410 match (from_block, to_block, filter.block_hash) {
411 (Some(_), _, Some(_)) | (_, Some(_), Some(_)) => {
412 anyhow::bail!("block number and block hash cannot be used together");
413 },
414
415 (Some(block), _, _) | (_, Some(block), _) if block > latest_block => {
416 anyhow::bail!("block number exceeds latest block");
417 },
418 (Some(from_block), Some(to_block), None) if from_block > to_block => {
419 anyhow::bail!("invalid block range params");
420 },
421 (Some(from_block), Some(to_block), None) if from_block == to_block => {
422 qb.push(" AND block_number = ").push_bind(from_block.as_u64() as i64);
423 },
424 (Some(from_block), Some(to_block), None) => {
425 qb.push(" AND block_number BETWEEN ")
426 .push_bind(from_block.as_u64() as i64)
427 .push(" AND ")
428 .push_bind(to_block.as_u64() as i64);
429 },
430 (Some(from_block), None, None) => {
431 qb.push(" AND block_number >= ").push_bind(from_block.as_u64() as i64);
432 },
433 (None, Some(to_block), None) => {
434 qb.push(" AND block_number <= ").push_bind(to_block.as_u64() as i64);
435 },
436 (None, None, Some(hash)) => {
437 qb.push(" AND block_hash = ").push_bind(hash.0.to_vec());
438 },
439 (None, None, None) => {
440 qb.push(" AND block_number = ").push_bind(latest_block.as_u64() as i64);
441 },
442 }
443
444 if let Some(addresses) = filter.address {
445 match addresses {
446 AddressOrAddresses::Address(addr) => {
447 qb.push(" AND address = ").push_bind(addr.0.to_vec());
448 },
449 AddressOrAddresses::Addresses(addrs) => {
450 qb.push(" AND address IN (");
451 let mut separated = qb.separated(", ");
452 for addr in addrs {
453 separated.push_bind(addr.0.to_vec());
454 }
455 separated.push_unseparated(")");
456 },
457 }
458 }
459
460 if let Some(topics) = filter.topics {
461 if topics.len() > 4 {
462 return Err(anyhow::anyhow!("exceed max topics"));
463 }
464
465 for (i, topic) in topics.into_iter().enumerate() {
466 match topic {
467 FilterTopic::Single(hash) => {
468 qb.push(format_args!(" AND topic_{i} = ")).push_bind(hash.0.to_vec());
469 },
470 FilterTopic::Multiple(hashes) => {
471 qb.push(format_args!(" AND topic_{i} IN ("));
472 let mut separated = qb.separated(", ");
473 for hash in hashes {
474 separated.push_bind(hash.0.to_vec());
475 }
476 separated.push_unseparated(")");
477 },
478 }
479 }
480 }
481
482 qb.push(" LIMIT 10000");
483
484 let logs = qb
485 .build()
486 .try_map(|row| {
487 let block_hash: Vec<u8> = row.try_get("block_hash")?;
488 let transaction_index: i64 = row.try_get("transaction_index")?;
489 let log_index: i64 = row.try_get("log_index")?;
490 let address: Vec<u8> = row.try_get("address")?;
491 let block_number: i64 = row.try_get("block_number")?;
492 let transaction_hash: Vec<u8> = row.try_get("transaction_hash")?;
493 let topic_0: Option<Vec<u8>> = row.try_get("topic_0")?;
494 let topic_1: Option<Vec<u8>> = row.try_get("topic_1")?;
495 let topic_2: Option<Vec<u8>> = row.try_get("topic_2")?;
496 let topic_3: Option<Vec<u8>> = row.try_get("topic_3")?;
497 let data: Option<Vec<u8>> = row.try_get("data")?;
498
499 let topics = [topic_0, topic_1, topic_2, topic_3]
500 .iter()
501 .filter_map(|t| t.as_ref().map(|t| H256::from_slice(t)))
502 .collect::<Vec<_>>();
503
504 Ok(Log {
505 address: Address::from_slice(&address),
506 block_hash: H256::from_slice(&block_hash),
507 block_number: U256::from(block_number as u64),
508 data: data.map(Bytes::from),
509 log_index: U256::from(log_index as u64),
510 topics,
511 transaction_hash: H256::from_slice(&transaction_hash),
512 transaction_index: U256::from(transaction_index as u64),
513 removed: false,
514 })
515 })
516 .fetch_all(&self.pool)
517 .await?;
518
519 Ok(logs)
520 }
521
522 pub async fn receipts_count_per_block(&self, block_hash: &H256) -> Option<usize> {
524 let block_hash = block_hash.as_ref();
525 let row = query!(
526 r#"
527 SELECT COUNT(*) as count
528 FROM transaction_hashes
529 WHERE block_hash = $1
530 "#,
531 block_hash
532 )
533 .fetch_one(&self.pool)
534 .await
535 .ok()?;
536
537 let count = row.count as usize;
538 Some(count)
539 }
540
541 pub async fn block_transaction_hashes(
543 &self,
544 block_hash: &H256,
545 ) -> Option<HashMap<usize, H256>> {
546 let block_hash = block_hash.as_ref();
547 let rows = query!(
548 r#"
549 SELECT transaction_index, transaction_hash
550 FROM transaction_hashes
551 WHERE block_hash = $1
552 "#,
553 block_hash
554 )
555 .map(|row| {
556 let transaction_index = row.transaction_index as usize;
557 let transaction_hash = H256::from_slice(&row.transaction_hash);
558 (transaction_index, transaction_hash)
559 })
560 .fetch_all(&self.pool)
561 .await
562 .ok()?;
563
564 Some(rows.into_iter().collect())
565 }
566
567 pub async fn receipt_by_block_hash_and_index(
569 &self,
570 block_hash: &H256,
571 transaction_index: usize,
572 ) -> Option<ReceiptInfo> {
573 let block = self.block_provider.block_by_hash(block_hash).await.ok()??;
574 let (_, receipt) = self
575 .receipt_extractor
576 .extract_from_transaction(&block, transaction_index)
577 .await
578 .ok()?;
579 Some(receipt)
580 }
581
582 pub async fn receipt_by_hash(&self, transaction_hash: &H256) -> Option<ReceiptInfo> {
584 let (block_hash, transaction_index) = self.find_transaction(transaction_hash).await?;
585
586 let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
587 let (_, receipt) = self
588 .receipt_extractor
589 .extract_from_transaction(&block, transaction_index)
590 .await
591 .ok()?;
592 Some(receipt)
593 }
594
595 pub async fn signed_tx_by_hash(&self, transaction_hash: &H256) -> Option<TransactionSigned> {
597 let (block_hash, transaction_index) = self.find_transaction(transaction_hash).await?;
598
599 let block = self.block_provider.block_by_hash(&block_hash).await.ok()??;
600 let (signed_tx, _) = self
601 .receipt_extractor
602 .extract_from_transaction(&block, transaction_index)
603 .await
604 .ok()?;
605 Some(signed_tx)
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use super::*;
612 use crate::test::{MockBlockInfo, MockBlockInfoProvider};
613 use pallet_revive::evm::{ReceiptInfo, TransactionSigned};
614 use pretty_assertions::assert_eq;
615 use sp_core::{H160, H256};
616 use sqlx::SqlitePool;
617
618 async fn count(pool: &SqlitePool, table: &str, block_hash: Option<H256>) -> usize {
619 let count: i64 = match block_hash {
620 None =>
621 sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {table}"))
622 .fetch_one(pool)
623 .await,
624 Some(hash) =>
625 sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {table} WHERE block_hash = ?"))
626 .bind(hash.as_ref())
627 .fetch_one(pool)
628 .await,
629 }
630 .unwrap();
631
632 count as _
633 }
634
635 async fn setup_sqlite_provider(pool: SqlitePool) -> ReceiptProvider<MockBlockInfoProvider> {
636 ReceiptProvider {
637 pool,
638 block_provider: MockBlockInfoProvider {},
639 receipt_extractor: ReceiptExtractor::new_mock(),
640 keep_latest_n_blocks: Some(10),
641 block_number_to_hashes: Default::default(),
642 }
643 }
644
645 #[sqlx::test]
646 async fn test_insert_remove(pool: SqlitePool) -> anyhow::Result<()> {
647 let provider = setup_sqlite_provider(pool).await;
648 let block = MockBlockInfo { hash: H256::default(), number: 0 };
649 let receipts = vec![(
650 TransactionSigned::default(),
651 ReceiptInfo {
652 logs: vec![Log { block_hash: block.hash, ..Default::default() }],
653 ..Default::default()
654 },
655 )];
656 let ethereum_hash = H256::from([1_u8; 32]);
657 let block_map = BlockHashMap::new(block.hash(), ethereum_hash);
658
659 provider.insert(&block, &receipts, ðereum_hash).await?;
660 let row = provider.find_transaction(&receipts[0].1.transaction_hash).await;
661 assert_eq!(row, Some((block.hash, 0)));
662
663 provider.remove(&[block_map]).await?;
664 assert_eq!(count(&provider.pool, "transaction_hashes", Some(block.hash())).await, 0);
665 assert_eq!(count(&provider.pool, "logs", Some(block.hash())).await, 0);
666 Ok(())
667 }
668
669 #[sqlx::test]
670 async fn test_prune(pool: SqlitePool) -> anyhow::Result<()> {
671 let provider = setup_sqlite_provider(pool).await;
672 let n = provider.keep_latest_n_blocks.unwrap();
673
674 for i in 0..2 * n {
675 let block = MockBlockInfo { hash: H256::from([i as u8; 32]), number: i as _ };
676 let transaction_hash = H256::from([i as u8; 32]);
677 let receipts = vec![(
678 TransactionSigned::default(),
679 ReceiptInfo {
680 transaction_hash,
681 logs: vec![Log {
682 block_hash: block.hash,
683 transaction_hash,
684 ..Default::default()
685 }],
686 ..Default::default()
687 },
688 )];
689 let ethereum_hash = H256::from([(i + 1) as u8; 32]);
690 provider.insert(&block, &receipts, ðereum_hash).await?;
691 }
692 assert_eq!(count(&provider.pool, "transaction_hashes", None).await, n);
693 assert_eq!(count(&provider.pool, "logs", None).await, n);
694 assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, n);
695 assert_eq!(provider.block_number_to_hashes.lock().await.len(), n);
696
697 return Ok(());
698 }
699
700 #[sqlx::test]
701 async fn test_fork(pool: SqlitePool) -> anyhow::Result<()> {
702 let provider = setup_sqlite_provider(pool).await;
703
704 let build_block = |seed, number| {
705 let block = MockBlockInfo { hash: H256::from([seed; 32]), number };
706 let transaction_hash = H256::from([seed; 32]);
707 let receipts = vec![(
708 TransactionSigned::default(),
709 ReceiptInfo {
710 transaction_hash,
711 logs: vec![Log {
712 block_hash: block.hash,
713 transaction_hash,
714 ..Default::default()
715 }],
716 ..Default::default()
717 },
718 )];
719 let ethereum_hash = H256::from([seed + 1; 32]);
720
721 (block, receipts, ethereum_hash)
722 };
723
724 let (block0, receipts, ethereum_hash_0) = build_block(0, 0);
726 provider.insert(&block0, &receipts, ðereum_hash_0).await?;
727 let (block1, receipts, ethereum_hash_1) = build_block(1, 1);
728 provider.insert(&block1, &receipts, ðereum_hash_1).await?;
729 let (block2, receipts, ethereum_hash_2) = build_block(2, 2);
730 provider.insert(&block2, &receipts, ðereum_hash_2).await?;
731 let (block3, receipts, ethereum_hash_3) = build_block(3, 3);
732 provider.insert(&block3, &receipts, ðereum_hash_3).await?;
733
734 assert_eq!(count(&provider.pool, "transaction_hashes", None).await, 4);
735 assert_eq!(count(&provider.pool, "logs", None).await, 4);
736 assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 4);
737 assert_eq!(
738 provider.block_number_to_hashes.lock().await.clone(),
739 [
740 (0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
741 (1, BlockHashMap::new(block1.hash, ethereum_hash_1)),
742 (2, BlockHashMap::new(block2.hash, ethereum_hash_2)),
743 (3, BlockHashMap::new(block3.hash, ethereum_hash_3))
744 ]
745 .into(),
746 );
747
748 let (fork_block, receipts, ethereum_hash_fork) = build_block(4, 1);
750 provider.insert(&fork_block, &receipts, ðereum_hash_fork).await?;
751
752 assert_eq!(count(&provider.pool, "transaction_hashes", None).await, 2);
753 assert_eq!(count(&provider.pool, "logs", None).await, 2);
754 assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 2);
755
756 assert_eq!(
757 provider.block_number_to_hashes.lock().await.clone(),
758 [
759 (0, BlockHashMap::new(block0.hash, ethereum_hash_0)),
760 (1, BlockHashMap::new(fork_block.hash, ethereum_hash_fork))
761 ]
762 .into(),
763 );
764
765 return Ok(());
766 }
767
768 #[sqlx::test]
769 async fn test_receipts_count_per_block(pool: SqlitePool) -> anyhow::Result<()> {
770 let provider = setup_sqlite_provider(pool).await;
771 let block = MockBlockInfo { hash: H256::default(), number: 0 };
772 let receipts = vec![
773 (
774 TransactionSigned::default(),
775 ReceiptInfo { transaction_hash: H256::from([0u8; 32]), ..Default::default() },
776 ),
777 (
778 TransactionSigned::default(),
779 ReceiptInfo { transaction_hash: H256::from([1u8; 32]), ..Default::default() },
780 ),
781 ];
782 let ethereum_hash = H256::from([2u8; 32]);
783
784 provider.insert(&block, &receipts, ðereum_hash).await?;
785 let count = provider.receipts_count_per_block(&block.hash).await;
786 assert_eq!(count, Some(2));
787 Ok(())
788 }
789
790 #[sqlx::test]
791 async fn test_query_logs(pool: SqlitePool) -> anyhow::Result<()> {
792 let provider = setup_sqlite_provider(pool).await;
793 let block1 = MockBlockInfo { hash: H256::from([1u8; 32]), number: 1 };
794 let block2 = MockBlockInfo { hash: H256::from([2u8; 32]), number: 2 };
795 let ethereum_hash1 = H256::from([3u8; 32]);
796 let ethereum_hash2 = H256::from([4u8; 32]);
797 let log1 = Log {
798 block_hash: ethereum_hash1,
799 block_number: block1.number.into(),
800 address: H160::from([1u8; 20]),
801 topics: vec![H256::from([1u8; 32]), H256::from([2u8; 32])],
802 data: Some(vec![0u8; 32].into()),
803 transaction_hash: H256::default(),
804 transaction_index: U256::from(1),
805 log_index: U256::from(1),
806 ..Default::default()
807 };
808 let log2 = Log {
809 block_hash: ethereum_hash2,
810 block_number: block2.number.into(),
811 address: H160::from([2u8; 20]),
812 topics: vec![H256::from([2u8; 32]), H256::from([3u8; 32])],
813 transaction_hash: H256::from([1u8; 32]),
814 transaction_index: U256::from(2),
815 log_index: U256::from(1),
816 ..Default::default()
817 };
818
819 provider
820 .insert(
821 &block1,
822 &vec![(
823 TransactionSigned::default(),
824 ReceiptInfo {
825 logs: vec![log1.clone()],
826 transaction_hash: log1.transaction_hash,
827 transaction_index: log1.transaction_index,
828 ..Default::default()
829 },
830 )],
831 ðereum_hash1,
832 )
833 .await?;
834 provider
835 .insert(
836 &block2,
837 &vec![(
838 TransactionSigned::default(),
839 ReceiptInfo {
840 logs: vec![log2.clone()],
841 transaction_hash: log2.transaction_hash,
842 transaction_index: log2.transaction_index,
843 ..Default::default()
844 },
845 )],
846 ðereum_hash2,
847 )
848 .await?;
849
850 let logs = provider.logs(None).await?;
852 assert_eq!(logs, vec![log2.clone()]);
853
854 let logs = provider
856 .logs(Some(Filter { from_block: Some(log2.block_number.into()), ..Default::default() }))
857 .await?;
858 assert_eq!(logs, vec![log2.clone()]);
859
860 let logs = provider
862 .logs(Some(Filter { from_block: Some(BlockTag::Latest.into()), ..Default::default() }))
863 .await?;
864 assert_eq!(logs, vec![log2.clone()]);
865
866 let logs = provider
868 .logs(Some(Filter { to_block: Some(log1.block_number.into()), ..Default::default() }))
869 .await?;
870 assert_eq!(logs, vec![log1.clone()]);
871
872 let logs = provider
874 .logs(Some(Filter { block_hash: Some(log1.block_hash), ..Default::default() }))
875 .await?;
876 assert_eq!(logs, vec![log1.clone()]);
877
878 let logs = provider
880 .logs(Some(Filter {
881 from_block: Some(U256::from(0).into()),
882 address: Some(log1.address.into()),
883 ..Default::default()
884 }))
885 .await?;
886 assert_eq!(logs, vec![log1.clone()]);
887
888 let logs = provider
890 .logs(Some(Filter {
891 from_block: Some(U256::from(0).into()),
892 address: Some(vec![log1.address, log2.address].into()),
893 ..Default::default()
894 }))
895 .await?;
896 assert_eq!(logs, vec![log1.clone(), log2.clone()]);
897
898 let logs = provider
900 .logs(Some(Filter {
901 from_block: Some(U256::from(0).into()),
902 topics: Some(vec![FilterTopic::Single(log1.topics[0])]),
903 ..Default::default()
904 }))
905 .await?;
906 assert_eq!(logs, vec![log1.clone()]);
907
908 let logs = provider
910 .logs(Some(Filter {
911 from_block: Some(U256::from(0).into()),
912 topics: Some(vec![
913 FilterTopic::Single(log1.topics[0]),
914 FilterTopic::Single(log1.topics[1]),
915 ]),
916 ..Default::default()
917 }))
918 .await?;
919 assert_eq!(logs, vec![log1.clone()]);
920
921 let logs = provider
923 .logs(Some(Filter {
924 from_block: Some(U256::from(0).into()),
925 topics: Some(vec![FilterTopic::Multiple(vec![log1.topics[0], log2.topics[0]])]),
926 ..Default::default()
927 }))
928 .await?;
929 assert_eq!(logs, vec![log1.clone(), log2.clone()]);
930
931 let logs = provider
933 .logs(Some(Filter {
934 from_block: Some(log1.block_number.into()),
935 to_block: Some(log2.block_number.into()),
936 block_hash: None,
937 address: Some(vec![log1.address, log2.address].into()),
938 topics: Some(vec![FilterTopic::Multiple(vec![log1.topics[0], log2.topics[0]])]),
939 }))
940 .await?;
941 assert_eq!(logs, vec![log1.clone(), log2.clone()]);
942 Ok(())
943 }
944
945 #[sqlx::test]
946 async fn test_block_mapping_insert_get(pool: SqlitePool) -> anyhow::Result<()> {
947 let provider = setup_sqlite_provider(pool).await;
948 let ethereum_hash = H256::from([1u8; 32]);
949 let substrate_hash = H256::from([2u8; 32]);
950 let block_map = BlockHashMap::new(substrate_hash, ethereum_hash);
951
952 provider.insert_block_mapping(&block_map).await?;
954
955 let resolved = provider.get_substrate_hash(ðereum_hash).await;
957 assert_eq!(resolved, Some(substrate_hash));
958
959 let resolved = provider.get_ethereum_hash(&substrate_hash).await;
961 assert_eq!(resolved, Some(ethereum_hash));
962
963 Ok(())
964 }
965
966 #[sqlx::test]
967 async fn test_block_mapping_remove(pool: SqlitePool) -> anyhow::Result<()> {
968 let provider = setup_sqlite_provider(pool).await;
969 let ethereum_hash1 = H256::from([1u8; 32]);
970 let ethereum_hash2 = H256::from([2u8; 32]);
971 let substrate_hash1 = H256::from([3u8; 32]);
972 let substrate_hash2 = H256::from([4u8; 32]);
973 let block_map1 = BlockHashMap::new(substrate_hash1, ethereum_hash1);
974 let block_map2 = BlockHashMap::new(substrate_hash2, ethereum_hash2);
975
976 provider.insert_block_mapping(&block_map1).await?;
978 provider.insert_block_mapping(&block_map2).await?;
979
980 assert_eq!(
982 provider.get_substrate_hash(&block_map1.ethereum_hash).await,
983 Some(block_map1.substrate_hash)
984 );
985 assert_eq!(
986 provider.get_substrate_hash(&block_map2.ethereum_hash).await,
987 Some(block_map2.substrate_hash)
988 );
989
990 provider.remove(&[block_map1]).await?;
992
993 assert_eq!(provider.get_substrate_hash(ðereum_hash1).await, None);
995 assert_eq!(provider.get_substrate_hash(ðereum_hash2).await, Some(substrate_hash2));
996
997 Ok(())
998 }
999
1000 #[sqlx::test]
1001 async fn test_block_mapping_pruning_integration(pool: SqlitePool) -> anyhow::Result<()> {
1002 let provider = setup_sqlite_provider(pool).await;
1003 let ethereum_hash = H256::from([1u8; 32]);
1004 let substrate_hash = H256::from([2u8; 32]);
1005 let block_map = BlockHashMap::new(substrate_hash, ethereum_hash);
1006
1007 provider.insert_block_mapping(&block_map).await?;
1009 assert_eq!(
1010 provider.get_substrate_hash(&block_map.ethereum_hash).await,
1011 Some(block_map.substrate_hash)
1012 );
1013
1014 provider.remove(&[block_map.clone()]).await?;
1016
1017 assert_eq!(provider.get_substrate_hash(&block_map.ethereum_hash).await, None);
1019
1020 Ok(())
1021 }
1022
1023 #[sqlx::test]
1024 async fn test_logs_with_ethereum_block_hash_mapping(pool: SqlitePool) -> anyhow::Result<()> {
1025 let provider = setup_sqlite_provider(pool).await;
1026 let ethereum_hash = H256::from([1u8; 32]);
1027 let substrate_hash = H256::from([2u8; 32]);
1028 let block_number = 1u64;
1029
1030 let log = Log {
1032 block_hash: ethereum_hash,
1033 block_number: block_number.into(),
1034 address: H160::from([1u8; 20]),
1035 topics: vec![H256::from([1u8; 32])],
1036 transaction_hash: H256::from([3u8; 32]),
1037 transaction_index: U256::from(0),
1038 log_index: U256::from(0),
1039 data: Some(vec![0u8; 32].into()),
1040 ..Default::default()
1041 };
1042
1043 let block = MockBlockInfo { hash: substrate_hash, number: block_number as u32 };
1045 let receipts = vec![(
1046 TransactionSigned::default(),
1047 ReceiptInfo {
1048 logs: vec![log.clone()],
1049 transaction_hash: log.transaction_hash,
1050 transaction_index: log.transaction_index,
1051 ..Default::default()
1052 },
1053 )];
1054 provider.insert(&block, &receipts, ðereum_hash).await?;
1055
1056 let logs = provider
1058 .logs(Some(Filter { block_hash: Some(ethereum_hash), ..Default::default() }))
1059 .await?;
1060 assert_eq!(logs, vec![log]);
1061
1062 Ok(())
1063 }
1064
1065 #[sqlx::test]
1066 async fn test_mapping_count(pool: SqlitePool) -> anyhow::Result<()> {
1067 let provider = setup_sqlite_provider(pool).await;
1068
1069 assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 0);
1071
1072 let block_map1 = BlockHashMap::new(H256::from([1u8; 32]), H256::from([2u8; 32]));
1073 let block_map2 = BlockHashMap::new(H256::from([3u8; 32]), H256::from([4u8; 32]));
1074
1075 provider.insert_block_mapping(&block_map1).await?;
1077 provider.insert_block_mapping(&block_map2).await?;
1078
1079 assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 2);
1080
1081 provider.remove(&[block_map1]).await?;
1083 assert_eq!(count(&provider.pool, "eth_to_substrate_blocks", None).await, 1);
1084
1085 Ok(())
1086 }
1087}