referrerpolicy=no-referrer-when-downgrade

sc_statement_store/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Disk-backed statement store.
20//!
21//! This module contains an implementation of `sp_statement_store::StatementStore` which is backed
22//! by a database.
23//!
24//! Constraint management.
25//!
26//! The statement store validates statements using node-side signature verification and
27//! static runtime allowance limits.
28//! The following constraints are then checked:
29//! * For a given account id, there may be at most `max_count` statements with `max_size` total data
30//!   size. To satisfy this, statements for this account ID are removed from the store starting with
31//!   the lowest priority until a constraint is satisfied.
32//! * There may not be more than `MAX_TOTAL_STATEMENTS` total statements with `MAX_TOTAL_SIZE` size.
33//!   To satisfy this, statements are removed from the store starting with the lowest
34//!   `global_priority` until a constraint is satisfied.
35//!
36//! When a new statement is inserted that would not satisfy constraints in the first place, no
37//! statements are deleted and `Ignored` result is returned.
38//! The order in which statements with the same priority are deleted is unspecified.
39//!
40//! Statement expiration.
41//!
//! Each time a statement is removed from the store (either evicted by a higher-priority statement
//! or explicitly with the `remove` function), the statement is marked as expired. Expired
//! statements can't be added to the store for `Config::purge_after_sec` seconds. This is to
//! prevent old statements from being propagated on the network.
46
47#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51mod subscription;
52
53#[cfg(feature = "test-helpers")]
54pub mod subxt_client;
55#[cfg(feature = "test-helpers")]
56pub mod test_utils;
57
58use crate::subscription::{SubscriptionStatementsStream, SubscriptionsHandle};
59use futures::FutureExt;
60use metrics::MetricsLink as PrometheusMetrics;
61use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock};
62use prometheus_endpoint::Registry as PrometheusRegistry;
63use sc_client_api::{backend::StorageProvider, Backend, StorageKey};
64use sc_keystore::LocalKeystore;
65use sp_blockchain::HeaderBackend;
66use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
67use sp_runtime::traits::Block as BlockT;
68use sp_statement_store::{
69	runtime_api::{StatementSource, StatementStoreExt},
70	AccountId, BlockHash, Channel, DecryptionKey, FilterDecision, Hash, InvalidReason,
71	OptimizedTopicFilter, RejectionReason, Result, SignatureVerificationResult, Statement,
72	StatementAllowance, StatementEvent, SubmitResult, Topic,
73};
74pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
75use std::{
76	collections::{BTreeMap, BTreeSet, HashMap, HashSet},
77	sync::Arc,
78	time::{Duration, Instant},
79};
80pub use subscription::StatementStoreSubscriptionApi;
81
82const KEY_VERSION: &[u8] = b"version".as_slice();
83const CURRENT_VERSION: u32 = 1;
84
85const LOG_TARGET: &str = "statement-store";
86
87/// The amount of time an expired statement is kept before it is removed from the store entirely.
88pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; // 48h
89/// The maximum number of statements the statement store can hold.
90pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; // ~4 million
91/// The maximum amount of data the statement store can hold, regardless of the number of
92/// statements from which the data originates.
93pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB
94/// The maximum size of a single statement in bytes.
95/// Accounts for the 1-byte vector length prefix when statements are gossiped as `Vec<Statement>`.
96pub const MAX_STATEMENT_SIZE: usize =
97	sc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
98
99/// Maximum number of statements to expire in a single iteration.
100const MAX_EXPIRY_STATEMENTS_PER_ITERATION: usize = 10_000;
101/// Maximum number of accounts to check for expiry in a single iteration.
102const MAX_EXPIRY_ACCOUNTS_PER_ITERATION: usize = 10_000;
/// Maximum time to spend checking for expiry in a single iteration.
104const MAX_EXPIRY_TIME_PER_ITERATION: Duration = Duration::from_millis(100);
105
106/// Number of subscription filter worker tasks.
107const NUM_FILTER_WORKERS: usize = 1;
108
109const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(29);
110
/// Selects the block whose state is consulted when a statement allowance is read.
enum AllowanceBlock {
	/// Read the allowance at the best block known to the client.
	Best,
	/// Read the allowance at the last finalized block.
	Finalized,
}
118
119// Period between enforcing limits (checking for expired statements and making sure statements stay
120// within allowances). Different from maintenance period to avoid keeping the lock for too long for
121// maintenance tasks.
122const ENFORCE_LIMITS_PERIOD: std::time::Duration = std::time::Duration::from_secs(31);
123
/// Column identifiers of the statement database.
mod col {
	/// Store metadata; holds the database version record.
	pub const META: u8 = 0;
	/// Encoded statements.
	pub const STATEMENTS: u8 = 1;
	/// Encoded `(hash, timestamp)` records of expired statements.
	pub const EXPIRED: u8 = 2;

	/// Number of columns the database is opened with.
	pub const COUNT: u8 = 3;
}
131
/// Raw expiry value of a statement.
///
/// The upper 32 bits carry the expiration timestamp in seconds; the whole 64-bit value is what
/// statements are ordered by.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Expiry(u64);

impl Expiry {
	/// Extracts the expiration timestamp (seconds) stored in the upper 32 bits.
	fn get_expiration_timestamp_secs(self) -> u64 {
		let Self(raw) = self;
		raw >> 32
	}
}
141
142#[derive(PartialEq, Eq)]
143struct PriorityKey {
144	hash: Hash,
145	expiry: Expiry,
146}
147
148impl PartialOrd for PriorityKey {
149	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150		Some(self.cmp(other))
151	}
152}
153
154impl Ord for PriorityKey {
155	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
156		self.expiry.cmp(&other.expiry).then_with(|| self.hash.cmp(&other.hash))
157	}
158}
159
160#[derive(PartialEq, Eq)]
161struct ChannelEntry {
162	hash: Hash,
163	expiry: Expiry,
164}
165
166#[derive(Default)]
167struct StatementsForAccount {
168	// Statements ordered by priority.
169	by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
170	// Channel to statement map. Only one statement per channel is allowed.
171	channels: HashMap<Channel, ChannelEntry>,
172	// Sum of all `Data` field sizes.
173	data_size: usize,
174}
175
176impl StatementsForAccount {
177	/// Returns an iterator over statements that have expired by `current_time`.
178	fn expired_by_iter(
179		&self,
180		current_time: u64,
181	) -> impl Iterator<Item = (&PriorityKey, &(Option<Channel>, usize))> {
182		let range = PriorityKey { hash: Hash::default(), expiry: Expiry(0) }..PriorityKey {
183			hash: Hash::default(),
184			expiry: Expiry(current_time << 32),
185		};
186		self.by_priority.range(range)
187	}
188}
189
190/// Default number of concurrent workers for statement validation.
191pub const DEFAULT_NETWORK_WORKERS: usize = 1;
192
193/// Default maximum statements per second per peer before rate limiting kicks in.
194pub use sc_network_statement::config::DEFAULT_STATEMENTS_PER_SECOND as DEFAULT_RATE_LIMIT;
195
196/// Statement store and network handler configuration.
197#[derive(Debug, Clone, Copy)]
198pub struct Config {
199	/// Maximum statements allowed in the store. Once this limit is reached lower-priority
200	/// statements may be evicted.
201	pub max_total_statements: usize,
202	/// Maximum total data size allowed in the store. Once this limit is reached lower-priority
203	/// statements may be evicted.
204	pub max_total_size: usize,
205	/// Number of seconds for which removed statements won't be allowed to be added back in.
206	pub purge_after_sec: u64,
207	/// Number of concurrent workers for statement validation from the network.
208	pub network_workers: usize,
209	/// Maximum statements per second per peer before rate limiting kicks in.
210	pub rate_limit: u32,
211}
212
213impl Config {
214	/// Validate the configuration, returning an error if any values are invalid.
215	pub fn validate(&self) -> Result<()> {
216		if self.max_total_statements == 0 {
217			return Err(Error::InvalidConfig(
218				"max_total_statements must be greater than zero".into(),
219			));
220		}
221		if self.max_total_size == 0 {
222			return Err(Error::InvalidConfig("max_total_size must be greater than zero".into()));
223		}
224		if self.network_workers == 0 {
225			return Err(Error::InvalidConfig("network_workers must be greater than zero".into()));
226		}
227		Ok(())
228	}
229}
230
231impl Default for Config {
232	fn default() -> Self {
233		Config {
234			max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
235			max_total_size: DEFAULT_MAX_TOTAL_SIZE,
236			purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
237			network_workers: DEFAULT_NETWORK_WORKERS,
238			rate_limit: DEFAULT_RATE_LIMIT,
239		}
240	}
241}
242
243/// Tracks evicted statement hashes to suppress re-gossip until their purge deadline elapses
244#[derive(Default)]
245struct EvictedIndex {
246	hashes: HashSet<Hash>,
247	queue: BTreeSet<(u64, Hash)>,
248	pending_cleanup: Vec<Hash>,
249}
250
251impl EvictedIndex {
252	fn insert(&mut self, hash: Hash, purge_at: u64) {
253		if self.hashes.len() >= DEFAULT_MAX_TOTAL_STATEMENTS {
254			if let Some(&key) = self.queue.iter().next() {
255				self.queue.remove(&key);
256				self.hashes.remove(&key.1);
257				self.pending_cleanup.push(key.1);
258			}
259		}
260		self.hashes.insert(hash);
261		self.queue.insert((purge_at, hash));
262	}
263
264	fn contains(&self, hash: &Hash) -> bool {
265		self.hashes.contains(hash)
266	}
267
268	fn len(&self) -> usize {
269		self.hashes.len()
270	}
271
272	/// Removes and returns all hashes whose purge deadline is at or before `current_time`,
273	/// plus any hashes displaced by the capacity cap since the last call.
274	fn drain_due(&mut self, current_time: u64) -> Vec<Hash> {
275		let cutoff = (current_time.saturating_add(1), Hash::default());
276		let to_keep = self.queue.split_off(&cutoff);
277		let due = std::mem::replace(&mut self.queue, to_keep);
278		let mut result: Vec<Hash> = std::mem::take(&mut self.pending_cleanup);
279		result.extend(due.into_iter().map(|(_, hash)| {
280			self.hashes.remove(&hash);
281			hash
282		}));
283		result
284	}
285}
286
287/// Index for query operations (topic/key-based filtering).
288#[derive(Default)]
289struct QueryIndex {
290	by_topic: HashMap<Topic, HashSet<Hash>>,
291	by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
292	topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
293	recent: HashSet<Hash>,
294}
295
296/// Index for submit operations (constraint checking, entries, accounts).
297#[derive(Default)]
298struct SubmitIndex {
299	entries: HashMap<Hash, (AccountId, Expiry, usize)>,
300	evicted: EvictedIndex,
301	accounts: HashMap<AccountId, StatementsForAccount>,
302	accounts_to_check_for_expiry_stmts: Vec<AccountId>,
303	config: Config,
304	total_size: usize,
305}
306
307struct ClientWrapper<Block, Client, BE> {
308	client: Arc<Client>,
309	_block: std::marker::PhantomData<Block>,
310	_backend: std::marker::PhantomData<BE>,
311}
312
313impl<Block, Client, BE> ClientWrapper<Block, Client, BE>
314where
315	Block: BlockT,
316	Block::Hash: From<BlockHash>,
317	BE: Backend<Block> + 'static,
318	Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
319{
320	fn read_allowance(
321		&self,
322		account_id: &AccountId,
323		allowance_block: AllowanceBlock,
324	) -> Result<Option<StatementAllowance>> {
325		use sp_statement_store::{statement_allowance_key, StatementAllowance};
326
327		let block_hash = match allowance_block {
328			AllowanceBlock::Best => self.client.info().best_hash,
329			AllowanceBlock::Finalized => self.client.info().finalized_hash,
330		};
331		let key = statement_allowance_key(account_id);
332		let storage_key = StorageKey(key);
333		self.client
334			.storage(block_hash, &storage_key)
335			.map_err(|e| Error::Storage(format!("Failed to read allowance: {:?}", e)))?
336			.map(|value| {
337				StatementAllowance::decode(&mut &value.0[..])
338					.map_err(|e| Error::Decode(format!("Failed to decode allowance: {:?}", e)))
339			})
340			.transpose()
341	}
342}
343
344/// Statement store.
345pub struct Store {
346	db: parity_db::Db,
347	submit_index: RwLock<SubmitIndex>,
348	query_index: RwLock<QueryIndex>,
349	read_allowance_fn:
350		Box<dyn Fn(&AccountId, AllowanceBlock) -> Result<Option<StatementAllowance>> + Send + Sync>,
351	subscription_manager: SubscriptionsHandle,
352	keystore: Arc<LocalKeystore>,
353	// Used for testing
354	time_override: Option<u64>,
355	metrics: PrometheusMetrics,
356}
357
/// What the submit index knows about a statement hash.
enum IndexQuery {
	/// The hash is neither stored nor tracked as evicted.
	Unknown,
	/// The statement is currently stored.
	Exists,
	/// The hash is tracked as evicted.
	Expired,
}
363
364impl QueryIndex {
365	fn insert(&mut self, hash: Hash, statement: &Statement) {
366		let mut all_topics = [None; MAX_TOPICS];
367		let mut nt = 0;
368		while let Some(t) = statement.topic(nt) {
369			self.by_topic.entry(t).or_default().insert(hash);
370			all_topics[nt] = Some(t);
371			nt += 1;
372		}
373		let key = statement.decryption_key();
374		self.by_dec_key.entry(key).or_default().insert(hash);
375		self.topics_and_keys.insert(hash, (all_topics, key));
376	}
377
378	fn take_recent(&mut self) -> HashSet<Hash> {
379		std::mem::take(&mut self.recent)
380	}
381
382	fn remove(&mut self, hash: &Hash) {
383		let _ = self.recent.remove(hash);
384		if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
385			for t in topics.into_iter().flatten() {
386				if let std::collections::hash_map::Entry::Occupied(mut set) = self.by_topic.entry(t)
387				{
388					set.get_mut().remove(hash);
389					if set.get().is_empty() {
390						set.remove_entry();
391					}
392				}
393			}
394			if let std::collections::hash_map::Entry::Occupied(mut set) = self.by_dec_key.entry(key)
395			{
396				set.get_mut().remove(hash);
397				if set.get().is_empty() {
398					set.remove_entry();
399				}
400			}
401		}
402	}
403
404	fn iterate_with(
405		&self,
406		key: Option<DecryptionKey>,
407		topic: &OptimizedTopicFilter,
408		f: impl FnMut(&Hash) -> Result<()>,
409	) -> Result<()> {
410		match topic {
411			OptimizedTopicFilter::Any => self.iterate_with_any(key, f),
412			OptimizedTopicFilter::MatchAll(topics) => {
413				self.iterate_with_match_all(key, topics.iter(), f)
414			},
415			OptimizedTopicFilter::MatchAny(topics) => {
416				self.iterate_with_match_any(key, topics.iter(), f)
417			},
418		}
419	}
420
421	fn iterate_with_match_any<'a>(
422		&self,
423		key: Option<DecryptionKey>,
424		match_any_topics: impl ExactSizeIterator<Item = &'a Topic>,
425		mut f: impl FnMut(&Hash) -> Result<()>,
426	) -> Result<()> {
427		let Some(key_set) = self.by_dec_key.get(&key).filter(|k| !k.is_empty()) else {
428			return Ok(());
429		};
430
431		for t in match_any_topics {
432			let set = self.by_topic.get(t);
433
434			for item in set.iter().flat_map(|set| set.iter()) {
435				if key_set.contains(item) {
436					log::trace!(
437						target: LOG_TARGET,
438						"Iterating by topic/key: statement {:?}",
439						HexDisplay::from(item)
440					);
441					f(item)?
442				}
443			}
444		}
445		Ok(())
446	}
447
448	fn iterate_with_any(
449		&self,
450		key: Option<DecryptionKey>,
451		mut f: impl FnMut(&Hash) -> Result<()>,
452	) -> Result<()> {
453		let key_set = self.by_dec_key.get(&key);
454		if key_set.map_or(true, |s| s.is_empty()) {
455			// Key does not exist in the index.
456			return Ok(());
457		}
458
459		for item in key_set.map(|hashes| hashes.iter()).into_iter().flatten() {
460			f(item)?
461		}
462		Ok(())
463	}
464
465	fn iterate_with_match_all<'a>(
466		&self,
467		key: Option<DecryptionKey>,
468		match_all_topics: impl ExactSizeIterator<Item = &'a Topic>,
469		mut f: impl FnMut(&Hash) -> Result<()>,
470	) -> Result<()> {
471		let empty = HashSet::new();
472		let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [&empty; MAX_TOPICS + 1];
473		let num_topics = match_all_topics.len();
474		if num_topics > MAX_TOPICS {
475			return Ok(());
476		}
477		let key_set = self.by_dec_key.get(&key);
478		if key_set.map_or(true, |s| s.is_empty()) {
479			// Key does not exist in the index.
480			return Ok(());
481		}
482		sets[0] = key_set.expect("Function returns if key_set is None");
483		for (i, t) in match_all_topics.enumerate() {
484			let set = self.by_topic.get(t);
485			if set.map_or(0, |s| s.len()) == 0 {
486				// At least one of the match_all_topics does not exist in the index.
487				return Ok(());
488			}
489			sets[i + 1] = set.expect("Function returns if set is None");
490		}
491		let sets = &mut sets[0..num_topics + 1];
492		// Start with the smallest topic set or the key set.
493		sets.sort_by_key(|s| s.len());
494		for item in sets[0] {
495			if sets[1..].iter().all(|set| set.contains(item)) {
496				log::trace!(
497					target: LOG_TARGET,
498					"Iterating by topic/key: statement {:?}",
499					HexDisplay::from(item)
500				);
501				f(item)?
502			}
503		}
504		Ok(())
505	}
506}
507
508impl SubmitIndex {
509	fn new(config: Config) -> SubmitIndex {
510		SubmitIndex { config, ..Default::default() }
511	}
512
513	fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
514		let expiry = Expiry(statement.expiry());
515		self.entries.insert(hash, (account, expiry, statement.data_len()));
516		self.total_size += statement.data_len();
517		let account_info = self.accounts.entry(account).or_default();
518		account_info.data_size += statement.data_len();
519		if let Some(channel) = statement.channel() {
520			account_info.channels.insert(channel, ChannelEntry { hash, expiry });
521		}
522		account_info
523			.by_priority
524			.insert(PriorityKey { hash, expiry }, (statement.channel(), statement.data_len()));
525	}
526
527	fn query(&self, hash: &Hash) -> IndexQuery {
528		if self.entries.contains_key(hash) {
529			return IndexQuery::Exists;
530		}
531		if self.evicted.contains(hash) {
532			return IndexQuery::Expired;
533		}
534		IndexQuery::Unknown
535	}
536
537	fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
538		let purge_at = timestamp.saturating_add(self.config.purge_after_sec);
539		self.evicted.insert(hash, purge_at);
540	}
541
542	fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
543		self.evicted.drain_due(current_time)
544	}
545
546	fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
547		if let Some((account, expiry, len)) = self.entries.remove(hash) {
548			self.total_size -= len;
549			if current_time < expiry.get_expiration_timestamp_secs() {
550				let purge_at = expiry
551					.get_expiration_timestamp_secs()
552					.min(current_time.saturating_add(self.config.purge_after_sec));
553				self.evicted.insert(*hash, purge_at);
554			}
555			if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
556				self.accounts.entry(account)
557			{
558				let key = PriorityKey { hash: *hash, expiry };
559				if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
560					account_rec.get_mut().data_size -= len;
561					if let Some(channel) = channel {
562						account_rec.get_mut().channels.remove(&channel);
563					}
564				}
565				if account_rec.get().by_priority.is_empty() {
566					account_rec.remove_entry();
567				}
568			}
569			log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
570			true
571		} else {
572			false
573		}
574	}
575
576	fn insert(
577		&mut self,
578		hash: Hash,
579		statement: &Statement,
580		account: &AccountId,
581		validation: &StatementAllowance,
582		current_time: u64,
583	) -> std::result::Result<HashSet<Hash>, RejectionReason> {
584		let statement_len = statement.data_len();
585		if statement_len > validation.max_size as usize {
586			log::debug!(
587				target: LOG_TARGET,
588				"Ignored oversize message: {:?} ({} bytes)",
589				HexDisplay::from(&hash),
590				statement_len,
591			);
592			return Err(RejectionReason::DataTooLarge {
593				submitted_size: statement_len,
594				available_size: validation.max_size as usize,
595			});
596		}
597
598		let mut evicted = HashSet::new();
599		let mut would_free_size = 0;
600		let expiry = Expiry(statement.expiry());
601		let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
602		// It may happen that we can't delete enough lower priority messages
603		// to satisfy size constraints. We check for that before deleting anything,
604		// taking into account channel message replacement.
605		if let Some(account_rec) = self.accounts.get(account) {
606			if let Some(channel) = statement.channel() {
607				if let Some(channel_record) = account_rec.channels.get(&channel) {
608					if expiry <= channel_record.expiry {
609						// Trying to replace channel message with lower expiry.
610						log::debug!(
611							target: LOG_TARGET,
612							"Ignored lower priority channel message: {:?} {:?} <= {:?}",
613							HexDisplay::from(&hash),
614							expiry,
615							channel_record.expiry,
616						);
617						return Err(RejectionReason::ChannelPriorityTooLow {
618							submitted_expiry: expiry.0,
619							min_expiry: channel_record.expiry.0,
620						});
621					} else {
622						// Would replace channel message. Still need to check for size constraints
623						// below.
624						log::debug!(
625							target: LOG_TARGET,
626							"Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
627							HexDisplay::from(&hash),
628							expiry,
629							HexDisplay::from(&channel_record.hash),
630							channel_record.expiry,
631						);
632						let key = PriorityKey {
633							hash: channel_record.hash,
634							expiry: channel_record.expiry,
635						};
636						if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
637							would_free_size += *len;
638							evicted.insert(channel_record.hash);
639						}
640					}
641				}
642			}
643			// Check if we can evict enough lower priority statements to satisfy constraints
644			for (entry, (_, len)) in account_rec.by_priority.iter() {
645				if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
646					account_rec.by_priority.len() + 1 - evicted.len() <= max_count
647				{
648					// Satisfied
649					break;
650				}
651				if evicted.contains(&entry.hash) {
652					// Already accounted for above
653					continue;
654				}
655				if entry.expiry >= expiry {
656					log::debug!(
657						target: LOG_TARGET,
658						"Ignored message due to constraints {:?} {:?} < {:?}",
659						HexDisplay::from(&hash),
660						expiry,
661						entry.expiry,
662					);
663					return Err(RejectionReason::AccountFull {
664						submitted_expiry: expiry.0,
665						min_expiry: entry.expiry.0,
666					});
667				}
668				evicted.insert(entry.hash);
669				would_free_size += len;
670			}
671		}
672		// Now check global constraints as well.
673		if !((self.total_size - would_free_size + statement_len <= self.config.max_total_size) &&
674			self.entries.len() + 1 - evicted.len() <= self.config.max_total_statements)
675		{
676			log::debug!(
677				target: LOG_TARGET,
678				"Ignored statement {} because the store is full (size={}, count={})",
679				HexDisplay::from(&hash),
680				self.total_size,
681				self.entries.len(),
682			);
683			return Err(RejectionReason::StoreFull);
684		}
685
686		for h in &evicted {
687			self.make_expired(h, current_time);
688		}
689		self.insert_new(hash, *account, statement);
690		Ok(evicted)
691	}
692}
693
694impl Store {
695	/// Create a new shared store instance. There should only be one per process.
696	/// `path` will be used to open a statement database or create a new one if it does not exist.
697	pub fn new_shared<Block, Client, BE>(
698		path: &std::path::Path,
699		config: Config,
700		client: Arc<Client>,
701		keystore: Arc<LocalKeystore>,
702		prometheus: Option<&PrometheusRegistry>,
703		task_spawner: Box<dyn SpawnNamed>,
704	) -> Result<Arc<Store>>
705	where
706		Block: BlockT,
707		Block::Hash: From<BlockHash>,
708		BE: Backend<Block> + 'static,
709		Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
710	{
711		let store =
712			Arc::new(Self::new(path, config, client, keystore, prometheus, task_spawner.clone())?);
713
714		// Perform periodic statement store maintenance
715		let worker_store = store.clone();
716		task_spawner.spawn(
717			"statement-store-maintenance",
718			Some("statement-store"),
719			Box::pin(async move {
720				let mut maintenance_interval = tokio::time::interval(MAINTENANCE_PERIOD);
721				let mut enforce_limits_interval = tokio::time::interval(ENFORCE_LIMITS_PERIOD);
722				loop {
723					futures::select! {
724						_ = maintenance_interval.tick().fuse() => {worker_store.maintain();}
725						_ = enforce_limits_interval.tick().fuse() => {worker_store.enforce_limits();}
726					}
727				}
728			}),
729		);
730
731		Ok(store)
732	}
733
734	/// Create a new instance.
735	/// `path` will be used to open a statement database or create a new one if it does not exist.
736	#[doc(hidden)]
737	pub fn new<Block, Client, BE>(
738		path: &std::path::Path,
739		config: Config,
740		client: Arc<Client>,
741		keystore: Arc<LocalKeystore>,
742		prometheus: Option<&PrometheusRegistry>,
743		task_spawner: Box<dyn SpawnNamed>,
744	) -> Result<Store>
745	where
746		Block: BlockT,
747		Block::Hash: From<BlockHash>,
748		BE: Backend<Block> + 'static,
749		Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
750	{
751		config.validate()?;
752
753		let mut path: std::path::PathBuf = path.into();
754		path.push("statements");
755
756		let mut db_config = parity_db::Options::with_columns(&path, col::COUNT);
757
758		let statement_col = &mut db_config.columns[col::STATEMENTS as usize];
759		statement_col.ref_counted = false;
760		statement_col.preimage = true;
761		statement_col.uniform = true;
762		let db = parity_db::Db::open_or_create(&db_config).map_err(|e| Error::Db(e.to_string()))?;
763		match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
764			Some(version) => {
765				let version = u32::from_le_bytes(
766					version
767						.try_into()
768						.map_err(|_| Error::Db("Error reading database version".into()))?,
769				);
770				if version != CURRENT_VERSION {
771					return Err(Error::Db(format!("Unsupported database version: {version}")));
772				}
773			},
774			None => {
775				db.commit([(
776					col::META,
777					KEY_VERSION.to_vec(),
778					Some(CURRENT_VERSION.to_le_bytes().to_vec()),
779				)])
780				.map_err(|e| Error::Db(e.to_string()))?;
781			},
782		}
783
784		let storage_reader =
785			ClientWrapper { client, _block: Default::default(), _backend: Default::default() };
786		let read_allowance_fn =
787			Box::new(move |account_id: &AccountId, allowance_block: AllowanceBlock| {
788				storage_reader.read_allowance(account_id, allowance_block)
789			});
790
791		let store = Store {
792			db,
793			submit_index: RwLock::new(SubmitIndex::new(config)),
794			query_index: RwLock::new(QueryIndex::default()),
795			read_allowance_fn,
796			keystore,
797			time_override: None,
798			metrics: PrometheusMetrics::new(prometheus),
799			subscription_manager: SubscriptionsHandle::new(
800				task_spawner.clone(),
801				NUM_FILTER_WORKERS,
802			),
803		};
804		store.populate()?;
805		Ok(store)
806	}
807
808	/// Create memory index from the data.
809	// This may be moved to a background thread if it slows startup too much.
810	// This function should only be used on startup. There should be no other DB operations when
811	// iterating the index.
812	fn populate(&self) -> Result<()> {
813		// Holding both locks here is fine: this runs at startup before any statements are
814		// processed, so there is no contention.
815		{
816			let mut submit_index = self.submit_index.write();
817			let mut query_index = self.query_index.write();
818			self.db
819				.iter_column_while(col::STATEMENTS, |item| {
820					let statement = item.value;
821					if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
822						let hash = statement.hash();
823						log::trace!(
824							target: LOG_TARGET,
825							"Statement loaded {:?}",
826							HexDisplay::from(&hash)
827						);
828						if let Some(account_id) = statement.account_id() {
829							submit_index.insert_new(hash, account_id, &statement);
830							query_index.insert(hash, &statement);
831						} else {
832							log::debug!(
833								target: LOG_TARGET,
834								"Error decoding statement loaded from the DB: {:?}",
835								HexDisplay::from(&hash)
836							);
837						}
838					}
839					true
840				})
841				.map_err(|e| Error::Db(e.to_string()))?;
842			self.db
843				.iter_column_while(col::EXPIRED, |item| {
844					let expired_info = item.value;
845					if let Ok((hash, timestamp)) =
846						<(Hash, u64)>::decode(&mut expired_info.as_slice())
847					{
848						log::trace!(
849							target: LOG_TARGET,
850							"Statement loaded (expired): {:?}",
851							HexDisplay::from(&hash)
852						);
853						submit_index.insert_expired(hash, timestamp);
854					}
855					true
856				})
857				.map_err(|e| Error::Db(e.to_string()))?;
858		}
859
860		self.maintain();
861		Ok(())
862	}
863
864	fn collect_statements_locked<R>(
865		&self,
866		key: Option<DecryptionKey>,
867		topic_filter: &OptimizedTopicFilter,
868		query_index: &QueryIndex,
869		result: &mut Vec<R>,
870		mut f: impl FnMut(Statement) -> Option<R>,
871	) -> Result<()> {
872		query_index.iterate_with(key, topic_filter, |hash| {
873			match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
874				Some(entry) => {
875					if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
876						if let Some(data) = f(statement) {
877							result.push(data);
878						}
879					} else {
880						// DB inconsistency
881						log::warn!(
882							target: LOG_TARGET,
883							"Corrupt statement {:?}",
884							HexDisplay::from(hash)
885						);
886					}
887				},
888				None => {
889					// DB inconsistency
890					log::debug!(
891						target: LOG_TARGET,
892						"Missing statement {:?}",
893						HexDisplay::from(hash)
894					);
895				},
896			}
897			Ok(())
898		})?;
899		Ok(())
900	}
901
902	fn collect_statements<R>(
903		&self,
904		key: Option<DecryptionKey>,
905		topic_filter: &OptimizedTopicFilter,
906		f: impl FnMut(Statement) -> Option<R>,
907	) -> Result<Vec<R>> {
908		let mut result = Vec::new();
909		let query_index = self.query_index.read();
910		self.collect_statements_locked(key, topic_filter, &query_index, &mut result, f)?;
911		Ok(result)
912	}
913
914	// Collects expired and over-allowance statement hashes for a single account.
915	fn collect_evictions(
916		&self,
917		account: &AccountId,
918		account_rec: &StatementsForAccount,
919		current_time: u64,
920	) -> Vec<Hash> {
921		let mut to_evict = Vec::new();
922		let mut expired_count = 0usize;
923		let mut expired_size = 0usize;
924		for (key, (_, len)) in account_rec.expired_by_iter(current_time) {
925			to_evict.push(key.hash);
926			expired_count += 1;
927			expired_size += len;
928		}
929
930		// Enforce allowances for remaining (non-expired) statements, we use the finalized block to
931		// make sure we enforce allowances based on the correct chain state.
932		let allowance = match (self.read_allowance_fn)(account, AllowanceBlock::Finalized) {
933			Ok(Some(allowance)) => allowance,
934			Ok(None) => {
935				log::debug!(
936					target: LOG_TARGET,
937					"No allowance found for account {:?}, treating as zero allowance",
938					HexDisplay::from(account)
939				);
940				StatementAllowance { max_count: 0, max_size: 0 }
941			},
942			Err(e) => {
943				log::error!(target: LOG_TARGET, "Error reading allowance: {:?}", e);
944				// Skip allowance enforcement for this account on error
945				return to_evict;
946			},
947		};
948
949		// Calculate remaining count and size after expiring statements
950		let mut remaining_count = account_rec.by_priority.len() - expired_count;
951		let mut remaining_size = account_rec.data_size - expired_size;
952
953		// Evict lowest priority statements that exceed allowance
954		if remaining_count > allowance.max_count as usize ||
955			remaining_size > allowance.max_size as usize
956		{
957			log::debug!(
958				target: LOG_TARGET,
959				"Account {:?} exceeds allowance: count={}/{}, size={}/{}",
960				HexDisplay::from(account),
961				remaining_count,
962				allowance.max_count,
963				remaining_size,
964				allowance.max_size
965			);
966
967			// Skip expired statements (they're at the beginning due to BTreeMap ordering)
968			for (key, (_, len)) in account_rec.by_priority.iter().skip(expired_count) {
969				if remaining_count <= allowance.max_count as usize &&
970					remaining_size <= allowance.max_size as usize
971				{
972					break;
973				}
974				to_evict.push(key.hash);
975				remaining_count -= 1;
976				remaining_size -= len;
977				log::debug!(
978					target: LOG_TARGET,
979					"Evicting statement {:?} due to allowance enforcement",
980					HexDisplay::from(&key.hash)
981				);
982			}
983		}
984
985		to_evict
986	}
987
988	// Checks for expired statements and enforces allowances, marking violating statements
989	// as expired in the index.
990	//
991	// This function performs incremental checking to avoid blocking the store for too long.
992	// It processes accounts in batches and stops when any of these limits are reached:
993	// - `MAX_EXPIRY_STATEMENTS_PER_ITERATION` statements found to expire/evict
994	// - `MAX_EXPIRY_ACCOUNTS_PER_ITERATION` accounts checked
995	// - `MAX_EXPIRY_TIME_MS_PER_ITERATION` milliseconds elapsed
996	//
997	// The function maintains a list of accounts to check (`accounts_to_check_for_expiry_stmts`).
998	// When this list is empty, it repopulates it with all current accounts and returns early,
999	// deferring the actual check to the next call. This ensures the process eventually covers
1000	// all accounts across multiple invocations.
1001	//
1002	// Statements are considered expired when their priority (which encodes the expiration
1003	// timestamp in the upper 32 bits) is less than the current timestamp.
1004	fn enforce_limits(&self) {
1005		let _start_check_expiration_timer = self.metrics.start_check_expiration_timer();
1006		let current_time = self.timestamp();
1007
1008		let (to_evict, num_accounts_checked) = {
1009			let submit_index = self.submit_index.upgradable_read();
1010			if submit_index.accounts_to_check_for_expiry_stmts.is_empty() {
1011				let existing_accounts = submit_index.accounts.keys().cloned().collect::<Vec<_>>();
1012				let mut submit_index = RwLockUpgradableReadGuard::upgrade(submit_index);
1013				submit_index.accounts_to_check_for_expiry_stmts = existing_accounts;
1014				return;
1015			}
1016
1017			let mut to_evict = Vec::new();
1018			let mut num_accounts_checked = 0;
1019			let start = Instant::now();
1020
1021			for account in submit_index.accounts_to_check_for_expiry_stmts.iter().rev() {
1022				num_accounts_checked += 1;
1023				if let Some(account_rec) = submit_index.accounts.get(account) {
1024					to_evict.extend(self.collect_evictions(account, account_rec, current_time));
1025				}
1026
1027				if to_evict.len() >= MAX_EXPIRY_STATEMENTS_PER_ITERATION ||
1028					num_accounts_checked >= MAX_EXPIRY_ACCOUNTS_PER_ITERATION ||
1029					start.elapsed() >= MAX_EXPIRY_TIME_PER_ITERATION
1030				{
1031					break;
1032				}
1033			}
1034
1035			(to_evict, num_accounts_checked)
1036		};
1037
1038		let mut expired = 0;
1039
1040		for hash in to_evict {
1041			if let Err(e) = self.remove(&hash) {
1042				log::debug!(
1043					target: LOG_TARGET,
1044					"Error marking statement {:?} as expired: {:?}",
1045					HexDisplay::from(&hash),
1046					e
1047				);
1048			} else {
1049				expired += 1;
1050				log::trace!(
1051					target: LOG_TARGET,
1052					"Marked statement {:?} as expired",
1053					HexDisplay::from(&hash)
1054				);
1055			}
1056		}
1057
1058		let mut submit_index = self.submit_index.write();
1059		let new_len = submit_index
1060			.accounts_to_check_for_expiry_stmts
1061			.len()
1062			.saturating_sub(num_accounts_checked);
1063		submit_index.accounts_to_check_for_expiry_stmts.truncate(new_len);
1064
1065		drop(_start_check_expiration_timer);
1066
1067		self.metrics.report(|metrics| {
1068			metrics.statements_expired_total.inc_by(expired);
1069		});
1070	}
1071
1072	/// Perform periodic store maintenance
1073	pub fn maintain(&self) {
1074		log::trace!(target: LOG_TARGET, "Started store maintenance");
1075		let (
1076			deleted,
1077			active_count,
1078			expired_count,
1079			total_size,
1080			accounts_count,
1081			capacity_statements,
1082			capacity_bytes,
1083		): (Vec<_>, usize, usize, usize, usize, usize, usize) = {
1084			let mut submit_index = self.submit_index.write();
1085			let deleted = submit_index.maintain(self.timestamp());
1086			(
1087				deleted,
1088				submit_index.entries.len(),
1089				submit_index.evicted.len(),
1090				submit_index.total_size,
1091				submit_index.accounts.len(),
1092				submit_index.config.max_total_statements,
1093				submit_index.config.max_total_size,
1094			)
1095		};
1096		let deleted: Vec<_> =
1097			deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
1098		let deleted_count = deleted.len() as u64;
1099		if let Err(e) = self.db.commit(deleted) {
1100			log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
1101		} else {
1102			self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
1103		}
1104
1105		self.metrics.report(|metrics| {
1106			metrics.statements_total.set(active_count as u64);
1107			metrics.bytes_total.set(total_size as u64);
1108			metrics.accounts_total.set(accounts_count as u64);
1109			metrics.expired_total.set(expired_count as u64);
1110			metrics.capacity_statements.set(capacity_statements as u64);
1111			metrics.capacity_bytes.set(capacity_bytes as u64);
1112		});
1113
1114		log::trace!(
1115			target: LOG_TARGET,
1116			"Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
1117			deleted_count,
1118			active_count,
1119			expired_count
1120		);
1121	}
1122
1123	fn timestamp(&self) -> u64 {
1124		self.time_override.unwrap_or_else(|| {
1125			std::time::SystemTime::now()
1126				.duration_since(std::time::UNIX_EPOCH)
1127				.unwrap_or_default()
1128				.as_secs()
1129		})
1130	}
1131
1132	#[cfg(test)]
1133	fn set_time(&mut self, time: u64) {
1134		self.time_override = Some(time);
1135	}
1136
1137	/// Returns `self` as [`StatementStoreExt`].
1138	pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
1139		StatementStoreExt::new(self)
1140	}
1141
1142	/// Return information of all known statements whose decryption key is identified as
1143	/// `dest`. The key must be available to the client.
1144	fn posted_clear_inner<R>(
1145		&self,
1146		match_all_topics: &[Topic],
1147		dest: [u8; 32],
1148		// Map the statement and the decrypted data to the desired result.
1149		mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
1150	) -> Result<Vec<R>> {
1151		self.collect_statements(
1152			Some(dest),
1153			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1154			|statement| {
1155				if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
1156					let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
1157					let public: sp_statement_store::ed25519::Public = public.into();
1158					match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
1159						Err(e) => {
1160							log::debug!(
1161								target: LOG_TARGET,
1162								"Keystore error: {:?}, for statement {:?}",
1163								e,
1164								HexDisplay::from(&statement.hash())
1165							);
1166							None
1167						},
1168						Ok(None) => {
1169							log::debug!(
1170								target: LOG_TARGET,
1171								"Keystore is missing key for statement {:?}",
1172								HexDisplay::from(&statement.hash())
1173							);
1174							None
1175						},
1176						Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
1177							Ok(r) => r.map(|data| map_f(statement, data)),
1178							Err(e) => {
1179								log::debug!(
1180									target: LOG_TARGET,
1181									"Decryption error: {:?}, for statement {:?}",
1182									e,
1183									HexDisplay::from(&statement.hash())
1184								);
1185								None
1186							},
1187						},
1188					}
1189				} else {
1190					None
1191				}
1192			},
1193		)
1194	}
1195}
1196
1197impl StatementStore for Store {
1198	/// Return all statements.
1199	fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
1200		let query_index = self.query_index.read();
1201		let mut result = Vec::with_capacity(query_index.topics_and_keys.len());
1202		for hash in query_index.topics_and_keys.keys().cloned() {
1203			let Some(encoded) =
1204				self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
1205			else {
1206				continue;
1207			};
1208			if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
1209				result.push((hash, statement));
1210			}
1211		}
1212		Ok(result)
1213	}
1214
1215	fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
1216		let mut query_index = self.query_index.write();
1217		let recent = query_index.take_recent();
1218		let mut result = Vec::with_capacity(recent.len());
1219		for hash in recent {
1220			let Some(encoded) =
1221				self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
1222			else {
1223				continue;
1224			};
1225			if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
1226				result.push((hash, statement));
1227			}
1228		}
1229		Ok(result)
1230	}
1231
1232	/// Returns a statement by hash.
1233	fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
1234		Ok(
1235			match self
1236				.db
1237				.get(col::STATEMENTS, hash.as_slice())
1238				.map_err(|e| Error::Db(e.to_string()))?
1239			{
1240				Some(entry) => {
1241					log::trace!(
1242						target: LOG_TARGET,
1243						"Queried statement {:?}",
1244						HexDisplay::from(hash)
1245					);
1246					Some(
1247						Statement::decode(&mut entry.as_slice())
1248							.map_err(|e| Error::Decode(e.to_string()))?,
1249					)
1250				},
1251				None => {
1252					log::trace!(
1253						target: LOG_TARGET,
1254						"Queried missing statement {:?}",
1255						HexDisplay::from(hash)
1256					);
1257					None
1258				},
1259			},
1260		)
1261	}
1262
1263	fn has_statement(&self, hash: &Hash) -> bool {
1264		self.query_index.read().topics_and_keys.contains_key(hash)
1265	}
1266
1267	fn statement_hashes(&self) -> Vec<Hash> {
1268		self.query_index.read().topics_and_keys.keys().cloned().collect()
1269	}
1270
1271	fn statements_by_hashes(
1272		&self,
1273		hashes: &[Hash],
1274		filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
1275	) -> Result<(Vec<(Hash, Statement)>, usize)> {
1276		let mut result = Vec::new();
1277		let mut processed = 0;
1278		for hash in hashes {
1279			processed += 1;
1280			let Some(encoded) =
1281				self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))?
1282			else {
1283				continue;
1284			};
1285			let Ok(statement) = Statement::decode(&mut encoded.as_slice()) else { continue };
1286			match filter(hash, &encoded, &statement) {
1287				FilterDecision::Skip => {},
1288				FilterDecision::Take => {
1289					result.push((*hash, statement));
1290				},
1291				FilterDecision::Abort => {
1292					// We did not process it :)
1293					processed -= 1;
1294					break;
1295				},
1296			}
1297		}
1298
1299		Ok((result, processed))
1300	}
1301
1302	/// Return the data of all known statements which include all topics and have no `DecryptionKey`
1303	/// field.
1304	fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
1305		self.collect_statements(
1306			None,
1307			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1308			|statement| statement.into_data(),
1309		)
1310	}
1311
1312	/// Return the data of all known statements whose decryption key is identified as `dest` (this
1313	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
1314	/// private key for symmetric ciphers).
1315	fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1316		self.collect_statements(
1317			Some(dest),
1318			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1319			|statement| statement.into_data(),
1320		)
1321	}
1322
1323	/// Return the decrypted data of all known statements whose decryption key is identified as
1324	/// `dest`. The key must be available to the client.
1325	fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1326		self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
1327	}
1328
1329	/// Return all known statements which include all topics and have no `DecryptionKey`
1330	/// field.
1331	fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
1332		self.collect_statements(
1333			None,
1334			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1335			|statement| Some(statement.encode()),
1336		)
1337	}
1338
1339	/// Return all known statements whose decryption key is identified as `dest` (this
1340	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
1341	/// private key for symmetric ciphers).
1342	fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1343		self.collect_statements(
1344			Some(dest),
1345			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1346			|statement| Some(statement.encode()),
1347		)
1348	}
1349
1350	/// Return the statement and the decrypted data of all known statements whose decryption key is
1351	/// identified as `dest`. The key must be available to the client.
1352	fn posted_clear_stmt(
1353		&self,
1354		match_all_topics: &[Topic],
1355		dest: [u8; 32],
1356	) -> Result<Vec<Vec<u8>>> {
1357		self.posted_clear_inner(match_all_topics, dest, |statement, data| {
1358			let mut res = Vec::with_capacity(statement.size_hint() + data.len());
1359			statement.encode_to(&mut res);
1360			res.extend_from_slice(&data);
1361			res
1362		})
1363	}
1364
1365	/// Submit a statement to the store. Validates the statement and returns validation result.
1366	fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
1367		let _histogram_submit_start_timer = self.metrics.start_submit_timer();
1368		let hash = statement.hash();
1369		// Get unix timestamp
1370		if self.timestamp() >= statement.get_expiration_timestamp_secs().into() {
1371			log::debug!(
1372				target: LOG_TARGET,
1373				"Statement is already expired: {:?}",
1374				HexDisplay::from(&hash),
1375			);
1376			let reason = InvalidReason::AlreadyExpired;
1377			self.metrics.report(|metrics| {
1378				metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1379			});
1380			return SubmitResult::Invalid(reason);
1381		}
1382		let encoded_size = statement.encoded_size();
1383		if encoded_size > MAX_STATEMENT_SIZE {
1384			log::debug!(
1385				target: LOG_TARGET,
1386				"Statement is too big for propogation: {:?} ({}/{} bytes)",
1387				HexDisplay::from(&hash),
1388				statement.encoded_size(),
1389				MAX_STATEMENT_SIZE
1390			);
1391			let reason = InvalidReason::EncodingTooLarge {
1392				submitted_size: encoded_size,
1393				max_size: MAX_STATEMENT_SIZE,
1394			};
1395			self.metrics.report(|metrics| {
1396				metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1397			});
1398			return SubmitResult::Invalid(reason);
1399		}
1400
1401		match self.submit_index.read().query(&hash) {
1402			IndexQuery::Expired => {
1403				if !source.can_be_resubmitted() {
1404					self.metrics.report(|metrics| {
1405						metrics.known_statements.with_label_values(&["known_expired"]).inc();
1406					});
1407					return SubmitResult::KnownExpired;
1408				}
1409			},
1410			IndexQuery::Exists => {
1411				if !source.can_be_resubmitted() {
1412					self.metrics.report(|metrics| {
1413						metrics.known_statements.with_label_values(&["known"]).inc();
1414					});
1415					return SubmitResult::Known;
1416				}
1417			},
1418			IndexQuery::Unknown => {},
1419		}
1420
1421		let Some(account_id) = statement.account_id() else {
1422			log::debug!(
1423				target: LOG_TARGET,
1424				"Statement validation failed: Missing proof ({:?})",
1425				HexDisplay::from(&hash),
1426			);
1427			let reason = InvalidReason::NoProof;
1428			self.metrics.report(|metrics| {
1429				metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1430			});
1431			return SubmitResult::Invalid(reason);
1432		};
1433
1434		match statement.verify_signature() {
1435			SignatureVerificationResult::Valid(_) => {},
1436			SignatureVerificationResult::Invalid => {
1437				log::debug!(
1438					target: LOG_TARGET,
1439					"Statement validation failed: BadProof, {:?}",
1440					HexDisplay::from(&hash),
1441				);
1442				let reason = InvalidReason::BadProof;
1443				self.metrics.report(|metrics| {
1444					metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1445				});
1446				return SubmitResult::Invalid(reason);
1447			},
1448			SignatureVerificationResult::NoSignature => {
1449				log::debug!(
1450					target: LOG_TARGET,
1451					"Statement validation failed: NoProof, {:?}",
1452					HexDisplay::from(&hash),
1453				);
1454				let reason = InvalidReason::NoProof;
1455				self.metrics.report(|metrics| {
1456					metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1457				});
1458				return SubmitResult::Invalid(reason);
1459			},
1460		};
1461
1462		// Check statement allowance for the account and evict statements if necessary to make room
1463		// for the new statement. We use the best block for allowance checks to allow for more
1464		// up-to-date allowances. This means that in some cases, a statement may be accepted but
1465		// then later evicted when we enforce limits based on the finalized block, if the best_hash
1466		// does not make it into the finalized chain, but this is an acceptable tradeoff for
1467		// better responsiveness to allowance changes.
1468		let validation = match (self.read_allowance_fn)(&account_id, AllowanceBlock::Best) {
1469			Ok(Some(allowance)) => allowance,
1470			Ok(None) => {
1471				log::debug!(
1472					target: LOG_TARGET,
1473					"Account {} has no statement allowance set",
1474					HexDisplay::from(&account_id),
1475				);
1476				let reason = RejectionReason::NoAllowance;
1477				self.metrics.report(|metrics| {
1478					metrics.rejections.with_label_values(&[reason.label()]).inc();
1479				});
1480				return SubmitResult::Rejected(reason);
1481			},
1482			Err(e) => {
1483				log::debug!(
1484					target: LOG_TARGET,
1485					"Reading statement allowance for account {} failed",
1486					HexDisplay::from(&account_id),
1487				);
1488				self.metrics.report(|metrics| {
1489					metrics.internal_errors.with_label_values(&["read_allowance"]).inc();
1490				});
1491				return SubmitResult::InternalError(e);
1492			},
1493		};
1494
1495		let current_time = self.timestamp();
1496		let evicted = {
1497			let mut submit_index = self.submit_index.write();
1498
1499			let evicted =
1500				match submit_index.insert(hash, &statement, &account_id, &validation, current_time)
1501				{
1502					Ok(evicted) => evicted,
1503					Err(reason) => {
1504						self.metrics.report(|metrics| {
1505							metrics.rejections.with_label_values(&[reason.label()]).inc();
1506						});
1507						return SubmitResult::Rejected(reason);
1508					},
1509				};
1510
1511			let mut commit = Vec::new();
1512			commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
1513			for h in &evicted {
1514				commit.push((col::STATEMENTS, h.to_vec(), None));
1515				if submit_index.evicted.contains(h) {
1516					commit.push((col::EXPIRED, h.to_vec(), Some((h, current_time).encode())));
1517				}
1518			}
1519			if let Err(e) = self.db.commit(commit) {
1520				log::debug!(
1521					target: LOG_TARGET,
1522					"Statement validation failed: database error {}, {:?}",
1523					e,
1524					statement
1525				);
1526				self.metrics.report(|metrics| {
1527					metrics.internal_errors.with_label_values(&["db_commit"]).inc();
1528				});
1529				return SubmitResult::InternalError(Error::Db(e.to_string()));
1530			}
1531			evicted
1532		}; // Release submit index lock
1533		{
1534			let mut query_index = self.query_index.write();
1535			for h in &evicted {
1536				query_index.remove(h);
1537			}
1538			query_index.insert(hash, &statement);
1539			query_index.recent.insert(hash);
1540			self.subscription_manager.notify(statement);
1541		} // Release query index lock
1542		self.metrics.report(|metrics| metrics.submitted_statements.inc());
1543		log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
1544		SubmitResult::New
1545	}
1546
1547	/// Remove a statement by hash.
1548	fn remove(&self, hash: &Hash) -> Result<()> {
1549		let current_time = self.timestamp();
1550		let was_expired = {
1551			let mut submit_index = self.submit_index.write();
1552			if submit_index.make_expired(hash, current_time) {
1553				let mut commit = vec![(col::STATEMENTS, hash.to_vec(), None)];
1554				if submit_index.evicted.contains(hash) {
1555					commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1556				}
1557				if let Err(e) = self.db.commit(commit) {
1558					log::debug!(
1559						target: LOG_TARGET,
1560						"Error removing statement: database error {}, {:?}",
1561						e,
1562						HexDisplay::from(hash),
1563					);
1564					return Err(Error::Db(e.to_string()));
1565				}
1566				true
1567			} else {
1568				false
1569			}
1570		};
1571		if was_expired {
1572			let mut query_index = self.query_index.write();
1573			query_index.remove(hash);
1574		}
1575		Ok(())
1576	}
1577
1578	/// Remove all statements by an account.
1579	fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1580		let evicted = {
1581			let mut submit_index = self.submit_index.write();
1582			let mut evicted = Vec::new();
1583			if let Some(account_rec) = submit_index.accounts.get(&who) {
1584				evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1585			}
1586
1587			let current_time = self.timestamp();
1588			let mut commit = Vec::new();
1589			for hash in &evicted {
1590				submit_index.make_expired(hash, current_time);
1591				commit.push((col::STATEMENTS, hash.to_vec(), None));
1592				if submit_index.evicted.contains(hash) {
1593					commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1594				}
1595			}
1596			self.db.commit(commit).map_err(|e| {
1597				log::debug!(
1598					target: LOG_TARGET,
1599					"Error removing statement: database error {}, remove by {:?}",
1600					e,
1601					HexDisplay::from(&who),
1602				);
1603
1604				Error::Db(e.to_string())
1605			})?;
1606			evicted
1607		};
1608		if !evicted.is_empty() {
1609			let mut query_index = self.query_index.write();
1610			for hash in &evicted {
1611				query_index.remove(hash);
1612			}
1613		}
1614		Ok(())
1615	}
1616}
1617
1618impl StatementStoreSubscriptionApi for Store {
1619	fn subscribe_statement(
1620		&self,
1621		topic_filter: OptimizedTopicFilter,
1622	) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>
1623	{
1624		// Keep the query index read lock until after we have subscribed to avoid missing
1625		// statements.
1626		let mut existing_statements = Vec::new();
1627		let query_index = self.query_index.read();
1628		self.collect_statements_locked(
1629			None,
1630			&topic_filter,
1631			&query_index,
1632			&mut existing_statements,
1633			|statement| Some(statement.encode()),
1634		)?;
1635		let (subscription_sender, subscription_stream) =
1636			self.subscription_manager.subscribe(topic_filter);
1637		if existing_statements.is_empty() {
1638			subscription_sender
1639				.send_blocking(StatementEvent::NewStatements {
1640					statements: vec![],
1641					remaining: Some(0),
1642				})
1643				.ok();
1644		}
1645		Ok((existing_statements, subscription_sender, subscription_stream))
1646	}
1647}
1648
1649#[cfg(test)]
1650mod tests {
1651
1652	use crate::{col, Store};
1653	use sc_keystore::Keystore;
1654	use sp_core::{Decode, Encode, Pair};
1655	use sp_statement_store::{
1656		AccountId, Channel, DecryptionKey, InvalidReason, Proof, RejectionReason, Statement,
1657		StatementSource, StatementStore, SubmitResult, Topic,
1658	};
1659
	// Concrete types used to build the `Block` type that the test client and backend are
	// instantiated with.
	type Extrinsic = sp_runtime::OpaqueExtrinsic;
	type Hash = sp_core::H256;
	type Hashing = sp_runtime::traits::BlakeTwo256;
	type BlockNumber = u64;
	type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
	type Block = sp_runtime::generic::Block<Header, Extrinsic>;

	/// Block hash that `TestClient::info` reports as both the best and the finalized block.
	const TEST_BEST_BLOCK_HASH: [u8; 32] = [1u8; 32];

	/// Maximum seed value used by `account(seed)`/`statement(seed, ...)` in this
	/// test module. Increase if you add tests that pass larger seed values to
	/// `statement(..)`. The reverse-lookup table consulted by `TestClient::storage`
	/// is built once, on first use, for all seeds in `0..=MAX_TEST_ACCOUNT_SEED`.
	const MAX_TEST_ACCOUNT_SEED: u64 = 64;
1674
1675	/// Reverse-lookup table from a real sr25519 public key back to the synthetic
1676	/// `u64` seed it was derived from. Populated once with seeds in
1677	/// `0..=MAX_TEST_ACCOUNT_SEED`, then consulted by `TestClient::storage` to
1678	/// figure out which allowance bucket to return for a given account.
1679	fn account_seed_table() -> &'static std::collections::BTreeMap<AccountId, u64> {
1680		use std::sync::OnceLock;
1681		static TABLE: OnceLock<std::collections::BTreeMap<AccountId, u64>> = OnceLock::new();
1682		TABLE.get_or_init(|| {
1683			let mut t = std::collections::BTreeMap::new();
1684			for seed in 0..=MAX_TEST_ACCOUNT_SEED {
1685				t.insert(account_keypair(seed).public().0, seed);
1686			}
1687			t
1688		})
1689	}
1690
	/// Stateless stand-in for a client in these tests. It implements `StorageProvider`
	/// (only `storage` is functional) and `HeaderBackend` (only `info` is functional).
	#[derive(Clone)]
	pub(crate) struct TestClient;

	/// In-memory backend type used as the backend parameter for `TestClient`.
	pub(crate) type TestBackend = sc_client_api::in_mem::Backend<Block>;
1695
1696	impl sc_client_api::StorageProvider<Block, TestBackend> for TestClient {
1697		fn storage(
1698			&self,
1699			_hash: Hash,
1700			key: &sc_client_api::StorageKey,
1701		) -> sp_blockchain::Result<Option<sc_client_api::StorageData>> {
1702			use sp_statement_store::StatementAllowance;
1703
1704			assert_eq!(&key.0[0..21], b":statement_allowance:" as &[u8],);
1705
1706			// Recover the synthetic test seed from the account id. Unknown accounts
1707			// (e.g. //Alice for `signed_statement`) fall through to a generic default.
1708			let account_bytes: AccountId = key.0[21..53].try_into().unwrap();
1709			let seed = account_seed_table().get(&account_bytes).copied();
1710			let allowance = match seed {
1711				// Account 0 has no allowance (used to test eviction of all statements)
1712				Some(0) => return Ok(None),
1713				Some(1) => StatementAllowance::new(1, 1000),
1714				Some(2) => StatementAllowance::new(2, 1000),
1715				Some(3) => StatementAllowance::new(3, 1000),
1716				Some(4) => StatementAllowance::new(4, 1000),
1717				Some(42) => StatementAllowance::new(42, (42 * crate::MAX_STATEMENT_SIZE) as u32),
1718				Some(_) | None => StatementAllowance::new(100, 1000),
1719			};
1720			Ok(Some(sc_client_api::StorageData(allowance.encode())))
1721		}
1722
1723		fn storage_hash(
1724			&self,
1725			_hash: Hash,
1726			_key: &sc_client_api::StorageKey,
1727		) -> sp_blockchain::Result<Option<Hash>> {
1728			unimplemented!()
1729		}
1730
1731		fn storage_keys(
1732			&self,
1733			_hash: Hash,
1734			_prefix: Option<&sc_client_api::StorageKey>,
1735			_start_key: Option<&sc_client_api::StorageKey>,
1736		) -> sp_blockchain::Result<
1737			sc_client_api::backend::KeysIter<
1738				<TestBackend as sc_client_api::Backend<Block>>::State,
1739				Block,
1740			>,
1741		> {
1742			unimplemented!()
1743		}
1744
1745		fn storage_pairs(
1746			&self,
1747			_hash: Hash,
1748			_prefix: Option<&sc_client_api::StorageKey>,
1749			_start_key: Option<&sc_client_api::StorageKey>,
1750		) -> sp_blockchain::Result<
1751			sc_client_api::backend::PairsIter<
1752				<TestBackend as sc_client_api::Backend<Block>>::State,
1753				Block,
1754			>,
1755		> {
1756			unimplemented!()
1757		}
1758
1759		fn child_storage(
1760			&self,
1761			_hash: Hash,
1762			_child_info: &sc_client_api::ChildInfo,
1763			_key: &sc_client_api::StorageKey,
1764		) -> sp_blockchain::Result<Option<sc_client_api::StorageData>> {
1765			unimplemented!()
1766		}
1767
1768		fn child_storage_keys(
1769			&self,
1770			_hash: Hash,
1771			_child_info: sc_client_api::ChildInfo,
1772			_prefix: Option<&sc_client_api::StorageKey>,
1773			_start_key: Option<&sc_client_api::StorageKey>,
1774		) -> sp_blockchain::Result<
1775			sc_client_api::backend::KeysIter<
1776				<TestBackend as sc_client_api::Backend<Block>>::State,
1777				Block,
1778			>,
1779		> {
1780			unimplemented!()
1781		}
1782
1783		fn child_storage_hash(
1784			&self,
1785			_hash: Hash,
1786			_child_info: &sc_client_api::ChildInfo,
1787			_key: &sc_client_api::StorageKey,
1788		) -> sp_blockchain::Result<Option<Hash>> {
1789			unimplemented!()
1790		}
1791
1792		fn closest_merkle_value(
1793			&self,
1794			_hash: Hash,
1795			_key: &sc_client_api::StorageKey,
1796		) -> sp_blockchain::Result<Option<sc_client_api::MerkleValue<Hash>>> {
1797			unimplemented!()
1798		}
1799
1800		fn child_closest_merkle_value(
1801			&self,
1802			_hash: Hash,
1803			_child_info: &sc_client_api::ChildInfo,
1804			_key: &sc_client_api::StorageKey,
1805		) -> sp_blockchain::Result<Option<sc_client_api::MerkleValue<Hash>>> {
1806			unimplemented!()
1807		}
1808	}
1809
1810	impl sp_blockchain::HeaderBackend<Block> for TestClient {
1811		fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1812			unimplemented!()
1813		}
1814		fn info(&self) -> sp_blockchain::Info<Block> {
1815			sp_blockchain::Info {
1816				best_hash: TEST_BEST_BLOCK_HASH.into(),
1817				best_number: 0,
1818				genesis_hash: Default::default(),
1819				finalized_hash: TEST_BEST_BLOCK_HASH.into(),
1820				finalized_number: 1,
1821				finalized_state: None,
1822				number_leaves: 0,
1823				block_gap: None,
1824			}
1825		}
1826		fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1827			unimplemented!()
1828		}
1829		fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1830			unimplemented!()
1831		}
1832		fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1833			unimplemented!()
1834		}
1835	}
1836
1837	fn test_store() -> (Store, tempfile::TempDir) {
1838		sp_tracing::init_for_tests();
1839		let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1840
1841		let client = std::sync::Arc::new(TestClient);
1842		let mut path: std::path::PathBuf = temp_dir.path().into();
1843		path.push("db");
1844		let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1845		let store = Store::new::<Block, TestClient, TestBackend>(
1846			&path,
1847			Default::default(),
1848			client,
1849			keystore,
1850			None,
1851			Box::new(sp_core::testing::TaskExecutor::new()),
1852		)
1853		.unwrap();
1854		(store, temp_dir) // return order is important. Store must be dropped before TempDir
1855	}
1856
1857	pub fn signed_statement(data: u8) -> Statement {
1858		signed_statement_with_topics(data, &[], None)
1859	}
1860
1861	fn signed_statement_with_topics(
1862		data: u8,
1863		topics: &[Topic],
1864		dec_key: Option<DecryptionKey>,
1865	) -> Statement {
1866		let mut statement = Statement::new();
1867		statement.set_plain_data(vec![data]);
1868		statement.set_expiry(u64::MAX);
1869
1870		for i in 0..topics.len() {
1871			statement.set_topic(i, topics[i]);
1872		}
1873		if let Some(key) = dec_key {
1874			statement.set_decryption_key(key);
1875		}
1876		let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1877		statement.sign_ed25519_private(&kp);
1878		statement
1879	}
1880
1881	fn topic(data: u64) -> Topic {
1882		let mut bytes = [0u8; 32];
1883		bytes[0..8].copy_from_slice(&data.to_le_bytes());
1884		Topic::from(bytes)
1885	}
1886
1887	fn dec_key(data: u64) -> DecryptionKey {
1888		let mut dec_key: DecryptionKey = Default::default();
1889		dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1890		dec_key
1891	}
1892
1893	/// Returns the deterministic ed25519 keypair used to author statements for the
1894	/// synthetic test account `seed`.
1895	///
1896	/// Uses ed25519 rather than sr25519 because schnorrkel signing is non-deterministic
1897	/// (the signature depends on RNG state), so calling `statement(id, prio, ch, len)`
1898	/// twice would produce different hashes. Several tests compare statement hashes
1899	/// against pre-computed values; ed25519 keeps those comparisons stable.
1900	fn account_keypair(seed: u64) -> sp_core::ed25519::Pair {
1901		sp_core::ed25519::Pair::from_string(&format!("//StatementAccount{seed}"), None)
1902			.expect("Derivation path is valid; qed")
1903	}
1904
1905	fn account(id: u64) -> AccountId {
1906		account_keypair(id).public().0
1907	}
1908
1909	/// Signs `stmt` with `account_id`'s test keypair. Tests that build a statement via
1910	/// `unsigned_statement(..)` and then mutate it call this exactly once at the end.
1911	fn sign_with(stmt: &mut Statement, account_id: u64) {
1912		stmt.sign_ed25519_private(&account_keypair(account_id));
1913	}
1914
1915	fn channel(id: u64) -> Channel {
1916		let mut channel: Channel = Default::default();
1917		channel[0..8].copy_from_slice(&id.to_le_bytes());
1918		channel
1919	}
1920
1921	/// Builds a test statement without signing it. Use this when a test needs to mutate
1922	/// the statement (encryption, expiry change, topic update, etc.) before submission —
1923	/// call `sign_with(&mut stmt, account_id)` once after all mutations.
1924	fn unsigned_statement(
1925		account_id: u64,
1926		priority: u32,
1927		c: Option<u64>,
1928		data_len: usize,
1929	) -> Statement {
1930		assert!(
1931			account_id <= MAX_TEST_ACCOUNT_SEED,
1932			"account_id {account_id} exceeds MAX_TEST_ACCOUNT_SEED ({MAX_TEST_ACCOUNT_SEED}); \
1933			 raise the constant if you need a wider range",
1934		);
1935		let mut statement = Statement::new();
1936		let mut data = Vec::new();
1937		data.resize(data_len, 0);
1938		statement.set_plain_data(data);
1939		statement.set_expiry_from_parts(u32::MAX, priority);
1940		if let Some(c) = c {
1941			statement.set_channel(channel(c));
1942		}
1943		statement
1944	}
1945
1946	fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1947		let mut statement = unsigned_statement(account_id, priority, c, data_len);
1948		sign_with(&mut statement, account_id);
1949		statement
1950	}
1951
1952	#[test]
1953	fn submit_one() {
1954		let (store, _temp) = test_store();
1955		let statement0 = signed_statement(0);
1956		assert_eq!(store.submit(statement0, StatementSource::Network), SubmitResult::New);
1957		let statement1 = statement(1, 1, None, 0);
1958		assert_eq!(store.submit(statement1, StatementSource::Network), SubmitResult::New);
1959	}
1960
1961	#[test]
1962	fn save_and_load_statements() {
1963		let (store, temp) = test_store();
1964		let statement0 = signed_statement(0);
1965		let statement1 = signed_statement(1);
1966		let statement2 = signed_statement(2);
1967		assert_eq!(store.submit(statement0.clone(), StatementSource::Network), SubmitResult::New);
1968		assert_eq!(store.submit(statement1.clone(), StatementSource::Network), SubmitResult::New);
1969		assert_eq!(store.submit(statement2.clone(), StatementSource::Network), SubmitResult::New);
1970		assert_eq!(store.statements().unwrap().len(), 3);
1971		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1972		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1973		let keystore = store.keystore.clone();
1974		drop(store);
1975
1976		let client = std::sync::Arc::new(TestClient);
1977		let mut path: std::path::PathBuf = temp.path().into();
1978		path.push("db");
1979		let store = Store::new::<Block, TestClient, TestBackend>(
1980			&path,
1981			Default::default(),
1982			client,
1983			keystore,
1984			None,
1985			Box::new(sp_core::testing::TaskExecutor::new()),
1986		)
1987		.unwrap();
1988		assert_eq!(store.statements().unwrap().len(), 3);
1989		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1990		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1991	}
1992
1993	#[test]
1994	fn take_recent_statements_clears_index() {
1995		let (store, _temp) = test_store();
1996		let statement0 = signed_statement(0);
1997		let statement1 = signed_statement(1);
1998		let statement2 = signed_statement(2);
1999		let statement3 = signed_statement(3);
2000
2001		let _ = store.submit(statement0.clone(), StatementSource::Local);
2002		let _ = store.submit(statement1.clone(), StatementSource::Local);
2003		let _ = store.submit(statement2.clone(), StatementSource::Local);
2004
2005		let recent1 = store.take_recent_statements().unwrap();
2006		let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
2007		let expected1 = vec![statement0, statement1, statement2];
2008		assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash())));
2009		assert!(expected1.iter().all(|s| recent1_statements.contains(s)));
2010
2011		// Recent statements are cleared.
2012		let recent2 = store.take_recent_statements().unwrap();
2013		assert_eq!(recent2.len(), 0);
2014
2015		store.submit(statement3.clone(), StatementSource::Network);
2016
2017		let recent3 = store.take_recent_statements().unwrap();
2018		let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
2019		let expected3 = vec![statement3];
2020		assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash())));
2021		assert!(expected3.iter().all(|s| recent3_statements.contains(s)));
2022
2023		// Recent statements are cleared, but statements remain in the store.
2024		assert_eq!(store.statements().unwrap().len(), 4);
2025	}
2026
2027	#[test]
2028	fn search_by_topic_and_key() {
2029		let (store, _temp) = test_store();
2030		let statement0 = signed_statement(0);
2031		let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
2032		let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
2033		let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
2034		let statement4 =
2035			signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
2036		let statements = vec![statement0, statement1, statement2, statement3, statement4];
2037		for s in &statements {
2038			store.submit(s.clone(), StatementSource::Network);
2039		}
2040
2041		let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
2042			let key = key.map(dec_key);
2043			let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
2044			let mut got_vals: Vec<_> = if let Some(key) = key {
2045				store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
2046			} else {
2047				store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
2048			};
2049			got_vals.sort();
2050			assert_eq!(expected.to_vec(), got_vals);
2051		};
2052
2053		assert_topics(&[], None, &[0, 1, 3, 4]);
2054		assert_topics(&[], Some(2), &[2]);
2055		assert_topics(&[0], None, &[1, 3, 4]);
2056		assert_topics(&[1], None, &[3]);
2057		assert_topics(&[2], None, &[3, 4]);
2058		assert_topics(&[3], None, &[4]);
2059		assert_topics(&[42], None, &[4]);
2060
2061		assert_topics(&[0, 1], None, &[3]);
2062		assert_topics(&[0, 1], Some(2), &[2]);
2063		assert_topics(&[0, 1, 99], Some(2), &[]);
2064		assert_topics(&[1, 2], None, &[3]);
2065		assert_topics(&[99], None, &[]);
2066		assert_topics(&[0, 99], None, &[]);
2067		assert_topics(&[0, 1, 2, 3, 42], None, &[]);
2068	}
2069
2070	#[test]
2071	fn constraints() {
2072		let (store, _temp) = test_store();
2073
2074		store.submit_index.write().config.max_total_size = 3000;
2075		let source = StatementSource::Network;
2076		let ok = SubmitResult::New;
2077
2078		// Account 1 (limit = 1 msg, 1000 bytes)
2079
2080		// Oversized statement is not allowed. Limit for account 1 is 1 msg, 1000 bytes
2081		assert!(matches!(
2082			store.submit(statement(1, 1, Some(1), 2000), source),
2083			SubmitResult::Rejected(_)
2084		));
2085		assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
2086		// Would not replace channel message with same priority
2087		assert!(matches!(
2088			store.submit(statement(1, 1, Some(1), 200), source),
2089			SubmitResult::Rejected(_)
2090		));
2091		assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
2092		// Submit another message to another channel with lower priority. Should not be allowed
2093		// because msg count limit is 1
2094		assert!(matches!(
2095			store.submit(statement(1, 1, Some(2), 100), source),
2096			SubmitResult::Rejected(_)
2097		));
2098		assert_eq!(store.submit_index.read().evicted.len(), 1);
2099
2100		// Account 2 (limit = 2 msg, 1000 bytes)
2101
2102		let s2_prio1 = statement(2, 1, None, 500);
2103		let s2_prio2 = statement(2, 2, None, 100);
2104		assert_eq!(store.submit(s2_prio1.clone(), source), ok);
2105		assert_eq!(store.submit(s2_prio2.clone(), source), ok);
2106		// Equal priority to lowest should be rejected
2107		assert!(matches!(
2108			store.submit(statement(2, 1, None, 50), source),
2109			SubmitResult::Rejected(RejectionReason::AccountFull { .. })
2110		));
2111		// Should evict priority 1
2112		let s2_prio3 = statement(2, 3, None, 500);
2113		assert_eq!(store.submit(s2_prio3.clone(), source), ok);
2114		assert_eq!(store.submit_index.read().evicted.len(), 2);
2115		assert!(store.submit_index.read().evicted.contains(&s2_prio1.hash()));
2116		assert!(store.statement(&s2_prio1.hash()).unwrap().is_none());
2117		// Should evict all
2118		assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
2119		assert_eq!(store.submit_index.read().evicted.len(), 4);
2120		assert!(store.submit_index.read().evicted.contains(&s2_prio2.hash()));
2121		assert!(store.submit_index.read().evicted.contains(&s2_prio3.hash()));
2122
2123		// Account 3 (limit = 3 msg, 1000 bytes)
2124
2125		let s3_prio2 = statement(3, 2, Some(1), 300);
2126		let s3_prio3 = statement(3, 3, Some(2), 300);
2127		assert_eq!(store.submit(s3_prio2.clone(), source), ok);
2128		assert_eq!(store.submit(s3_prio3.clone(), source), ok);
2129		assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
2130		// Should evict 2 and 3
2131		assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
2132		assert_eq!(store.submit_index.read().evicted.len(), 6);
2133		assert!(store.submit_index.read().evicted.contains(&s3_prio2.hash()));
2134		assert!(store.submit_index.read().evicted.contains(&s3_prio3.hash()));
2135
2136		assert_eq!(store.submit_index.read().total_size, 2400);
2137		assert_eq!(store.submit_index.read().entries.len(), 4);
2138
2139		// Should be over the global size limit
2140		assert!(matches!(
2141			store.submit(statement(1, 1, None, 700), source),
2142			SubmitResult::Rejected(_)
2143		));
2144		// Should be over the global count limit
2145		store.submit_index.write().config.max_total_statements = 4;
2146		assert!(matches!(
2147			store.submit(statement(1, 1, None, 100), source),
2148			SubmitResult::Rejected(_)
2149		));
2150
2151		let mut expected_statements = vec![
2152			statement(1, 2, Some(1), 600).hash(),
2153			statement(2, 4, None, 1000).hash(),
2154			statement(3, 4, Some(3), 300).hash(),
2155			statement(3, 5, None, 500).hash(),
2156		];
2157		expected_statements.sort();
2158		let mut statements: Vec<_> =
2159			store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
2160		statements.sort();
2161		assert_eq!(expected_statements, statements);
2162	}
2163
2164	#[test]
2165	fn max_statement_size_for_gossiping() {
2166		let (store, _temp) = test_store();
2167		store.submit_index.write().config.max_total_size = 42 * crate::MAX_STATEMENT_SIZE;
2168
2169		assert_eq!(
2170			store.submit(
2171				statement(42, 1, Some(1), crate::MAX_STATEMENT_SIZE - 500),
2172				StatementSource::Local
2173			),
2174			SubmitResult::New
2175		);
2176
2177		assert!(matches!(
2178			store.submit(
2179				statement(42, 2, Some(1), 2 * crate::MAX_STATEMENT_SIZE),
2180				StatementSource::Local
2181			),
2182			SubmitResult::Invalid(_)
2183		));
2184	}
2185
2186	#[test]
2187	fn expired_statements_are_purged() {
2188		use super::DEFAULT_PURGE_AFTER_SEC;
2189		let (mut store, temp) = test_store();
2190		let mut statement = unsigned_statement(1, 1, Some(3), 100);
2191		store.set_time(0);
2192		statement.set_topic(0, topic(4));
2193		sign_with(&mut statement, 1);
2194		store.submit(statement.clone(), StatementSource::Network);
2195		assert_eq!(store.submit_index.read().entries.len(), 1);
2196		store.remove(&statement.hash()).unwrap();
2197		assert_eq!(store.submit_index.read().entries.len(), 0);
2198		assert_eq!(store.submit_index.read().accounts.len(), 0);
2199		store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
2200		store.maintain();
2201		assert_eq!(store.submit_index.read().evicted.len(), 0);
2202		let keystore = store.keystore.clone();
2203		drop(store);
2204
2205		let client = std::sync::Arc::new(TestClient);
2206		let mut path: std::path::PathBuf = temp.path().into();
2207		path.push("db");
2208		let store = Store::new::<Block, TestClient, TestBackend>(
2209			&path,
2210			Default::default(),
2211			client,
2212			keystore,
2213			None,
2214			Box::new(sp_core::testing::TaskExecutor::new()),
2215		)
2216		.unwrap();
2217		assert_eq!(store.statements().unwrap().len(), 0);
2218		assert_eq!(store.submit_index.read().evicted.len(), 0);
2219	}
2220
2221	#[test]
2222	fn posted_clear_decrypts() {
2223		let (store, _temp) = test_store();
2224		let public = store
2225			.keystore
2226			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2227			.unwrap();
2228		let statement1 = statement(1, 1, None, 100);
2229		let mut statement2 = unsigned_statement(1, 2, None, 0);
2230		let plain = b"The most valuable secret".to_vec();
2231		statement2.encrypt(&plain, &public).unwrap();
2232		sign_with(&mut statement2, 1);
2233		store.submit(statement1, StatementSource::Network);
2234		store.submit(statement2, StatementSource::Network);
2235		let posted_clear = store.posted_clear(&[], public.into()).unwrap();
2236		assert_eq!(posted_clear, vec![plain]);
2237	}
2238
2239	#[test]
2240	fn broadcasts_stmt_returns_encoded_statements() {
2241		let (store, _tmp) = test_store();
2242
2243		// no key, no topic
2244		let s0 = signed_statement_with_topics(0, &[], None);
2245		// same, but with a topic = 42
2246		let s1 = signed_statement_with_topics(1, &[topic(42)], None);
2247		// has a decryption key -> must NOT be returned by broadcasts_stmt
2248		let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
2249
2250		for s in [&s0, &s1, &s2] {
2251			store.submit(s.clone(), StatementSource::Network);
2252		}
2253
2254		// no topic filter
2255		let mut hashes: Vec<_> = store
2256			.broadcasts_stmt(&[])
2257			.unwrap()
2258			.into_iter()
2259			.map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
2260			.collect();
2261		hashes.sort();
2262		let expected_hashes = {
2263			let mut e = vec![s0.hash(), s1.hash()];
2264			e.sort();
2265			e
2266		};
2267		assert_eq!(hashes, expected_hashes);
2268
2269		// filter on topic 42
2270		let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
2271		assert_eq!(got.len(), 1);
2272		let st = Statement::decode(&mut &got[0][..]).unwrap();
2273		assert_eq!(st.hash(), s1.hash());
2274	}
2275
2276	#[test]
2277	fn posted_stmt_returns_encoded_statements_for_dest() {
2278		let (store, _tmp) = test_store();
2279
2280		let public1 = store
2281			.keystore
2282			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2283			.unwrap();
2284		let dest: [u8; 32] = public1.into();
2285
2286		let public2 = store
2287			.keystore
2288			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2289			.unwrap();
2290
2291		// A statement that does have dec_key = dest
2292		let mut s_with_key = unsigned_statement(1, 1, None, 0);
2293		let plain1 = b"The most valuable secret".to_vec();
2294		s_with_key.encrypt(&plain1, &public1).unwrap();
2295		sign_with(&mut s_with_key, 1);
2296
2297		// A statement with a different dec_key
2298		let mut s_other_key = unsigned_statement(2, 2, None, 0);
2299		let plain2 = b"The second most valuable secret".to_vec();
2300		s_other_key.encrypt(&plain2, &public2).unwrap();
2301		sign_with(&mut s_other_key, 2);
2302
2303		// Submit them all
2304		for s in [&s_with_key, &s_other_key] {
2305			store.submit(s.clone(), StatementSource::Network);
2306		}
2307
2308		// posted_stmt should only return the one with dec_key = dest
2309		let retrieved = store.posted_stmt(&[], dest).unwrap();
2310		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
2311
2312		// Re-decode that returned statement to confirm it is correct
2313		let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
2314		assert_eq!(
2315			returned_stmt.hash(),
2316			s_with_key.hash(),
2317			"Returned statement must match s_with_key"
2318		);
2319	}
2320
2321	#[test]
2322	fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
2323		let (store, _tmp) = test_store();
2324
2325		let public1 = store
2326			.keystore
2327			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2328			.unwrap();
2329		let dest: [u8; 32] = public1.into();
2330
2331		let public2 = store
2332			.keystore
2333			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2334			.unwrap();
2335
2336		// A statement that does have dec_key = dest
2337		let mut s_with_key = unsigned_statement(1, 1, None, 0);
2338		let plain1 = b"The most valuable secret".to_vec();
2339		s_with_key.encrypt(&plain1, &public1).unwrap();
2340		sign_with(&mut s_with_key, 1);
2341
2342		// A statement with a different dec_key
2343		let mut s_other_key = unsigned_statement(2, 2, None, 0);
2344		let plain2 = b"The second most valuable secret".to_vec();
2345		s_other_key.encrypt(&plain2, &public2).unwrap();
2346		sign_with(&mut s_other_key, 2);
2347
2348		// Submit them all
2349		for s in [&s_with_key, &s_other_key] {
2350			store.submit(s.clone(), StatementSource::Network);
2351		}
2352
2353		// posted_stmt should only return the one with dec_key = dest
2354		let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
2355		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
2356
2357		// We expect: [ encoded Statement ] + [ the decrypted bytes ]
2358		let encoded_stmt = s_with_key.encode();
2359		let stmt_len = encoded_stmt.len();
2360
2361		// 1) statement is first
2362		assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
2363
2364		// 2) followed by the decrypted payload
2365		let trailing = &retrieved[0][stmt_len..];
2366		assert_eq!(trailing, &plain1[..]);
2367	}
2368
2369	#[test]
2370	fn posted_clear_returns_plain_data_for_dest_and_topics() {
2371		let (store, _tmp) = test_store();
2372
2373		// prepare two key-pairs
2374		let public_dest = store
2375			.keystore
2376			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2377			.unwrap();
2378		let dest: [u8; 32] = public_dest.into();
2379
2380		let public_other = store
2381			.keystore
2382			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2383			.unwrap();
2384
2385		// statement that SHOULD be returned (matches dest & topic 42)
2386		let mut s_good = unsigned_statement(1, 1, None, 0);
2387		let plaintext_good = b"The most valuable secret".to_vec();
2388		s_good.encrypt(&plaintext_good, &public_dest).unwrap();
2389		s_good.set_topic(0, topic(42));
2390		sign_with(&mut s_good, 1);
2391
2392		// statement that should NOT be returned (same dest but different topic)
2393		let mut s_wrong_topic = unsigned_statement(2, 2, None, 0);
2394		s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
2395		s_wrong_topic.set_topic(0, topic(99));
2396		sign_with(&mut s_wrong_topic, 2);
2397
2398		// statement that should NOT be returned (different dest)
2399		let mut s_other_dest = unsigned_statement(3, 3, None, 0);
2400		s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
2401		s_other_dest.set_topic(0, topic(42));
2402		sign_with(&mut s_other_dest, 3);
2403
2404		// submit all
2405		for s in [&s_good, &s_wrong_topic, &s_other_dest] {
2406			store.submit(s.clone(), StatementSource::Network);
2407		}
2408
2409		// call posted_clear with the topic filter and dest
2410		let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
2411
2412		// exactly one element, equal to the expected plaintext
2413		assert_eq!(retrieved, vec![plaintext_good]);
2414	}
2415
2416	#[test]
2417	fn already_expired_statement_is_rejected() {
2418		let (mut store, _temp) = test_store();
2419
2420		// Set current time to 1000 seconds
2421		store.set_time(1000);
2422
2423		// Create a statement that has already expired (expiration at 500 seconds, before current
2424		// time)
2425		let mut expired_statement = unsigned_statement(1, 1, None, 100);
2426		// set_expiry_from_parts: first arg is expiration timestamp in seconds, second is priority
2427		expired_statement.set_expiry_from_parts(500, 1);
2428		sign_with(&mut expired_statement, 1);
2429
2430		// Submit should fail with AlreadyExpired
2431		assert_eq!(
2432			store.submit(expired_statement, StatementSource::Network),
2433			SubmitResult::Invalid(InvalidReason::AlreadyExpired)
2434		);
2435
2436		// Verify the statement was not added
2437		assert_eq!(store.statements().unwrap().len(), 0);
2438
2439		// Now create a statement that is not expired (expiration at 2000 seconds, after current
2440		// time)
2441		let mut valid_statement = unsigned_statement(1, 1, None, 100);
2442		valid_statement.set_expiry_from_parts(2000, 1);
2443		sign_with(&mut valid_statement, 1);
2444
2445		// Submit should succeed
2446		assert_eq!(store.submit(valid_statement, StatementSource::Network), SubmitResult::New);
2447		assert_eq!(store.statements().unwrap().len(), 1);
2448	}
2449
2450	#[test]
2451	fn remove_by_covers_various_situations() {
2452		use sp_statement_store::{StatementSource, StatementStore, SubmitResult};
2453
2454		// Use a fresh store and fixed time so we can control purging.
2455		let (mut store, _temp) = test_store();
2456		store.set_time(0);
2457
2458		// Reuse helpers from this module.
2459		let t42 = topic(42);
2460		let k7 = dec_key(7);
2461
2462		// Account A = 4 (has per-account limits (4, 1000) in the mock runtime)
2463		// - Mix of topic, decryption-key and channel to exercise every index.
2464		let mut s_a1 = unsigned_statement(4, 10, Some(100), 100);
2465		s_a1.set_topic(0, t42);
2466		sign_with(&mut s_a1, 4);
2467		let h_a1 = s_a1.hash();
2468
2469		let mut s_a2 = unsigned_statement(4, 20, Some(200), 150);
2470		s_a2.set_decryption_key(k7);
2471		sign_with(&mut s_a2, 4);
2472		let h_a2 = s_a2.hash();
2473
2474		let s_a3 = statement(4, 30, None, 50);
2475		let h_a3 = s_a3.hash();
2476
2477		// Account B = 3 (control group that must remain untouched).
2478		let s_b1 = statement(3, 10, None, 100);
2479		let h_b1 = s_b1.hash();
2480
2481		let mut s_b2 = unsigned_statement(3, 15, Some(300), 100);
2482		s_b2.set_topic(0, t42);
2483		s_b2.set_decryption_key(k7);
2484		sign_with(&mut s_b2, 3);
2485		let h_b2 = s_b2.hash();
2486
2487		// Submit all statements.
2488		for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
2489			assert_eq!(store.submit(s.clone(), StatementSource::Network), SubmitResult::New);
2490		}
2491
2492		// --- Pre-conditions: everything is indexed as expected.
2493		{
2494			let submit_idx = store.submit_index.read();
2495			assert_eq!(submit_idx.entries.len(), 5, "all 5 should be present");
2496			assert!(submit_idx.accounts.contains_key(&account(4)));
2497			assert!(submit_idx.accounts.contains_key(&account(3)));
2498			assert_eq!(submit_idx.total_size, 100 + 150 + 50 + 100 + 100);
2499
2500			let query_idx = store.query_index.read();
2501			// Topic and key sets contain both A & B entries.
2502			let set_t = query_idx.by_topic.get(&t42).expect("topic set exists");
2503			assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
2504
2505			let set_k = query_idx.by_dec_key.get(&Some(k7)).expect("key set exists");
2506			assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
2507		}
2508
2509		// --- Action: remove all statements by Account A.
2510		store.remove_by(account(4)).expect("remove_by should succeed");
2511
2512		// --- Post-conditions: A's statements are gone and marked expired; B's remain.
2513		{
2514			// A's statements removed from DB view.
2515			for h in [h_a1, h_a2, h_a3] {
2516				assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
2517			}
2518
2519			// B's statements still present.
2520			for h in [h_b1, h_b2] {
2521				assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
2522			}
2523
2524			let submit_idx = store.submit_index.read();
2525
2526			// Account map updated.
2527			assert!(!submit_idx.accounts.contains_key(&account(4)), "Account A must be gone");
2528			assert!(submit_idx.accounts.contains_key(&account(3)), "Account B must remain");
2529
2530			// Removed statements are marked expired.
2531			assert!(submit_idx.evicted.contains(&h_a1));
2532			assert!(submit_idx.evicted.contains(&h_a2));
2533			assert!(submit_idx.evicted.contains(&h_a3));
2534			assert_eq!(submit_idx.evicted.len(), 3);
2535
2536			// Entry count & total_size reflect only B's data.
2537			assert_eq!(submit_idx.entries.len(), 2);
2538			assert_eq!(submit_idx.total_size, 100 + 100);
2539
2540			let query_idx = store.query_index.read();
2541			// Topic index: only B2 remains for topic 42.
2542			let set_t = query_idx.by_topic.get(&t42).expect("topic set exists");
2543			assert!(set_t.contains(&h_b2));
2544			assert!(!set_t.contains(&h_a1));
2545
2546			// Decryption-key index: only B2 remains for key 7.
2547			let set_k = query_idx.by_dec_key.get(&Some(k7)).expect("key set exists");
2548			assert!(set_k.contains(&h_b2));
2549			assert!(!set_k.contains(&h_a2));
2550		}
2551
2552		// --- Idempotency: removing again is a no-op and should not error.
2553		store.remove_by(account(4)).expect("second remove_by should be a no-op");
2554
2555		// --- Purge: advance time beyond TTL and run maintenance; expired entries disappear.
2556		let purge_after = store.submit_index.read().config.purge_after_sec;
2557		store.set_time(purge_after + 1);
2558		store.maintain();
2559		assert_eq!(store.submit_index.read().evicted.len(), 0, "expired entries should be purged");
2560
2561		// --- Reuse: Account A can submit again after purge.
2562		let s_new = statement(4, 40, None, 10);
2563		assert_eq!(store.submit(s_new, StatementSource::Network), SubmitResult::New);
2564	}
2565
2566	#[test]
2567	fn check_expiration_repopulates_account_list_when_empty() {
2568		let (mut store, _temp) = test_store();
2569		store.set_time(1000);
2570
2571		// Create statements for multiple accounts
2572		// Note: The statement() helper uses set_expiry_from_parts(u32::MAX, priority)
2573		// which creates a very large expiry value that won't trigger expiration
2574		let s1 = statement(1, 1, None, 100);
2575		let s2 = statement(2, 1, None, 100);
2576		let s3 = statement(3, 1, None, 100);
2577
2578		for s in [&s1, &s2, &s3] {
2579			store.submit(s.clone(), StatementSource::Network);
2580		}
2581
2582		// Initially, accounts_to_check_for_expiry_stmts is empty
2583		assert!(store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty());
2584
2585		// First call to check_expiration should populate the list
2586		store.enforce_limits();
2587
2588		// Now accounts_to_check_for_expiry_stmts should contain all 3 accounts
2589		let accounts = store.submit_index.read().accounts_to_check_for_expiry_stmts.clone();
2590		assert_eq!(accounts.len(), 3, "Should have 3 accounts to check");
2591		assert!(accounts.contains(&account(1)));
2592		assert!(accounts.contains(&account(2)));
2593		assert!(accounts.contains(&account(3)));
2594
2595		// No statements should have been expired since they're all valid
2596		assert_eq!(store.submit_index.read().evicted.len(), 0);
2597		assert_eq!(store.submit_index.read().entries.len(), 3);
2598	}
2599
2600	#[test]
2601	fn check_expiration_expires_statements_past_current_time() {
2602		let (mut store, _temp) = test_store();
2603
2604		// The check_expiration function compares Expiry(current_time << 32) against
2605		// Expiry(expiry) where expiry is the full 64-bit value with timestamp in high 32 bits.
2606		// Statements with expiration timestamp < current_time will be expired.
2607
2608		store.set_time(100);
2609
2610		// Create a statement that will expire at timestamp 500
2611		let mut expired_stmt = unsigned_statement(1, 1, None, 100);
2612		expired_stmt.set_expiry_from_parts(500, 1);
2613		sign_with(&mut expired_stmt, 1);
2614		let expired_hash = expired_stmt.hash();
2615		store.submit(expired_stmt, StatementSource::Network);
2616
2617		// Create a statement that won't expire (far future expiry)
2618		let valid_stmt = statement(2, 1, None, 100); // Uses u32::MAX as timestamp
2619		let valid_hash = valid_stmt.hash();
2620		store.submit(valid_stmt, StatementSource::Network);
2621
2622		// Verify both statements are in the store
2623		assert_eq!(store.submit_index.read().entries.len(), 2);
2624
2625		// First check_expiration populates the account list
2626		store.enforce_limits();
2627		assert!(!store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty());
2628
2629		// Advance time past the expiry of the first statement
2630		store.set_time(1000);
2631
2632		// Second check_expiration should find and expire the statement
2633		store.enforce_limits();
2634
2635		// Naturally-expired statements are not added to the expired map (AlreadyExpired check
2636		// in submit rejects them without consulting the map)
2637		let index = store.submit_index.read();
2638		assert!(
2639			!index.evicted.contains(&expired_hash),
2640			"Naturally expired statement must not be added to the expired map"
2641		);
2642		assert!(
2643			!index.entries.contains_key(&expired_hash),
2644			"Expired statement should be removed from entries"
2645		);
2646
2647		// The valid statement should still be in entries
2648		assert!(
2649			index.entries.contains_key(&valid_hash),
2650			"Valid statement should still be in entries"
2651		);
2652		assert!(!index.evicted.contains(&valid_hash), "Valid statement should not be expired");
2653	}
2654
2655	#[test]
2656	fn check_expiration_removes_checked_accounts_from_list_when_expiring() {
2657		let (mut store, _temp) = test_store();
2658		store.set_time(100);
2659
2660		// Create statements with expiry at timestamp 200
2661		let mut stmt1 = unsigned_statement(1, 1, None, 100);
2662		stmt1.set_expiry_from_parts(200, 1);
2663		sign_with(&mut stmt1, 1);
2664		store.submit(stmt1, StatementSource::Network);
2665
2666		let mut stmt2 = unsigned_statement(2, 1, None, 100);
2667		stmt2.set_expiry_from_parts(200, 1);
2668		sign_with(&mut stmt2, 2);
2669		store.submit(stmt2, StatementSource::Network);
2670
2671		let mut stmt3 = unsigned_statement(3, 1, None, 100);
2672		stmt3.set_expiry_from_parts(200, 1);
2673		sign_with(&mut stmt3, 3);
2674		store.submit(stmt3, StatementSource::Network);
2675
2676		// First call populates the list
2677		store.enforce_limits();
2678		assert_eq!(
2679			store.submit_index.read().accounts_to_check_for_expiry_stmts.len(),
2680			3,
2681			"Should have 3 accounts to check"
2682		);
2683
2684		// Advance time past expiry
2685		store.set_time(300);
2686
2687		// Second call should check accounts, expire statements, and remove checked accounts
2688		store.enforce_limits();
2689
2690		// The list should now be empty (all accounts checked and removed)
2691		assert!(
2692			store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty(),
2693			"All accounts should have been checked and removed after expiration"
2694		);
2695
2696		// All statements were naturally expired (past their own timestamp), so they are not
2697		// added to the expired map AlreadyExpired check in submit handles re-gossip prevention
2698		assert_eq!(store.submit_index.read().evicted.len(), 0);
2699		assert_eq!(store.submit_index.read().entries.len(), 0);
2700	}
2701
2702	#[test]
2703	fn check_expiration_truncates_list_even_when_nothing_expires() {
2704		let (mut store, _temp) = test_store();
2705		store.set_time(1000);
2706
2707		// Create statements for multiple accounts with far future expiry (using statement helper)
2708		// The statement() helper uses set_expiry_from_parts(u32::MAX, priority) which creates
2709		// a very large expiry value that won't trigger expiration
2710		for acc_id in 1..=5u64 {
2711			let stmt = statement(acc_id, 1, None, 100);
2712			store.submit(stmt, StatementSource::Network);
2713		}
2714
2715		// First call populates the list
2716		store.enforce_limits();
2717		assert_eq!(store.submit_index.read().accounts_to_check_for_expiry_stmts.len(), 5);
2718
2719		// Second call checks accounts and truncates the list (even though nothing expires)
2720		store.enforce_limits();
2721
2722		// The list should now be empty - accounts are removed after being checked
2723		assert!(
2724			store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty(),
2725			"List should be empty after all accounts have been checked"
2726		);
2727
2728		// No statements should have been expired
2729		assert_eq!(store.submit_index.read().evicted.len(), 0);
2730		assert_eq!(store.submit_index.read().entries.len(), 5);
2731	}
2732
2733	#[test]
2734	fn check_expiration_handles_multiple_statements_per_account() {
2735		let (mut store, _temp) = test_store();
2736		store.set_time(100);
2737
2738		// Create multiple statements for the same account with different expiry timestamps
2739		// Account 42 has limit of 42 statements
2740		let mut stmt1 = unsigned_statement(42, 1, Some(1), 100);
2741		stmt1.set_expiry_from_parts(200, 1); // Expires at timestamp 200
2742		sign_with(&mut stmt1, 42);
2743		let hash1 = stmt1.hash();
2744		store.submit(stmt1, StatementSource::Network);
2745
2746		let mut stmt2 = unsigned_statement(42, 2, Some(2), 100);
2747		stmt2.set_expiry_from_parts(300, 2); // Expires at timestamp 300
2748		sign_with(&mut stmt2, 42);
2749		let hash2 = stmt2.hash();
2750		store.submit(stmt2, StatementSource::Network);
2751
2752		let mut stmt3 = unsigned_statement(42, 3, Some(3), 100);
2753		stmt3.set_expiry_from_parts(500, 3); // Expires at timestamp 500
2754		sign_with(&mut stmt3, 42);
2755		let hash3 = stmt3.hash();
2756		store.submit(stmt3, StatementSource::Network);
2757
2758		// Verify all statements are in the store
2759		assert_eq!(store.submit_index.read().entries.len(), 3);
2760
2761		// First check_expiration populates the account list
2762		store.enforce_limits();
2763
2764		// Advance time to 250 (stmt1 should expire since 250 > 200)
2765		store.set_time(250);
2766		store.enforce_limits();
2767
2768		{
2769			let index = store.submit_index.read();
2770			// Naturally expired statements are not added to the expired map.
2771			assert!(!index.evicted.contains(&hash1), "stmt1 naturally expired, not in map");
2772			assert!(!index.evicted.contains(&hash2), "stmt2 should not be expired yet");
2773			assert!(!index.evicted.contains(&hash3), "stmt3 should not be expired yet");
2774			assert_eq!(index.entries.len(), 2);
2775		}
2776
2777		// Repopulate the account list for next check
2778		store.enforce_limits();
2779
2780		// Advance time to 400 (stmt2 should also expire since 400 > 300)
2781		store.set_time(400);
2782		store.enforce_limits();
2783
2784		{
2785			let index = store.submit_index.read();
2786			assert!(!index.evicted.contains(&hash1));
2787			assert!(!index.evicted.contains(&hash2), "stmt2 naturally expired, not in map");
2788			assert!(!index.evicted.contains(&hash3), "stmt3 should not be expired yet");
2789			assert_eq!(index.entries.len(), 1);
2790		}
2791
2792		// Repopulate and check again at time 600 (stmt3 should expire since 600 > 500)
2793		store.enforce_limits();
2794		store.set_time(600);
2795		store.enforce_limits();
2796
2797		{
2798			let index = store.submit_index.read();
2799			assert!(!index.evicted.contains(&hash1));
2800			assert!(!index.evicted.contains(&hash2));
2801			assert!(!index.evicted.contains(&hash3), "stmt3 naturally expired, not in map");
2802			assert_eq!(index.entries.len(), 0);
2803		}
2804	}
2805
2806	#[test]
2807	fn check_expiration_does_nothing_when_no_expired_statements() {
2808		let (mut store, _temp) = test_store();
2809		store.set_time(1000);
2810
2811		// Create statement with expiry far in the future
2812		// The statement() helper uses set_expiry_from_parts(u32::MAX, priority)
2813		let stmt = statement(1, 1, None, 100);
2814		let hash = stmt.hash();
2815		store.submit(stmt, StatementSource::Network);
2816
2817		// Populate the account list
2818		store.enforce_limits();
2819
2820		// Check expiration - nothing should happen
2821		store.enforce_limits();
2822
2823		// Statement should still be there
2824		let index = store.submit_index.read();
2825		assert!(index.entries.contains_key(&hash));
2826		assert!(!index.evicted.contains(&hash));
2827		assert_eq!(index.entries.len(), 1);
2828		assert_eq!(index.evicted.len(), 0);
2829	}
2830
2831	#[test]
2832	fn check_expiration_correctly_updates_account_data() {
2833		let (mut store, _temp) = test_store();
2834		store.set_time(100);
2835
2836		// Create a statement with expiry at timestamp 200
2837		let mut stmt = unsigned_statement(1, 1, Some(1), 100);
2838		stmt.set_expiry_from_parts(200, 1);
2839		sign_with(&mut stmt, 1);
2840		let hash = stmt.hash();
2841		store.submit(stmt, StatementSource::Network);
2842
2843		// Verify account exists before expiration
2844		{
2845			let index = store.submit_index.read();
2846			assert!(index.accounts.contains_key(&account(1)));
2847			assert_eq!(index.total_size, 100);
2848		}
2849
2850		// Populate and then expire
2851		store.enforce_limits();
2852		store.set_time(300);
2853		store.enforce_limits();
2854
2855		// Verify account is removed after its only statement expires
2856		{
2857			let index = store.submit_index.read();
2858			assert!(
2859				!index.accounts.contains_key(&account(1)),
2860				"Account should be removed when all its statements expire"
2861			);
2862			assert_eq!(index.total_size, 0, "Total size should be zero");
2863			assert!(!index.evicted.contains(&hash), "Naturally expired, not in map");
2864		}
2865	}
2866
2867	#[test]
2868	fn check_expiration_clears_topic_and_key_indexes() {
2869		let (mut store, _temp) = test_store();
2870		store.set_time(100);
2871
2872		// Create a statement with topic and decryption key
2873		let mut stmt = unsigned_statement(1, 1, Some(1), 100);
2874		stmt.set_expiry_from_parts(200, 1);
2875		stmt.set_topic(0, topic(42));
2876		stmt.set_decryption_key(dec_key(7));
2877		sign_with(&mut stmt, 1);
2878		let hash = stmt.hash();
2879		store.submit(stmt, StatementSource::Network);
2880
2881		// Verify indexes are populated
2882		{
2883			let query_index = store.query_index.read();
2884			assert!(query_index.by_topic.get(&topic(42)).map_or(false, |s| s.contains(&hash)));
2885			assert!(query_index
2886				.by_dec_key
2887				.get(&Some(dec_key(7)))
2888				.map_or(false, |s| s.contains(&hash)));
2889		}
2890
2891		// Populate and then expire
2892		store.enforce_limits();
2893		store.set_time(300);
2894		store.enforce_limits();
2895
2896		// Verify indexes are cleared
2897		{
2898			let query_index = store.query_index.read();
2899			// Topic set should be empty or removed
2900			assert!(
2901				query_index.by_topic.get(&topic(42)).map_or(true, |s| s.is_empty()),
2902				"Topic index should be cleared"
2903			);
2904			// Key set should be empty or removed
2905			assert!(
2906				query_index.by_dec_key.get(&Some(dec_key(7))).map_or(true, |s| s.is_empty()),
2907				"Decryption key index should be cleared"
2908			);
2909			assert!(
2910				!store.submit_index.read().evicted.contains(&hash),
2911				"Naturally expired, not in map"
2912			);
2913		}
2914	}
2915
2916	#[test]
2917	fn check_expiration_handles_empty_store() {
2918		let (mut store, _temp) = test_store();
2919		store.set_time(1000);
2920
2921		// With no statements, check_expiration should not panic
2922		store.enforce_limits();
2923
2924		// Second call should also work (empty repopulation)
2925		store.enforce_limits();
2926
2927		assert!(store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty());
2928		assert_eq!(store.submit_index.read().entries.len(), 0);
2929		assert_eq!(store.submit_index.read().evicted.len(), 0);
2930	}
2931
2932	#[test]
2933	fn check_expiration_expires_properly_formatted_statements() {
2934		// With the fix (Expiry(current_time << 32)), check_expiration properly
2935		// compares timestamps and can expire statements submitted through normal flow.
2936
2937		let (mut store, _temp) = test_store();
2938		store.set_time(1000);
2939
2940		// Create a statement with expiration timestamp just 1 second in the future
2941		let mut stmt = unsigned_statement(1, 1, None, 100);
2942		stmt.set_expiry_from_parts(1001, 1); // Expires at timestamp 1001
2943		sign_with(&mut stmt, 1);
2944		let hash = stmt.hash();
2945		store.submit(stmt, StatementSource::Network);
2946
2947		assert_eq!(store.submit_index.read().entries.len(), 1);
2948
2949		// Populate the accounts list
2950		store.enforce_limits();
2951
2952		// Advance time past the expiration timestamp
2953		store.set_time(2000);
2954		store.enforce_limits();
2955
2956		// Statement SHOULD be expired because check_expiration now compares
2957		// Expiry(2000 << 32) against Expiry(1001 << 32 | 1), and
2958		// (2000 << 32) > (1001 << 32 | 1)
2959		let index = store.submit_index.read();
2960		assert!(
2961			!index.entries.contains_key(&hash),
2962			"Statement should be removed from entries after expiration"
2963		);
2964		// Naturally expired: timestamp 1001 < current_time 2000, not added to expired map.
2965		assert!(!index.evicted.contains(&hash), "Naturally expired, not in map");
2966	}
2967
2968	#[test]
2969	fn check_expiration_updates_database_columns() {
2970		// This test verifies that check_expiration properly updates the database.
2971		let (mut store, _temp) = test_store();
2972		store.set_time(100);
2973
2974		// Create a statement with expiry at timestamp 200
2975		let mut stmt = unsigned_statement(1, 1, None, 100);
2976		stmt.set_expiry_from_parts(200, 1);
2977		sign_with(&mut stmt, 1);
2978		let hash = stmt.hash();
2979		store.submit(stmt.clone(), StatementSource::Network);
2980
2981		// Verify statement is in the database
2982		let db_entry = store.db.get(col::STATEMENTS, &hash).unwrap();
2983		assert!(db_entry.is_some(), "Statement should be in col::STATEMENTS after submit");
2984
2985		// Populate the accounts list
2986		store.enforce_limits();
2987
2988		// Advance time past expiry and run check_expiration
2989		store.set_time(300);
2990		store.enforce_limits();
2991
2992		// Verify in-memory state is updated correctly
2993		{
2994			let index = store.submit_index.read();
2995			assert!(
2996				!index.entries.contains_key(&hash),
2997				"Statement should be removed from in-memory entries"
2998			);
2999			// Naturally expired: not added to expired map, no need for suppression.
3000			assert!(
3001				!index.evicted.contains(&hash),
3002				"Naturally expired statement must not be in the expired map"
3003			);
3004		}
3005
3006		let db_entry = store.db.get(col::STATEMENTS, &hash).unwrap();
3007		assert!(
3008			db_entry.is_none(),
3009			"Statement should be removed from col::STATEMENTS after expiration"
3010		);
3011
3012		// Naturally expired statements are not written to col::EXPIRED either, so that
3013		// the optimization survives node restarts.
3014		let expired_entry = store.db.get(col::EXPIRED, &hash).unwrap();
3015		assert!(expired_entry.is_none(), "Naturally expired: not written to col::EXPIRED");
3016	}
3017
3018	#[test]
3019	fn enforce_allowances_evicts_excess_statements() {
3020		// This test verifies that check_expiration correctly evicts statements
3021		// when statements exceed the current allowance. We directly insert into
3022		// the index (bypassing submit's validation) to simulate statements that
3023		// existed before allowances were reduced.
3024		let (mut store, _temp) = test_store();
3025		store.set_time(0);
3026
3027		// Account 4 has allowance (4 statements, 1000 bytes) from TestClient
3028		let s1 = statement(4, 10, None, 100); // lowest priority - will be evicted
3029		let s2 = statement(4, 20, None, 100);
3030		let s3 = statement(4, 30, None, 100);
3031		let s4 = statement(4, 40, None, 100);
3032		let s5 = statement(4, 50, None, 100); // highest priority
3033
3034		let h1 = s1.hash();
3035		let h5 = s5.hash();
3036
3037		// Directly insert into index, bypassing `submit`'s allowance check
3038		{
3039			let mut submit_index = store.submit_index.write();
3040			let mut query_index = store.query_index.write();
3041			for s in [&s1, &s2, &s3, &s4, &s5] {
3042				submit_index.insert_new(s.hash(), account(4), s);
3043				query_index.insert(s.hash(), s);
3044			}
3045		}
3046
3047		// Verify initial state - all 5 should be present
3048		assert_eq!(store.submit_index.read().entries.len(), 5);
3049		assert_eq!(store.submit_index.read().total_size, 500);
3050
3051		// Run check_expiration which handles both expiration and allowance enforcement
3052		// First call populates the accounts list, second call processes them
3053		// Since account 4 has max_count=4, one statement should be evicted
3054		store.enforce_limits();
3055		store.enforce_limits();
3056
3057		// Should evict the lowest priority statement (s1)
3058		let index = store.submit_index.read();
3059		assert_eq!(index.entries.len(), 4, "Should have 4 statements after eviction");
3060		assert!(!index.entries.contains_key(&h1), "Lowest priority should be evicted");
3061		assert!(index.entries.contains_key(&h5), "Highest priority should remain");
3062		assert_eq!(index.total_size, 400);
3063
3064		// Evicted statement should be marked as expired
3065		assert!(index.evicted.contains(&h1));
3066	}
3067
3068	#[test]
3069	fn enforce_allowances_evicts_all_when_no_allowance_found() {
3070		let (mut store, _temp) = test_store();
3071		store.set_time(0);
3072
3073		// Account 0 has NO allowance in TestClient
3074		let s1 = statement(0, 10, None, 100);
3075		let s2 = statement(0, 20, None, 150);
3076
3077		let h1 = s1.hash();
3078		let h2 = s2.hash();
3079
3080		// Directly insert statements for account with no allowance
3081		{
3082			let mut submit_index = store.submit_index.write();
3083			let mut query_index = store.query_index.write();
3084			submit_index.insert_new(h1, account(0), &s1);
3085			query_index.insert(h1, &s1);
3086			submit_index.insert_new(h2, account(0), &s2);
3087			query_index.insert(h2, &s2);
3088		}
3089
3090		assert_eq!(store.submit_index.read().entries.len(), 2);
3091
3092		// Run check_expiration - should evict ALL statements since no allowance exists
3093		// First call populates the accounts list, second call processes them
3094		store.enforce_limits();
3095		store.enforce_limits();
3096
3097		let index = store.submit_index.read();
3098		assert_eq!(index.entries.len(), 0, "All statements should be evicted");
3099		assert!(!index.accounts.contains_key(&account(0)), "Account should be removed");
3100		assert!(index.evicted.contains(&h1));
3101		assert!(index.evicted.contains(&h2));
3102	}
3103
3104	#[test]
3105	fn enforce_allowances_based_on_size() {
3106		// This test verifies that check_expiration evicts based on size limits.
3107		let (mut store, _temp) = test_store();
3108		store.set_time(0);
3109
3110		// Account 2 has allowance (2, 1000) from TestClient
3111		// Insert 2 statements that together exceed 1000 bytes
3112		let s1 = statement(2, 10, None, 600); // lowest priority
3113		let s2 = statement(2, 20, None, 600); // higher priority
3114
3115		let h1 = s1.hash();
3116		let h2 = s2.hash();
3117
3118		// Directly insert both statements (total 1200 bytes > 1000 limit)
3119		{
3120			let mut submit_index = store.submit_index.write();
3121			let mut query_index = store.query_index.write();
3122			submit_index.insert_new(h1, account(2), &s1);
3123			query_index.insert(h1, &s1);
3124			submit_index.insert_new(h2, account(2), &s2);
3125			query_index.insert(h2, &s2);
3126		}
3127
3128		assert_eq!(store.submit_index.read().total_size, 1200);
3129
3130		// Run check_expiration - should evict s1 to get under 1000 bytes
3131		// First call populates the accounts list, second call processes them
3132		store.enforce_limits();
3133		store.enforce_limits();
3134
3135		let index = store.submit_index.read();
3136		assert_eq!(index.entries.len(), 1);
3137		assert!(index.entries.contains_key(&h2), "Higher priority should remain");
3138		assert!(!index.entries.contains_key(&h1), "Lower priority should be evicted");
3139		assert_eq!(index.total_size, 600);
3140	}
3141
3142	#[test]
3143	fn channel_replacement_only_higher_priority_succeeds() {
3144		let (store, _temp) = test_store();
3145		let source = StatementSource::Network;
3146
3147		// Account 1: max_count=1, max_size=1000
3148		// Submit channel 1 with priority 5
3149		let s1 = statement(1, 5, Some(1), 100);
3150		let h1 = s1.hash();
3151		assert_eq!(store.submit(s1, source), SubmitResult::New);
3152
3153		// Lower priority on same channel → ChannelPriorityTooLow
3154		let result = store.submit(statement(1, 3, Some(1), 100), source);
3155		assert!(
3156			matches!(result, SubmitResult::Rejected(RejectionReason::ChannelPriorityTooLow { .. })),
3157			"Lower priority should be rejected with ChannelPriorityTooLow, got: {result:?}"
3158		);
3159
3160		// Equal priority on same channel → ChannelPriorityTooLow (check is <=)
3161		// Use different data_len to get a distinct hash with same priority
3162		let result = store.submit(statement(1, 5, Some(1), 101), source);
3163		assert!(
3164			matches!(result, SubmitResult::Rejected(RejectionReason::ChannelPriorityTooLow { .. })),
3165			"Equal priority should be rejected with ChannelPriorityTooLow, got: {result:?}"
3166		);
3167
3168		// Higher priority on same channel → replaces
3169		let s2 = statement(1, 10, Some(1), 200);
3170		let h2 = s2.hash();
3171		assert_eq!(store.submit(s2, source), SubmitResult::New);
3172
3173		{
3174			let index = store.submit_index.read();
3175			assert_eq!(index.entries.len(), 1);
3176			assert!(!index.entries.contains_key(&h1), "Old channel message should be gone");
3177			assert!(index.entries.contains_key(&h2), "New channel message should exist");
3178			assert!(index.evicted.contains(&h1), "Old should be in expired");
3179			assert_eq!(index.total_size, 200);
3180		}
3181	}
3182
3183	#[test]
3184	fn submit_rejects_malformed_statements() {
3185		let (store, _temp) = test_store();
3186
3187		let mut base = Statement::new();
3188		base.set_expiry(u64::MAX);
3189		base.set_plain_data(vec![1]);
3190
3191		let ed_kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
3192		let sr_kp = sp_core::sr25519::Pair::from_string("//Alice", None).unwrap();
3193		let ecdsa_kp = sp_core::ecdsa::Pair::from_string("//Alice", None).unwrap();
3194
3195		assert_eq!(
3196			store.submit(base.clone(), StatementSource::Network),
3197			SubmitResult::Invalid(InvalidReason::NoProof)
3198		);
3199
3200		let bad_proofs = [
3201			Proof::Ed25519 { signature: [0xAB; 64], signer: ed_kp.public().0 },
3202			Proof::Sr25519 { signature: [0xCD; 64], signer: sr_kp.public().0 },
3203			Proof::Secp256k1Ecdsa { signature: [0xEF; 65], signer: ecdsa_kp.public().0 },
3204		];
3205		for proof in bad_proofs {
3206			let mut s = base.clone();
3207			s.set_proof(proof);
3208			assert_eq!(
3209				store.submit(s, StatementSource::Network),
3210				SubmitResult::Invalid(InvalidReason::BadProof)
3211			);
3212		}
3213
3214		let mut wrong_signer = base.clone();
3215		wrong_signer.sign_ed25519_private(&ed_kp);
3216		let alice_sig = match wrong_signer.proof().unwrap() {
3217			Proof::Ed25519 { signature, .. } => *signature,
3218			_ => panic!("expected Ed25519 proof after sign_ed25519_private"),
3219		};
3220		let bob_kp = sp_core::ed25519::Pair::from_string("//Bob", None).unwrap();
3221		wrong_signer.set_proof(Proof::Ed25519 { signature: alice_sig, signer: bob_kp.public().0 });
3222		assert_eq!(
3223			store.submit(wrong_signer, StatementSource::Network),
3224			SubmitResult::Invalid(InvalidReason::BadProof)
3225		);
3226	}
3227
3228	#[test]
3229	fn channel_replacement_with_size_increase_evicts_others() {
3230		let (store, _temp) = test_store();
3231		let source = StatementSource::Network;
3232
3233		// Account 3: max_count=3, max_size=1000
3234		// channel msg (200b) + two non-channel msgs (300b each) = 800b
3235		let s_ch = statement(3, 5, Some(1), 200);
3236		let s_low = statement(3, 2, None, 300);
3237		let s_mid = statement(3, 3, None, 300);
3238		let h_ch = s_ch.hash();
3239		let h_low = s_low.hash();
3240		let h_mid = s_mid.hash();
3241
3242		assert_eq!(store.submit(s_ch, source), SubmitResult::New);
3243		assert_eq!(store.submit(s_low, source), SubmitResult::New);
3244		assert_eq!(store.submit(s_mid, source), SubmitResult::New);
3245		assert_eq!(store.submit_index.read().total_size, 800);
3246
3247		// Replace channel with 600b message (priority 10 > 5)
3248		// Must evict lowest priority non-channel statement (priority 2) to fit
3249		let s_ch_big = statement(3, 10, Some(1), 600);
3250		let h_ch_big = s_ch_big.hash();
3251		assert_eq!(store.submit(s_ch_big, source), SubmitResult::New);
3252
3253		{
3254			let index = store.submit_index.read();
3255			assert_eq!(index.entries.len(), 2);
3256			assert!(!index.entries.contains_key(&h_ch), "Old channel message replaced");
3257			assert!(!index.entries.contains_key(&h_low), "Priority 2 evicted to fit size");
3258			assert!(index.entries.contains_key(&h_mid), "Priority 3 should remain");
3259			assert!(index.entries.contains_key(&h_ch_big), "New channel message added");
3260			assert_eq!(index.total_size, 900); // 300 (mid) + 600 (new channel)
3261		}
3262	}
3263
3264	#[test]
3265	fn subscription_reconnect_receives_current_state() {
3266		use crate::StatementStoreSubscriptionApi;
3267		use sp_statement_store::OptimizedTopicFilter;
3268
3269		let (store, _temp) = test_store();
3270		let source = StatementSource::Local;
3271
3272		// Submit 3 statements
3273		for i in 0..3u8 {
3274			let res = store.submit(signed_statement(i), source);
3275			assert_eq!(res, SubmitResult::New);
3276		}
3277
3278		// First subscribe → should get 3 existing statements
3279		let (existing, sender, stream) =
3280			store.subscribe_statement(OptimizedTopicFilter::Any).unwrap();
3281		assert_eq!(existing.len(), 3, "First subscribe should return 3 existing statements");
3282
3283		// Drop stream
3284		drop(stream);
3285		drop(sender);
3286
3287		// Submit 2 more while disconnected
3288		for i in 3..5u8 {
3289			assert_eq!(store.submit(signed_statement(i), source), SubmitResult::New);
3290		}
3291		let (existing, sender, stream) =
3292			store.subscribe_statement(OptimizedTopicFilter::Any).unwrap();
3293		assert_eq!(existing.len(), 5, "Re-subscribe should return all 5 current statements");
3294
3295		// Drop and remove one statement
3296		drop(stream);
3297		drop(sender);
3298		let hash_to_remove = signed_statement(0).hash();
3299		store.remove(&hash_to_remove).unwrap();
3300
3301		// Re-subscribe → should get 4
3302		let (existing, _sender, _stream) =
3303			store.subscribe_statement(OptimizedTopicFilter::Any).unwrap();
3304		assert_eq!(existing.len(), 4, "Re-subscribe after removal should return 4 statements");
3305	}
3306
3307	#[test]
3308	fn subscription_reconnect_with_topic_filter() {
3309		use crate::StatementStoreSubscriptionApi;
3310		use sp_statement_store::OptimizedTopicFilter;
3311
3312		let (store, _temp) = test_store();
3313		let source = StatementSource::Local;
3314		let topic_a = topic(1);
3315		let topic_b = topic(2);
3316
3317		// s1: topic A only
3318		let s1 = signed_statement_with_topics(1, &[topic_a], None);
3319		// s2: topic B only
3320		let s2 = signed_statement_with_topics(2, &[topic_b], None);
3321		// s3: topics A + B
3322		let s3 = signed_statement_with_topics(3, &[topic_a, topic_b], None);
3323
3324		assert_eq!(store.submit(s1, source), SubmitResult::New);
3325		assert_eq!(store.submit(s2, source), SubmitResult::New);
3326		assert_eq!(store.submit(s3, source), SubmitResult::New);
3327
3328		// Subscribe with MatchAll([A]) → s1, s3
3329		let filter_a = OptimizedTopicFilter::MatchAll(std::collections::HashSet::from([topic_a]));
3330		let (existing, sender, stream) = store.subscribe_statement(filter_a.clone()).unwrap();
3331		assert_eq!(existing.len(), 2, "MatchAll([A]) should match s1 and s3");
3332
3333		// Drop and add s4 with topic A
3334		drop(sender);
3335		drop(stream);
3336		let s4 = signed_statement_with_topics(4, &[topic_a], None);
3337		assert_eq!(store.submit(s4, source), SubmitResult::New);
3338		// Re-subscribe with same filter → s1, s3, s4
3339		let (existing, sender, stream) = store.subscribe_statement(filter_a).unwrap();
3340		assert_eq!(existing.len(), 3, "Re-subscribe MatchAll([A]) should return s1, s3, s4");
3341
3342		// Drop and re-subscribe with different filter MatchAll([B]) → s2, s3
3343		drop(sender);
3344		drop(stream);
3345		let filter_b = OptimizedTopicFilter::MatchAll(std::collections::HashSet::from([topic_b]));
3346		let (existing, _sender, _stream) = store.subscribe_statement(filter_b).unwrap();
3347		assert_eq!(existing.len(), 2, "Re-subscribe MatchAll([B]) should return s2 and s3");
3348	}
3349}