anvil_polkadot/api_server/filters.rs
use crate::{
    api_server::{
        error::{Error, Result, ToRpcResponseResult},
        txpool_helpers::extract_tx_info,
    },
    substrate_node::service::TransactionPoolHandle,
};
use anvil_core::eth::subscription::SubscriptionId;
use anvil_rpc::response::ResponseResult;
use futures::{FutureExt, StreamExt};
use pallet_revive_eth_rpc::client::Client as EthRpcClient;
use polkadot_sdk::{
    pallet_revive::evm::{BlockNumberOrTag, Filter, HashesOrTransactionInfos, Log},
    sc_service::TransactionPool,
};
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::Duration,
};
use subxt::utils::H256;
use tokio::{sync::Mutex, time::Instant};
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};

/// Default timeout duration for active filters in seconds.
/// Filters that haven't been polled within this duration will be evicted.
pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;

pub const LOG_TARGET: &str = "node::filter";

/// Maps filter IDs to tuples of filter and deadline.
type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;

/// Type alias for block notification streams.
pub type BlockNotifications = BroadcastStream<H256>;

/// Manages Ethereum-style filters for block notifications, logs, and pending transactions.
///
/// Maintains active filters and automatically evicts those that haven't been polled within
/// the keep-alive duration. Each filter is identified by a unique hexadecimal string.
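///
/// # Example
///
/// A minimal usage sketch (assumes a Tokio runtime; building an `EthFilter` is
/// shown further down in this module):
///
/// ```ignore
/// let filters = Filters::default();
/// let id = filters.add_filter(eth_filter).await;
/// let changes = filters.get_filter_changes(&id).await;
/// filters.uninstall_filter(&id).await;
/// ```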
#[derive(Clone)]
pub struct Filters {
    /// Currently active filters
    active_filters: FilterMap,
    /// Lifetime of a filter
    keep_alive: Duration,
}

impl Filters {
    /// Creates a new `Filters` instance with a custom keep-alive duration.
    pub fn with_keepalive(keep_alive: Duration) -> Self {
        Self { active_filters: Arc::new(Mutex::new(HashMap::default())), keep_alive }
    }

    /// Inserts a new filter and returns its unique identifier.
    pub async fn add_filter(&self, filter: EthFilter) -> String {
        let id = new_id();
        trace!(target: LOG_TARGET, "Adding new filter id {}", id);
        let mut filters = self.active_filters.lock().await;
        filters.insert(id.clone(), (filter, self.next_deadline()));
        id
    }

    /// Polls the filter for changes since the last call.
    ///
    /// This method retrieves any new data from the specified filter and resets its deadline.
    ///
    /// - Block filters: Returns an array of new block hashes
    /// - Log filters: Returns an array of logs matching the filter criteria, including both
    ///   historic logs on the first poll and new logs from blocks produced since the last poll.
    /// - Pending transaction filters: Returns an array of transaction hashes that are newly
    ///   pending or were mined since the last poll
    pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
        {
            let mut filters = self.active_filters.lock().await;
            if let Some((filter, deadline)) = filters.get_mut(id) {
                let response = match filter {
                    EthFilter::Blocks(block_filter) => {
                        let blocks = block_filter.drain_blocks().await;
                        Ok(blocks).to_rpc_result()
                    }
                    EthFilter::Logs(logs_filter) => {
                        let logs = logs_filter.drain_logs().await;
                        Ok(logs).to_rpc_result()
                    }
                    EthFilter::PendingTransactions(tx_filter) => {
                        let txs = tx_filter.drain_transactions().await;
                        Ok(txs).to_rpc_result()
                    }
                };
                *deadline = self.next_deadline();
                return response;
            }
        }
        warn!(target: LOG_TARGET, "No filter found for {}", id);
        ResponseResult::success(Vec::<()>::new())
    }

    /// Returns the log filter criteria for a given filter ID.
    pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
        let filters = self.active_filters.lock().await;
        if let Some((EthFilter::Logs(log), _)) = filters.get(id) {
            return Some(log.filter.clone());
        }
        None
    }

    /// Returns the keep-alive duration for filters.
    pub fn keep_alive(&self) -> Duration {
        self.keep_alive
    }

    /// Removes and returns the filter associated with the given identifier.
    pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
        trace!(target: LOG_TARGET, "Uninstalling filter id {}", id);
        self.active_filters.lock().await.remove(id).map(|(f, _)| f)
    }

    /// Evicts all filters that have exceeded their keep-alive deadline.
    ///
    /// This method is typically called periodically by the eviction task to clean up
    /// stale filters that haven't been polled recently. Evicted filters are permanently
    /// removed and cannot be recovered.
    pub async fn evict(&self) {
        trace!(target: LOG_TARGET, "Evicting stale filters");
        let now = Instant::now();
        let mut active_filters = self.active_filters.lock().await;
        active_filters.retain(|id, (_, deadline)| {
            if now > *deadline {
                trace!(target: LOG_TARGET, ?id, "Evicting stale filter");
                return false;
            }
            true
        });
    }

    fn next_deadline(&self) -> Instant {
        Instant::now() + self.keep_alive()
    }
}

impl Default for Filters {
    fn default() -> Self {
        Self {
            active_filters: Arc::new(Mutex::new(HashMap::default())),
            keep_alive: Duration::from_secs(ACTIVE_FILTER_TIMEOUT_SECS),
        }
    }
}

fn new_id() -> String {
    SubscriptionId::random_hex().to_string()
}

/// Background task that periodically evicts stale filters.
///
/// This task runs an infinite loop that calls `Filters::evict()` at regular intervals
/// based on the filter keep-alive duration. It ensures that filters which haven't been
/// polled are automatically removed to prevent memory leaks.
///
/// The task should be spawned once when the filter system is initialized and will
/// run for the lifetime of the application.
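///
/// # Example
///
/// A minimal sketch of wiring this up (assumes a Tokio runtime):
///
/// ```ignore
/// let filters = Filters::default();
/// tokio::spawn(eviction_task(filters.clone()));
/// ```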
pub async fn eviction_task(filters: Filters) {
    let start = filters.next_deadline();
    let mut interval = tokio::time::interval_at(start, filters.keep_alive());
    loop {
        interval.tick().await;
        filters.evict().await;
    }
}

/// Implements the Ethereum JSON-RPC filter specification, supporting block,
/// log, and pending-transaction filtering. Each filter type has different
/// polling behavior and data delivery semantics.
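///
/// # Example
///
/// A sketch of wrapping a block filter (`notifications` is a [`BlockNotifications`]
/// stream obtained elsewhere):
///
/// ```ignore
/// let filter = EthFilter::Blocks(BlockFilter::new(notifications));
/// ```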
pub enum EthFilter {
    /// Block filter that streams new block hashes.
    ///
    /// Emits the hash (`H256`) of each new block as it's added to the chain.
    /// Subscribers receive notifications through the broadcast channel. When polled,
    /// returns all block hashes produced since the last poll.
    Blocks(BlockFilter),
    /// Log filter that tracks contract event logs.
    ///
    /// Filters logs based on block range, addresses, and topics. Combines historic
    /// logs (from the initial query range) with real-time logs from newly produced
    /// blocks. The filter applies topic matching with OR logic between topic alternatives
    /// and validates block ranges for incoming blocks.
    Logs(LogsFilter),
    /// Pending transactions filter that tracks new transactions.
    ///
    /// Returns transactions mined since the last poll as well as transactions
    /// that are ready but have not been mined yet.
    PendingTransactions(PendingTransactionsFilter),
}

/// Filter for tracking new block hashes.
pub struct BlockFilter {
    block_notifications: BlockNotifications,
}

impl BlockFilter {
    pub fn new(block_notifier: BlockNotifications) -> Self {
        Self { block_notifications: block_notifier }
    }

    /// Drains all new block hashes since the last poll.
    ///
    /// Returns all block hashes that were broadcast since the last call to this method.
    /// Handles lagged notifications gracefully by logging and continuing.
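    ///
    /// Uses `FutureExt::now_or_never` to poll the stream without blocking, so each
    /// call returns only the notifications already buffered in the broadcast channel.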
    async fn drain_blocks(&mut self) -> Vec<H256> {
        let mut new_blocks = Vec::new();

        while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
            match result {
                Ok(block_hash) => new_blocks.push(block_hash),
                Err(BroadcastStreamRecvError::Lagged(count)) => {
                    warn!(
                        target: LOG_TARGET,
                        "Block filter lagged, skipped {} block notifications",
                        count
                    );
                }
            }
        }

        new_blocks
    }
}

/// Filter for tracking and collecting contract event logs.
///
/// Combines historic log queries with real-time log streaming to provide
/// a complete view of logs matching the filter criteria. On creation, it optionally
/// fetches historic logs based on the specified block range. Subsequently, it monitors
/// new blocks and queries them for matching logs.
///
/// The filter validates that incoming blocks are within the specified range (`from_block`
/// to `to_block`) before querying them, ensuring efficient operation and correct semantics.
pub struct LogsFilter {
    /// Stream of new block notifications
    blocks: BlockNotifications,
    /// Client for querying Ethereum RPC endpoints
    eth_client: EthRpcClient,
    /// Filter criteria (addresses, topics, block range)
    filter: Filter,
    /// Historic logs fetched at filter creation time, returned on first poll
    historic: Option<Vec<Log>>,
}

impl LogsFilter {
    /// Creates a new log filter with the specified criteria.
    ///
    /// If the filter specifies a block range (`from_block`, `to_block`) or a specific block
    /// hash, this constructor will immediately query for historic logs matching the criteria.
    /// These historic logs are stored and returned on the first call to `get_filter_changes`.
    ///
    /// For filters without explicit block constraints, only real-time logs from future
    /// blocks will be collected.
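    ///
    /// # Example
    ///
    /// A sketch of filtering a fixed block range (assumes `Filter` implements `Default`;
    /// `notifications` and `client` are obtained elsewhere):
    ///
    /// ```ignore
    /// let mut criteria = Filter::default();
    /// criteria.from_block = Some(BlockNumberOrTag::U256(100.into()));
    /// criteria.to_block = Some(BlockNumberOrTag::U256(200.into()));
    /// let logs_filter = LogsFilter::new(notifications, client, criteria).await?;
    /// ```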
    pub async fn new(
        block_notifier: BlockNotifications,
        eth_rpc_client: EthRpcClient,
        filter: Filter,
    ) -> Result<Self> {
        let historic = if filter.from_block.is_some()
            || filter.to_block.is_some()
            || filter.block_hash.is_some()
        {
            eth_rpc_client.logs(Some(filter.clone())).await.ok()
        } else {
            None
        };
        Ok(Self { blocks: block_notifier, eth_client: eth_rpc_client, filter, historic })
    }

    /// Drains all accumulated logs since the last poll.
    ///
    /// This method:
    /// 1. Takes any historic logs (returned only on the first call)
    /// 2. Drains all pending block notifications without blocking
    /// 3. For each new block, checks if it's within the filter's block range
    /// 4. Queries each relevant block for logs matching the filter criteria
    /// 5. Returns the combined set of logs
    async fn drain_logs(&mut self) -> Vec<Log> {
        let mut logs = self.historic.take().unwrap_or_default();
        let mut block_hashes = vec![];
        while let Some(result) = self.blocks.next().now_or_never().flatten() {
            match result {
                Ok(block_hash) => block_hashes.push(block_hash),
                Err(BroadcastStreamRecvError::Lagged(blocks)) => {
                    // Channel overflowed - some blocks were skipped
                    warn!(target: LOG_TARGET, "Logs filter lagged, skipped {} block notifications", blocks);
                    // Continue draining what's left in the channel
                    continue;
                }
            }
        }

        // For each block that we were notified about, check for matching logs
        for substrate_hash in block_hashes {
            // This can be optimized if we also submit the block number
            // from subscribe_and_cache_new_blocks
            if !self.is_block_in_range(&substrate_hash).await {
                continue;
            }
            let mut block_filter = self.filter.clone();
            block_filter.from_block = None;
            block_filter.to_block = None;
            block_filter.block_hash = self.eth_client.resolve_ethereum_hash(&substrate_hash).await;
            if let Ok(block_logs) = self.eth_client.logs(Some(block_filter)).await {
                logs.extend(block_logs);
            }
        }
        logs
    }

    /// Checks whether the given substrate block falls within the filter's block range.
    ///
    /// Validates both lower-bound (`from_block`) and upper-bound (`to_block`) constraints.
    /// Block tags (like "latest", "pending") are always considered in range.
    async fn is_block_in_range(&self, substrate_hash: &H256) -> bool {
        let Ok(Some(block)) = self.eth_client.block_by_hash(substrate_hash).await else {
            return false; // Can't get block, skip it
        };

        let block_number = block.number();
        // Check lower limit (from_block)
        if let Some(from_block) = &self.filter.from_block {
            match from_block {
                BlockNumberOrTag::U256(limit) => {
                    if block_number < limit.as_u32() {
                        return false;
                    }
                }
                BlockNumberOrTag::BlockTag(_) => {}
            }
        }
        // Check upper limit (to_block)
        if let Some(to_block) = &self.filter.to_block {
            match to_block {
                BlockNumberOrTag::U256(limit) => {
                    if block_number > limit.as_u32() {
                        return false;
                    }
                }
                BlockNumberOrTag::BlockTag(_) => {}
            }
        }
        true
    }
}

/// Filter for pending transactions.
///
/// Monitors the transaction pool and returns newly pending transaction hashes
/// when polled, including transactions that were mined between polls. Previously
/// reported transactions are filtered out.
///
/// The filter maintains state of previously seen transactions to ensure each
/// transaction is reported only once, even if it remains in the pending pool
/// across multiple polls.
pub struct PendingTransactionsFilter {
    /// Set of transaction hashes already reported to the client
    already_seen: HashSet<H256>,
    /// Stream of new block notifications for detecting mined transactions
    block_notifications: BroadcastStream<H256>,
    /// Reference to the transaction pool for querying ready transactions
    tx_pool: Arc<TransactionPoolHandle>,
    /// Ethereum RPC client for fetching block transaction data
    eth_rpc_client: EthRpcClient,
}

impl PendingTransactionsFilter {
    pub fn new(
        block_notifier: BroadcastStream<H256>,
        tx_pool: Arc<TransactionPoolHandle>,
        eth_rpc_client: EthRpcClient,
    ) -> Self {
        Self {
            already_seen: tx_pool
                .ready()
                .filter_map(|tx| extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash))
                .collect(),
            block_notifications: block_notifier,
            tx_pool,
            eth_rpc_client,
        }
    }

    /// Drains all new pending transaction hashes since the last poll.
    ///
    /// This method:
    /// 1. Queries the current ready transaction pool
    /// 2. Drains block notifications to identify mined transactions
    /// 3. Returns only transactions not previously reported: new ready transactions
    ///    plus transactions mined since the last poll
    ///
    /// The filter state is updated to remember all currently pending transactions,
    /// ensuring they won't be reported again on subsequent polls.
    async fn drain_transactions(&mut self) -> Vec<H256> {
        // Get current ready transactions
        let current_ready: HashSet<H256> = self
            .tx_pool
            .ready()
            .filter_map(|tx| {
                extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash).or_else(|| {
                    warn!(target: LOG_TARGET, "Failed to extract transaction info from ready pool");
                    None
                })
            })
            .collect();

        // Get transactions that have already been included in blocks
        let mut included_transactions = HashSet::new();
        while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
            match result {
                Ok(block_hash) => match self.fetch_block_transactions(&block_hash).await {
                    Ok(tx_hashes) => included_transactions.extend(tx_hashes),
                    Err(e) => {
                        warn!(
                            target: LOG_TARGET,
                            "Failed to fetch transactions for block {:?}: {}",
                            block_hash, e
                        );
                    }
                },
                Err(BroadcastStreamRecvError::Lagged(blocks)) => {
                    // Channel overflowed - some blocks were skipped
                    warn!(target: LOG_TARGET, "Pending transactions filter lagged, skipped {} block notifications", blocks);
                    // Continue draining what's left in the channel
                    continue;
                }
            }
        }

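        // Worked example: if `already_seen` = {A}, the ready pool now holds {A, B},
        // and the drained blocks mined {A, C}, then `new_from_pool` = {B},
        // `new_from_blocks` = {C}, and this poll reports {B, C}; `already_seen`
        // ends up as {B}, since A was mined and B is still pending.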
        // New from pool: transactions in the ready pool we haven't seen before
        let new_from_pool: HashSet<H256> =
            current_ready.difference(&self.already_seen).copied().collect();
        let excluded: HashSet<H256> = self.already_seen.union(&new_from_pool).copied().collect();
        let new_from_blocks: HashSet<H256> =
            included_transactions.difference(&excluded).copied().collect();
        let new_pending: Vec<H256> = new_from_pool.union(&new_from_blocks).copied().collect();
        // Remove mined transactions from already_seen
        for tx_hash in &included_transactions {
            self.already_seen.remove(tx_hash);
        }

        // Only track transactions that are still pending (not mined)
        let still_pending: HashSet<H256> =
            current_ready.difference(&included_transactions).copied().collect();
        self.already_seen.extend(still_pending);
        new_pending
    }

    /// Fetches all transaction hashes from a given block.
    ///
    /// Takes a substrate block hash, fetches the block, converts it to an EVM block,
    /// and extracts all transaction hashes, regardless of whether they're returned
    /// as hashes or as full transaction objects.
    async fn fetch_block_transactions(&self, substrate_block_hash: &H256) -> Result<Vec<H256>> {
        let substrate_block =
            self.eth_rpc_client.block_by_hash(substrate_block_hash).await?.ok_or(
                Error::InternalError(format!(
                    "Could not find block with hash: {substrate_block_hash}"
                )),
            )?;
        let block = self
            .eth_rpc_client
            .evm_block(substrate_block, false)
            .await
            .ok_or(Error::InternalError("Could not convert to an evm block".to_string()))?;
        let tx_hashes = match block.transactions {
            HashesOrTransactionInfos::Hashes(hashes) => hashes,
            // Since we called evm_block with hydrated = false we should never
            // receive TransactionInfos, but handle it anyway.
            HashesOrTransactionInfos::TransactionInfos(infos) => {
                infos.iter().map(|ti| ti.hash).collect()
            }
        };
        Ok(tx_hashes)
    }
}