Skip to main content

anvil_polkadot/api_server/
filters.rs

1use crate::{
2    api_server::{
3        error::{Error, Result, ToRpcResponseResult},
4        txpool_helpers::extract_tx_info,
5    },
6    substrate_node::service::TransactionPoolHandle,
7};
8use anvil_core::eth::subscription::SubscriptionId;
9use anvil_rpc::response::ResponseResult;
10use futures::{FutureExt, StreamExt};
11use pallet_revive_eth_rpc::client::Client as EthRpcClient;
12use polkadot_sdk::{
13    pallet_revive::evm::{BlockNumberOrTag, Filter, HashesOrTransactionInfos, Log},
14    sc_service::TransactionPool,
15};
16use std::{
17    collections::{HashMap, HashSet},
18    sync::Arc,
19    time::Duration,
20};
21use subxt::utils::H256;
22use tokio::{sync::Mutex, time::Instant};
23use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
24
/// Default keep-alive for active filters, in seconds (five minutes).
///
/// A filter that is not polled within this window becomes eligible for eviction.
pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 5 * 60;

/// Tracing target under which the filter subsystem emits its log records.
pub const LOG_TARGET: &str = "node::filter";
30
31/// Maps filter IDs to tuples of filter and deadline.
32type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
33
34/// Type alias for block notification streams.
35pub type BlockNotifications = BroadcastStream<H256>;
36
37/// Manages Ethereum style filters for block notifications, logs and pending transactions.
38///
39/// Maintains active filters and automatically evicts that haven't been polled within the
40/// keep alive duration. Each filter is identified by a unique hexa string.
41#[derive(Clone)]
42pub struct Filters {
43    /// Currently active filters
44    active_filters: FilterMap,
45    /// Lifetime of a filter
46    keep_alive: Duration,
47}
48
49impl Filters {
50    /// Creates a new Filters instance with custom keepalive duration
51    pub fn with_keepalive(keep_alive: Duration) -> Self {
52        Self { active_filters: Arc::new(Mutex::new(HashMap::default())), keep_alive }
53    }
54
55    /// Inserts a new Filter and returns its unique identifier.
56    pub async fn add_filter(&self, filter: EthFilter) -> String {
57        let id = new_id();
58        trace!(target: "node::filters", "Adding new filter id {}", id);
59        let mut filters = self.active_filters.lock().await;
60        filters.insert(id.clone(), (filter, self.next_deadline()));
61        id
62    }
63
64    /// Poll the filter for changes since the last call.
65    ///
66    /// This method retrieves any new data from the specified filter and resets its deadline.
67    ///
68    ///  - Block filters: Returns an array of new block hashes
69    ///  - Log filters: Returns an array of logs matching the filter criteria, including both
70    ///    historic logs on the first poll and new logs from blocks produced since the last poll.
71    pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
72        {
73            let mut filters = self.active_filters.lock().await;
74            if let Some((filter, deadline)) = filters.get_mut(id) {
75                let response = match filter {
76                    EthFilter::Blocks(block_filter) => {
77                        let blocks = block_filter.drain_blocks().await;
78                        Ok(blocks).to_rpc_result()
79                    }
80                    EthFilter::Logs(logs_filter) => {
81                        let logs = logs_filter.drain_logs().await;
82                        Ok(logs).to_rpc_result()
83                    }
84                    EthFilter::PendingTransactions(tx_filter) => {
85                        let txs = tx_filter.drain_transactions().await;
86                        Ok(txs).to_rpc_result()
87                    }
88                };
89                *deadline = self.next_deadline();
90                return response;
91            }
92        }
93        warn!(target: LOG_TARGET, "No filter found for {}", id);
94        ResponseResult::success(Vec::<()>::new())
95    }
96
97    /// Returns the log filter criteria for a given filter ID.
98    pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
99        let filters = self.active_filters.lock().await;
100        if let Some((EthFilter::Logs(log), _)) = filters.get(id) {
101            return Some(log.filter.clone());
102        }
103        None
104    }
105
106    /// Returns the keepalive duration for filters.
107    pub fn keep_alive(&self) -> Duration {
108        self.keep_alive
109    }
110
111    /// Removes and returns the filter associated with the given identifier.
112    pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
113        trace!(target: LOG_TARGET, "Uninstalling filter id {}", id);
114        self.active_filters.lock().await.remove(id).map(|(f, _)| f)
115    }
116
117    /// Evicts all filters that have exceeded their keepalive deadline.
118    ///
119    /// This method is typically called periodically by the eviction task to clean up
120    /// stale filters that haven't been polled recently. Evicted filters are permanently
121    /// removed and cannot be recovered.
122    pub async fn evict(&self) {
123        trace!(target: LOG_TARGET, "Evicting stale filters");
124        let now = Instant::now();
125        let mut active_filters = self.active_filters.lock().await;
126        active_filters.retain(|id, (_, deadline)| {
127            if now > *deadline {
128                trace!(target: LOG_TARGET,?id, "Evicting stale filter");
129                return false;
130            }
131            true
132        });
133    }
134
135    fn next_deadline(&self) -> Instant {
136        Instant::now() + self.keep_alive()
137    }
138}
139
140impl Default for Filters {
141    fn default() -> Self {
142        Self {
143            active_filters: Arc::new(Mutex::new(HashMap::default())),
144            keep_alive: Duration::from_secs(ACTIVE_FILTER_TIMEOUT_SECS),
145        }
146    }
147}
148
149fn new_id() -> String {
150    SubscriptionId::random_hex().to_string()
151}
152
153/// Background task that periodically evicts stale filters.
154///
155/// This task runs an infinite loop that calls `Filters::evict()` at regular intervals
156/// based on the filter keepalive duration. It ensures that filters which haven't been
157/// polled are automatically removed to prevent memory leaks.
158///
159/// The task should be spawned once when the filter system is initialized and will
160/// run for the lifetime of the application.
161pub async fn eviction_task(filters: Filters) {
162    let start = filters.next_deadline();
163    let mut interval = tokio::time::interval_at(start, filters.keep_alive());
164    loop {
165        interval.tick().await;
166        filters.evict().await;
167    }
168}
169
170/// Implements the Ethereum JSON-RPC filter specification, supporting block
171/// log and pending transactions filtering capabilities. Each filter type
172/// has different polling behavior and data delivery semantics.
173pub enum EthFilter {
174    /// Block filter that streams new block hashes.
175    ///
176    /// Emits the hash (H256) of each new block as it's added to the chain.
177    /// Subscribers receive notifications through the broadcast channel. When polled,
178    /// returns all block hashes produced since the last poll.
179    Blocks(BlockFilter),
180    /// Log filter that tracks contract event logs.
181    ///
182    /// Filters logs based on block range, addresses, and topics. Combines historic
183    /// logs (from the initial query range) with real-time logs from newly produced
184    /// blocks. The filter applies topic matching with OR logic between topic alternatives
185    /// and validates block ranges for incoming blocks.
186    Logs(LogsFilter),
187    /// Pending transactions filter that tracks new transactions.
188    ///
189    /// Returns mined transactions since last poll + transactions that are
190    /// ready but have not been mined yet.
191    PendingTransactions(PendingTransactionsFilter),
192}
193
194/// Filter for tracking new block hashes.
195pub struct BlockFilter {
196    block_notifications: BlockNotifications,
197}
198
199impl BlockFilter {
200    pub fn new(block_notifier: BlockNotifications) -> Self {
201        Self { block_notifications: block_notifier }
202    }
203
204    /// Drains all new block hashes since the last poll.
205    ///
206    /// Returns all block hashes that were broadcast since the last call to this method.
207    /// Handles lagged notifications gracefully by logging and continuing.
208    async fn drain_blocks(&mut self) -> Vec<H256> {
209        let mut new_blocks = Vec::new();
210
211        while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
212            match result {
213                Ok(block_hash) => new_blocks.push(block_hash),
214                Err(BroadcastStreamRecvError::Lagged(count)) => {
215                    warn!(
216                        target: LOG_TARGET,
217                        "Block filter lagged, skipped {} block notifications",
218                        count
219                    );
220                }
221            }
222        }
223
224        new_blocks
225    }
226}
227
228/// Filter for tracking and collecting contract event logs.
229///
230/// Combines historic log queries with real-time log streaming to provide
231/// a complete view of logs matching the filter criteria. On creation, it optionally
232/// fetches historic logs based on the specified block range. Subsequently, it monitors
233/// new blocks and queries them for matching logs.
234///
235/// The filter validates that incoming blocks are within the specified range (from_block
236/// to to_block) before querying them, ensuring efficient operation and correct semantics.
237pub struct LogsFilter {
238    /// Stream of new block notifications
239    blocks: BlockNotifications,
240    /// Client for querying Ethereum RPC endpoints
241    eth_client: EthRpcClient,
242    /// Filter criteria (addresses, topics, block range)
243    filter: Filter,
244    /// Historic logs fetched at filter creation time, returned on first poll
245    historic: Option<Vec<Log>>,
246}
247
248impl LogsFilter {
249    /// Creates a new log filter with the specified criteria.
250    ///
251    /// If the filter specifies a block range (from_block, to_block) or specific block hash,
252    /// this constructor will immediately query for historic logs matching the criteria.
253    /// These historic logs are stored and returned on the first call to `get_filter_changes`.
254    ///
255    /// For filters without explicit block constraints, only real-time logs from future
256    /// blocks will be collected.
257    pub async fn new(
258        block_notifier: BlockNotifications,
259        eth_rpc_client: EthRpcClient,
260        filter: Filter,
261    ) -> Result<Self> {
262        let historic = if filter.from_block.is_some()
263            || filter.to_block.is_some()
264            || filter.block_hash.is_some()
265        {
266            eth_rpc_client.logs(Some(filter.clone())).await.ok()
267        } else {
268            None
269        };
270        Ok(Self { blocks: block_notifier, eth_client: eth_rpc_client, filter, historic })
271    }
272
273    /// Drains all accumulated logs since the last poll.
274    ///
275    /// This method:
276    /// 1. Takes any historic logs (returned only on first call)
277    /// 2. Drains all pending block notifications without blocking
278    /// 3. For each new block, checks if it's within the filter's block range
279    /// 4. Queries each relevant block for logs matching the filter criteria
280    /// 5. Returns the combined set of logs
281    async fn drain_logs(&mut self) -> Vec<Log> {
282        let mut logs = self.historic.take().unwrap_or_default();
283        let mut block_hashes = vec![];
284        while let Some(result) = self.blocks.next().now_or_never().flatten() {
285            match result {
286                Ok(block_hash) => block_hashes.push(block_hash),
287                Err(BroadcastStreamRecvError::Lagged(blocks)) => {
288                    // Channel overflowed - some blocks were skipped
289                    warn!(target: LOG_TARGET, "Logs filter lagged, skipped {} block notifications", blocks);
290                    // Continue draining what's left in the channel
291                    continue;
292                }
293            }
294        }
295
296        // For each block that we were notified about check for logs
297        for substrate_hash in block_hashes {
298            // This can be optimized if we also submit the block number
299            // from subscribe_and_cache_new_blocks
300            if !self.is_block_in_range(&substrate_hash).await {
301                continue;
302            }
303            let mut block_filter = self.filter.clone();
304            block_filter.from_block = None;
305            block_filter.to_block = None;
306            block_filter.block_hash = self.eth_client.resolve_ethereum_hash(&substrate_hash).await;
307            if let Ok(block_logs) = self.eth_client.logs(Some(block_filter)).await {
308                logs.extend(block_logs);
309            }
310        }
311        logs
312    }
313
314    /// Validates both lower bound (from_block) and upper bound (to_block) constraints.
315    /// Block tags (like "latest", "pending") are always considered in range.
316    async fn is_block_in_range(&self, substrate_hash: &H256) -> bool {
317        let Ok(Some(block)) = self.eth_client.block_by_hash(substrate_hash).await else {
318            return false; // Can't get block, skip it
319        };
320
321        let block_number = block.number();
322        // Check lower limit (from_block)
323        if let Some(from_block) = &self.filter.from_block {
324            match from_block {
325                BlockNumberOrTag::U256(limit) => {
326                    if block_number < limit.as_u32() {
327                        return false;
328                    }
329                }
330                BlockNumberOrTag::BlockTag(_) => {}
331            }
332        }
333        // Check upper limit (to_block)
334        if let Some(to_block) = &self.filter.to_block {
335            match to_block {
336                BlockNumberOrTag::U256(limit) => {
337                    if block_number > limit.as_u32() {
338                        return false;
339                    }
340                }
341                BlockNumberOrTag::BlockTag(_) => {}
342            }
343        }
344        true
345    }
346}
347
348/// Filter for pending transactions
349///
350/// Monitors the transaction pool and returns newly pending transaction hashes
351/// when polled. Transactions that have been included in old blocks are automatically filtered out.
352///
353/// The filter maintains state of previously seen transactions to ensure each
354/// transaction is reported only once, even if it remains in the pending pool
355/// across multiple polls.
356pub struct PendingTransactionsFilter {
357    /// Set of transaction hashes already reported to the client
358    already_seen: HashSet<H256>,
359    /// Stream of new block notifications for detecting mined transactions
360    block_notifications: BroadcastStream<H256>,
361    /// Reference to the transaction pool for querying ready transactions
362    tx_pool: Arc<TransactionPoolHandle>,
363    /// Ethereum RPC client for fetching block transaction data
364    eth_rpc_client: EthRpcClient,
365}
366impl PendingTransactionsFilter {
367    pub fn new(
368        block_notifier: BroadcastStream<H256>,
369        tx_pool: Arc<TransactionPoolHandle>,
370        eth_rpc_client: EthRpcClient,
371    ) -> Self {
372        Self {
373            already_seen: tx_pool
374                .ready()
375                .filter_map(|tx| extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash))
376                .collect(),
377            block_notifications: block_notifier,
378            tx_pool,
379            eth_rpc_client,
380        }
381    }
382
383    /// Drains all new pending transaction hashes since the last poll.
384    ///
385    /// This method:
386    /// 1. Queries the current ready transaction pool
387    /// 2. Drains block notifications to identify mined transactions
388    /// 3. Returns only new transactions (not previously seen and not mined)
389    ///
390    /// The filter state is updated to remember all currently pending transactions,
391    /// ensuring they won't be reported again on subsequent polls.
392    async fn drain_transactions(&mut self) -> Vec<H256> {
393        // Get current ready transactions
394        let current_ready: HashSet<H256> = self
395            .tx_pool
396            .ready()
397            .filter_map(|tx| {
398                extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash).or_else(|| {
399                    warn!(target: LOG_TARGET, "Failed to extract transaction info from ready pool");
400                    None
401                })
402            })
403            .collect();
404
405        // Get transactions that have been included in blocks already
406        let mut included_transactions = HashSet::new();
407        while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
408            match result {
409                Ok(block_hash) => match self.fetch_block_transactions(&block_hash).await {
410                    Ok(tx_hashes) => included_transactions.extend(tx_hashes),
411                    Err(e) => {
412                        warn!(
413                            target: LOG_TARGET,
414                            "Failed to fetch transactions for block {:?}: {}",
415                            block_hash, e
416                        );
417                    }
418                },
419                Err(BroadcastStreamRecvError::Lagged(blocks)) => {
420                    // Channel overflowed - some blocks were skipped
421                    warn!(target: LOG_TARGET, "Logs filter lagged, skipped {} block notifications", blocks);
422                    // Continue draining what's left in the channel
423                    continue;
424                }
425            }
426        }
427
428        // New from pool: transactions in ready pool we haven't seen before
429        let new_from_pool: HashSet<H256> =
430            current_ready.difference(&self.already_seen).copied().collect();
431        let excluded: HashSet<H256> = self.already_seen.union(&new_from_pool).copied().collect();
432        let new_from_blocks: HashSet<H256> =
433            included_transactions.difference(&excluded).copied().collect();
434        let new_pending: Vec<H256> = new_from_pool.union(&new_from_blocks).copied().collect();
435        // Remove mined transactions from already_seen
436        for tx_hash in &included_transactions {
437            self.already_seen.remove(tx_hash);
438        }
439
440        // Only track transactions that are still pending (not mined)
441        let still_pending: HashSet<H256> =
442            current_ready.difference(&included_transactions).copied().collect();
443        self.already_seen.extend(still_pending);
444        new_pending
445    }
446
447    /// Fetches all transaction hashes from a given block.
448    ///
449    /// Takes a substrate block hash, fetches the block, converts it to an EVM block,
450    /// and extracts all transaction hashes regardless of whether they're returned
451    /// as hashes or full transaction objects.
452    async fn fetch_block_transactions(&self, substrate_block_hash: &H256) -> Result<Vec<H256>> {
453        let substrate_block =
454            self.eth_rpc_client.block_by_hash(substrate_block_hash).await?.ok_or(
455                Error::InternalError(format!(
456                    "Could not find block with hash: {substrate_block_hash}"
457                )),
458            )?;
459        let block = self
460            .eth_rpc_client
461            .evm_block(substrate_block, false)
462            .await
463            .ok_or(Error::InternalError("Could not convert to an evm block".to_string()))?;
464        let tx_hashes = match block.transactions {
465            HashesOrTransactionInfos::Hashes(hashes) => hashes,
466            // Considering that we called evm_block with hydrated false we will
467            // never receive TransactionInfos but handled it anyways.
468            HashesOrTransactionInfos::TransactionInfos(infos) => {
469                infos.iter().map(|ti| ti.hash).collect()
470            }
471        };
472        Ok(tx_hashes)
473    }
474}