1#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51mod subscription;
52
53#[cfg(feature = "test-helpers")]
54pub mod subxt_client;
55#[cfg(feature = "test-helpers")]
56pub mod test_utils;
57
58use crate::subscription::{SubscriptionStatementsStream, SubscriptionsHandle};
59use futures::FutureExt;
60use metrics::MetricsLink as PrometheusMetrics;
61use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock};
62use prometheus_endpoint::Registry as PrometheusRegistry;
63use sc_client_api::{backend::StorageProvider, Backend, StorageKey};
64use sc_keystore::LocalKeystore;
65use sp_blockchain::HeaderBackend;
66use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
67use sp_runtime::traits::Block as BlockT;
68use sp_statement_store::{
69 runtime_api::{StatementSource, StatementStoreExt},
70 AccountId, BlockHash, Channel, DecryptionKey, FilterDecision, Hash, InvalidReason,
71 OptimizedTopicFilter, RejectionReason, Result, SignatureVerificationResult, Statement,
72 StatementAllowance, StatementEvent, SubmitResult, Topic,
73};
74pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
75use std::{
76 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
77 sync::Arc,
78 time::{Duration, Instant},
79};
80pub use subscription::StatementStoreSubscriptionApi;
81
82const KEY_VERSION: &[u8] = b"version".as_slice();
83const CURRENT_VERSION: u32 = 1;
84
85const LOG_TARGET: &str = "statement-store";
86
87pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; pub const MAX_STATEMENT_SIZE: usize =
97 sc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
98
99const MAX_EXPIRY_STATEMENTS_PER_ITERATION: usize = 10_000;
101const MAX_EXPIRY_ACCOUNTS_PER_ITERATION: usize = 10_000;
103const MAX_EXPIRY_TIME_PER_ITERATION: Duration = Duration::from_millis(100);
105
106const NUM_FILTER_WORKERS: usize = 1;
108
109const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(29);
110
111enum AllowanceBlock {
113 Best,
115 Finalized,
117}
118
119const ENFORCE_LIMITS_PERIOD: std::time::Duration = std::time::Duration::from_secs(31);
123
124mod col {
125 pub const META: u8 = 0;
126 pub const STATEMENTS: u8 = 1;
127 pub const EXPIRED: u8 = 2;
128
129 pub const COUNT: u8 = 3;
130}
131
132#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
133struct Expiry(u64);
134
135impl Expiry {
136 fn get_expiration_timestamp_secs(self) -> u64 {
138 self.0 >> 32
139 }
140}
141
142#[derive(PartialEq, Eq)]
143struct PriorityKey {
144 hash: Hash,
145 expiry: Expiry,
146}
147
148impl PartialOrd for PriorityKey {
149 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150 Some(self.cmp(other))
151 }
152}
153
154impl Ord for PriorityKey {
155 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
156 self.expiry.cmp(&other.expiry).then_with(|| self.hash.cmp(&other.hash))
157 }
158}
159
160#[derive(PartialEq, Eq)]
161struct ChannelEntry {
162 hash: Hash,
163 expiry: Expiry,
164}
165
166#[derive(Default)]
167struct StatementsForAccount {
168 by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
170 channels: HashMap<Channel, ChannelEntry>,
172 data_size: usize,
174}
175
176impl StatementsForAccount {
177 fn expired_by_iter(
179 &self,
180 current_time: u64,
181 ) -> impl Iterator<Item = (&PriorityKey, &(Option<Channel>, usize))> {
182 let range = PriorityKey { hash: Hash::default(), expiry: Expiry(0) }..PriorityKey {
183 hash: Hash::default(),
184 expiry: Expiry(current_time << 32),
185 };
186 self.by_priority.range(range)
187 }
188}
189
190pub const DEFAULT_NETWORK_WORKERS: usize = 1;
192
193pub use sc_network_statement::config::DEFAULT_STATEMENTS_PER_SECOND as DEFAULT_RATE_LIMIT;
195
196#[derive(Debug, Clone, Copy)]
198pub struct Config {
199 pub max_total_statements: usize,
202 pub max_total_size: usize,
205 pub purge_after_sec: u64,
207 pub network_workers: usize,
209 pub rate_limit: u32,
211}
212
213impl Config {
214 pub fn validate(&self) -> Result<()> {
216 if self.max_total_statements == 0 {
217 return Err(Error::InvalidConfig(
218 "max_total_statements must be greater than zero".into(),
219 ));
220 }
221 if self.max_total_size == 0 {
222 return Err(Error::InvalidConfig("max_total_size must be greater than zero".into()));
223 }
224 if self.network_workers == 0 {
225 return Err(Error::InvalidConfig("network_workers must be greater than zero".into()));
226 }
227 Ok(())
228 }
229}
230
231impl Default for Config {
232 fn default() -> Self {
233 Config {
234 max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
235 max_total_size: DEFAULT_MAX_TOTAL_SIZE,
236 purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
237 network_workers: DEFAULT_NETWORK_WORKERS,
238 rate_limit: DEFAULT_RATE_LIMIT,
239 }
240 }
241}
242
243#[derive(Default)]
245struct EvictedIndex {
246 hashes: HashSet<Hash>,
247 queue: BTreeSet<(u64, Hash)>,
248 pending_cleanup: Vec<Hash>,
249}
250
251impl EvictedIndex {
252 fn insert(&mut self, hash: Hash, purge_at: u64) {
253 if self.hashes.len() >= DEFAULT_MAX_TOTAL_STATEMENTS {
254 if let Some(&key) = self.queue.iter().next() {
255 self.queue.remove(&key);
256 self.hashes.remove(&key.1);
257 self.pending_cleanup.push(key.1);
258 }
259 }
260 self.hashes.insert(hash);
261 self.queue.insert((purge_at, hash));
262 }
263
264 fn contains(&self, hash: &Hash) -> bool {
265 self.hashes.contains(hash)
266 }
267
268 fn len(&self) -> usize {
269 self.hashes.len()
270 }
271
272 fn drain_due(&mut self, current_time: u64) -> Vec<Hash> {
275 let cutoff = (current_time.saturating_add(1), Hash::default());
276 let to_keep = self.queue.split_off(&cutoff);
277 let due = std::mem::replace(&mut self.queue, to_keep);
278 let mut result: Vec<Hash> = std::mem::take(&mut self.pending_cleanup);
279 result.extend(due.into_iter().map(|(_, hash)| {
280 self.hashes.remove(&hash);
281 hash
282 }));
283 result
284 }
285}
286
287#[derive(Default)]
289struct QueryIndex {
290 by_topic: HashMap<Topic, HashSet<Hash>>,
291 by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
292 topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
293 recent: HashSet<Hash>,
294}
295
296#[derive(Default)]
298struct SubmitIndex {
299 entries: HashMap<Hash, (AccountId, Expiry, usize)>,
300 evicted: EvictedIndex,
301 accounts: HashMap<AccountId, StatementsForAccount>,
302 accounts_to_check_for_expiry_stmts: Vec<AccountId>,
303 config: Config,
304 total_size: usize,
305}
306
307struct ClientWrapper<Block, Client, BE> {
308 client: Arc<Client>,
309 _block: std::marker::PhantomData<Block>,
310 _backend: std::marker::PhantomData<BE>,
311}
312
313impl<Block, Client, BE> ClientWrapper<Block, Client, BE>
314where
315 Block: BlockT,
316 Block::Hash: From<BlockHash>,
317 BE: Backend<Block> + 'static,
318 Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
319{
320 fn read_allowance(
321 &self,
322 account_id: &AccountId,
323 allowance_block: AllowanceBlock,
324 ) -> Result<Option<StatementAllowance>> {
325 use sp_statement_store::{statement_allowance_key, StatementAllowance};
326
327 let block_hash = match allowance_block {
328 AllowanceBlock::Best => self.client.info().best_hash,
329 AllowanceBlock::Finalized => self.client.info().finalized_hash,
330 };
331 let key = statement_allowance_key(account_id);
332 let storage_key = StorageKey(key);
333 self.client
334 .storage(block_hash, &storage_key)
335 .map_err(|e| Error::Storage(format!("Failed to read allowance: {:?}", e)))?
336 .map(|value| {
337 StatementAllowance::decode(&mut &value.0[..])
338 .map_err(|e| Error::Decode(format!("Failed to decode allowance: {:?}", e)))
339 })
340 .transpose()
341 }
342}
343
344pub struct Store {
346 db: parity_db::Db,
347 submit_index: RwLock<SubmitIndex>,
348 query_index: RwLock<QueryIndex>,
349 read_allowance_fn:
350 Box<dyn Fn(&AccountId, AllowanceBlock) -> Result<Option<StatementAllowance>> + Send + Sync>,
351 subscription_manager: SubscriptionsHandle,
352 keystore: Arc<LocalKeystore>,
353 time_override: Option<u64>,
355 metrics: PrometheusMetrics,
356}
357
358enum IndexQuery {
359 Unknown,
360 Exists,
361 Expired,
362}
363
364impl QueryIndex {
365 fn insert(&mut self, hash: Hash, statement: &Statement) {
366 let mut all_topics = [None; MAX_TOPICS];
367 let mut nt = 0;
368 while let Some(t) = statement.topic(nt) {
369 self.by_topic.entry(t).or_default().insert(hash);
370 all_topics[nt] = Some(t);
371 nt += 1;
372 }
373 let key = statement.decryption_key();
374 self.by_dec_key.entry(key).or_default().insert(hash);
375 self.topics_and_keys.insert(hash, (all_topics, key));
376 }
377
378 fn take_recent(&mut self) -> HashSet<Hash> {
379 std::mem::take(&mut self.recent)
380 }
381
382 fn remove(&mut self, hash: &Hash) {
383 let _ = self.recent.remove(hash);
384 if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
385 for t in topics.into_iter().flatten() {
386 if let std::collections::hash_map::Entry::Occupied(mut set) = self.by_topic.entry(t)
387 {
388 set.get_mut().remove(hash);
389 if set.get().is_empty() {
390 set.remove_entry();
391 }
392 }
393 }
394 if let std::collections::hash_map::Entry::Occupied(mut set) = self.by_dec_key.entry(key)
395 {
396 set.get_mut().remove(hash);
397 if set.get().is_empty() {
398 set.remove_entry();
399 }
400 }
401 }
402 }
403
404 fn iterate_with(
405 &self,
406 key: Option<DecryptionKey>,
407 topic: &OptimizedTopicFilter,
408 f: impl FnMut(&Hash) -> Result<()>,
409 ) -> Result<()> {
410 match topic {
411 OptimizedTopicFilter::Any => self.iterate_with_any(key, f),
412 OptimizedTopicFilter::MatchAll(topics) => {
413 self.iterate_with_match_all(key, topics.iter(), f)
414 },
415 OptimizedTopicFilter::MatchAny(topics) => {
416 self.iterate_with_match_any(key, topics.iter(), f)
417 },
418 }
419 }
420
421 fn iterate_with_match_any<'a>(
422 &self,
423 key: Option<DecryptionKey>,
424 match_any_topics: impl ExactSizeIterator<Item = &'a Topic>,
425 mut f: impl FnMut(&Hash) -> Result<()>,
426 ) -> Result<()> {
427 let Some(key_set) = self.by_dec_key.get(&key).filter(|k| !k.is_empty()) else {
428 return Ok(());
429 };
430
431 for t in match_any_topics {
432 let set = self.by_topic.get(t);
433
434 for item in set.iter().flat_map(|set| set.iter()) {
435 if key_set.contains(item) {
436 log::trace!(
437 target: LOG_TARGET,
438 "Iterating by topic/key: statement {:?}",
439 HexDisplay::from(item)
440 );
441 f(item)?
442 }
443 }
444 }
445 Ok(())
446 }
447
448 fn iterate_with_any(
449 &self,
450 key: Option<DecryptionKey>,
451 mut f: impl FnMut(&Hash) -> Result<()>,
452 ) -> Result<()> {
453 let key_set = self.by_dec_key.get(&key);
454 if key_set.map_or(true, |s| s.is_empty()) {
455 return Ok(());
457 }
458
459 for item in key_set.map(|hashes| hashes.iter()).into_iter().flatten() {
460 f(item)?
461 }
462 Ok(())
463 }
464
465 fn iterate_with_match_all<'a>(
466 &self,
467 key: Option<DecryptionKey>,
468 match_all_topics: impl ExactSizeIterator<Item = &'a Topic>,
469 mut f: impl FnMut(&Hash) -> Result<()>,
470 ) -> Result<()> {
471 let empty = HashSet::new();
472 let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [∅ MAX_TOPICS + 1];
473 let num_topics = match_all_topics.len();
474 if num_topics > MAX_TOPICS {
475 return Ok(());
476 }
477 let key_set = self.by_dec_key.get(&key);
478 if key_set.map_or(true, |s| s.is_empty()) {
479 return Ok(());
481 }
482 sets[0] = key_set.expect("Function returns if key_set is None");
483 for (i, t) in match_all_topics.enumerate() {
484 let set = self.by_topic.get(t);
485 if set.map_or(0, |s| s.len()) == 0 {
486 return Ok(());
488 }
489 sets[i + 1] = set.expect("Function returns if set is None");
490 }
491 let sets = &mut sets[0..num_topics + 1];
492 sets.sort_by_key(|s| s.len());
494 for item in sets[0] {
495 if sets[1..].iter().all(|set| set.contains(item)) {
496 log::trace!(
497 target: LOG_TARGET,
498 "Iterating by topic/key: statement {:?}",
499 HexDisplay::from(item)
500 );
501 f(item)?
502 }
503 }
504 Ok(())
505 }
506}
507
508impl SubmitIndex {
509 fn new(config: Config) -> SubmitIndex {
510 SubmitIndex { config, ..Default::default() }
511 }
512
513 fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
514 let expiry = Expiry(statement.expiry());
515 self.entries.insert(hash, (account, expiry, statement.data_len()));
516 self.total_size += statement.data_len();
517 let account_info = self.accounts.entry(account).or_default();
518 account_info.data_size += statement.data_len();
519 if let Some(channel) = statement.channel() {
520 account_info.channels.insert(channel, ChannelEntry { hash, expiry });
521 }
522 account_info
523 .by_priority
524 .insert(PriorityKey { hash, expiry }, (statement.channel(), statement.data_len()));
525 }
526
527 fn query(&self, hash: &Hash) -> IndexQuery {
528 if self.entries.contains_key(hash) {
529 return IndexQuery::Exists;
530 }
531 if self.evicted.contains(hash) {
532 return IndexQuery::Expired;
533 }
534 IndexQuery::Unknown
535 }
536
537 fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
538 let purge_at = timestamp.saturating_add(self.config.purge_after_sec);
539 self.evicted.insert(hash, purge_at);
540 }
541
542 fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
543 self.evicted.drain_due(current_time)
544 }
545
546 fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
547 if let Some((account, expiry, len)) = self.entries.remove(hash) {
548 self.total_size -= len;
549 if current_time < expiry.get_expiration_timestamp_secs() {
550 let purge_at = expiry
551 .get_expiration_timestamp_secs()
552 .min(current_time.saturating_add(self.config.purge_after_sec));
553 self.evicted.insert(*hash, purge_at);
554 }
555 if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
556 self.accounts.entry(account)
557 {
558 let key = PriorityKey { hash: *hash, expiry };
559 if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
560 account_rec.get_mut().data_size -= len;
561 if let Some(channel) = channel {
562 account_rec.get_mut().channels.remove(&channel);
563 }
564 }
565 if account_rec.get().by_priority.is_empty() {
566 account_rec.remove_entry();
567 }
568 }
569 log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
570 true
571 } else {
572 false
573 }
574 }
575
576 fn insert(
577 &mut self,
578 hash: Hash,
579 statement: &Statement,
580 account: &AccountId,
581 validation: &StatementAllowance,
582 current_time: u64,
583 ) -> std::result::Result<HashSet<Hash>, RejectionReason> {
584 let statement_len = statement.data_len();
585 if statement_len > validation.max_size as usize {
586 log::debug!(
587 target: LOG_TARGET,
588 "Ignored oversize message: {:?} ({} bytes)",
589 HexDisplay::from(&hash),
590 statement_len,
591 );
592 return Err(RejectionReason::DataTooLarge {
593 submitted_size: statement_len,
594 available_size: validation.max_size as usize,
595 });
596 }
597
598 let mut evicted = HashSet::new();
599 let mut would_free_size = 0;
600 let expiry = Expiry(statement.expiry());
601 let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
602 if let Some(account_rec) = self.accounts.get(account) {
606 if let Some(channel) = statement.channel() {
607 if let Some(channel_record) = account_rec.channels.get(&channel) {
608 if expiry <= channel_record.expiry {
609 log::debug!(
611 target: LOG_TARGET,
612 "Ignored lower priority channel message: {:?} {:?} <= {:?}",
613 HexDisplay::from(&hash),
614 expiry,
615 channel_record.expiry,
616 );
617 return Err(RejectionReason::ChannelPriorityTooLow {
618 submitted_expiry: expiry.0,
619 min_expiry: channel_record.expiry.0,
620 });
621 } else {
622 log::debug!(
625 target: LOG_TARGET,
626 "Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
627 HexDisplay::from(&hash),
628 expiry,
629 HexDisplay::from(&channel_record.hash),
630 channel_record.expiry,
631 );
632 let key = PriorityKey {
633 hash: channel_record.hash,
634 expiry: channel_record.expiry,
635 };
636 if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
637 would_free_size += *len;
638 evicted.insert(channel_record.hash);
639 }
640 }
641 }
642 }
643 for (entry, (_, len)) in account_rec.by_priority.iter() {
645 if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
646 account_rec.by_priority.len() + 1 - evicted.len() <= max_count
647 {
648 break;
650 }
651 if evicted.contains(&entry.hash) {
652 continue;
654 }
655 if entry.expiry >= expiry {
656 log::debug!(
657 target: LOG_TARGET,
658 "Ignored message due to constraints {:?} {:?} < {:?}",
659 HexDisplay::from(&hash),
660 expiry,
661 entry.expiry,
662 );
663 return Err(RejectionReason::AccountFull {
664 submitted_expiry: expiry.0,
665 min_expiry: entry.expiry.0,
666 });
667 }
668 evicted.insert(entry.hash);
669 would_free_size += len;
670 }
671 }
672 if !((self.total_size - would_free_size + statement_len <= self.config.max_total_size) &&
674 self.entries.len() + 1 - evicted.len() <= self.config.max_total_statements)
675 {
676 log::debug!(
677 target: LOG_TARGET,
678 "Ignored statement {} because the store is full (size={}, count={})",
679 HexDisplay::from(&hash),
680 self.total_size,
681 self.entries.len(),
682 );
683 return Err(RejectionReason::StoreFull);
684 }
685
686 for h in &evicted {
687 self.make_expired(h, current_time);
688 }
689 self.insert_new(hash, *account, statement);
690 Ok(evicted)
691 }
692}
693
694impl Store {
695 pub fn new_shared<Block, Client, BE>(
698 path: &std::path::Path,
699 config: Config,
700 client: Arc<Client>,
701 keystore: Arc<LocalKeystore>,
702 prometheus: Option<&PrometheusRegistry>,
703 task_spawner: Box<dyn SpawnNamed>,
704 ) -> Result<Arc<Store>>
705 where
706 Block: BlockT,
707 Block::Hash: From<BlockHash>,
708 BE: Backend<Block> + 'static,
709 Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
710 {
711 let store =
712 Arc::new(Self::new(path, config, client, keystore, prometheus, task_spawner.clone())?);
713
714 let worker_store = store.clone();
716 task_spawner.spawn(
717 "statement-store-maintenance",
718 Some("statement-store"),
719 Box::pin(async move {
720 let mut maintenance_interval = tokio::time::interval(MAINTENANCE_PERIOD);
721 let mut enforce_limits_interval = tokio::time::interval(ENFORCE_LIMITS_PERIOD);
722 loop {
723 futures::select! {
724 _ = maintenance_interval.tick().fuse() => {worker_store.maintain();}
725 _ = enforce_limits_interval.tick().fuse() => {worker_store.enforce_limits();}
726 }
727 }
728 }),
729 );
730
731 Ok(store)
732 }
733
734 #[doc(hidden)]
737 pub fn new<Block, Client, BE>(
738 path: &std::path::Path,
739 config: Config,
740 client: Arc<Client>,
741 keystore: Arc<LocalKeystore>,
742 prometheus: Option<&PrometheusRegistry>,
743 task_spawner: Box<dyn SpawnNamed>,
744 ) -> Result<Store>
745 where
746 Block: BlockT,
747 Block::Hash: From<BlockHash>,
748 BE: Backend<Block> + 'static,
749 Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
750 {
751 config.validate()?;
752
753 let mut path: std::path::PathBuf = path.into();
754 path.push("statements");
755
756 let mut db_config = parity_db::Options::with_columns(&path, col::COUNT);
757
758 let statement_col = &mut db_config.columns[col::STATEMENTS as usize];
759 statement_col.ref_counted = false;
760 statement_col.preimage = true;
761 statement_col.uniform = true;
762 let db = parity_db::Db::open_or_create(&db_config).map_err(|e| Error::Db(e.to_string()))?;
763 match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
764 Some(version) => {
765 let version = u32::from_le_bytes(
766 version
767 .try_into()
768 .map_err(|_| Error::Db("Error reading database version".into()))?,
769 );
770 if version != CURRENT_VERSION {
771 return Err(Error::Db(format!("Unsupported database version: {version}")));
772 }
773 },
774 None => {
775 db.commit([(
776 col::META,
777 KEY_VERSION.to_vec(),
778 Some(CURRENT_VERSION.to_le_bytes().to_vec()),
779 )])
780 .map_err(|e| Error::Db(e.to_string()))?;
781 },
782 }
783
784 let storage_reader =
785 ClientWrapper { client, _block: Default::default(), _backend: Default::default() };
786 let read_allowance_fn =
787 Box::new(move |account_id: &AccountId, allowance_block: AllowanceBlock| {
788 storage_reader.read_allowance(account_id, allowance_block)
789 });
790
791 let store = Store {
792 db,
793 submit_index: RwLock::new(SubmitIndex::new(config)),
794 query_index: RwLock::new(QueryIndex::default()),
795 read_allowance_fn,
796 keystore,
797 time_override: None,
798 metrics: PrometheusMetrics::new(prometheus),
799 subscription_manager: SubscriptionsHandle::new(
800 task_spawner.clone(),
801 NUM_FILTER_WORKERS,
802 ),
803 };
804 store.populate()?;
805 Ok(store)
806 }
807
808 fn populate(&self) -> Result<()> {
813 {
816 let mut submit_index = self.submit_index.write();
817 let mut query_index = self.query_index.write();
818 self.db
819 .iter_column_while(col::STATEMENTS, |item| {
820 let statement = item.value;
821 if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
822 let hash = statement.hash();
823 log::trace!(
824 target: LOG_TARGET,
825 "Statement loaded {:?}",
826 HexDisplay::from(&hash)
827 );
828 if let Some(account_id) = statement.account_id() {
829 submit_index.insert_new(hash, account_id, &statement);
830 query_index.insert(hash, &statement);
831 } else {
832 log::debug!(
833 target: LOG_TARGET,
834 "Error decoding statement loaded from the DB: {:?}",
835 HexDisplay::from(&hash)
836 );
837 }
838 }
839 true
840 })
841 .map_err(|e| Error::Db(e.to_string()))?;
842 self.db
843 .iter_column_while(col::EXPIRED, |item| {
844 let expired_info = item.value;
845 if let Ok((hash, timestamp)) =
846 <(Hash, u64)>::decode(&mut expired_info.as_slice())
847 {
848 log::trace!(
849 target: LOG_TARGET,
850 "Statement loaded (expired): {:?}",
851 HexDisplay::from(&hash)
852 );
853 submit_index.insert_expired(hash, timestamp);
854 }
855 true
856 })
857 .map_err(|e| Error::Db(e.to_string()))?;
858 }
859
860 self.maintain();
861 Ok(())
862 }
863
864 fn collect_statements_locked<R>(
865 &self,
866 key: Option<DecryptionKey>,
867 topic_filter: &OptimizedTopicFilter,
868 query_index: &QueryIndex,
869 result: &mut Vec<R>,
870 mut f: impl FnMut(Statement) -> Option<R>,
871 ) -> Result<()> {
872 query_index.iterate_with(key, topic_filter, |hash| {
873 match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
874 Some(entry) => {
875 if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
876 if let Some(data) = f(statement) {
877 result.push(data);
878 }
879 } else {
880 log::warn!(
882 target: LOG_TARGET,
883 "Corrupt statement {:?}",
884 HexDisplay::from(hash)
885 );
886 }
887 },
888 None => {
889 log::debug!(
891 target: LOG_TARGET,
892 "Missing statement {:?}",
893 HexDisplay::from(hash)
894 );
895 },
896 }
897 Ok(())
898 })?;
899 Ok(())
900 }
901
902 fn collect_statements<R>(
903 &self,
904 key: Option<DecryptionKey>,
905 topic_filter: &OptimizedTopicFilter,
906 f: impl FnMut(Statement) -> Option<R>,
907 ) -> Result<Vec<R>> {
908 let mut result = Vec::new();
909 let query_index = self.query_index.read();
910 self.collect_statements_locked(key, topic_filter, &query_index, &mut result, f)?;
911 Ok(result)
912 }
913
914 fn collect_evictions(
916 &self,
917 account: &AccountId,
918 account_rec: &StatementsForAccount,
919 current_time: u64,
920 ) -> Vec<Hash> {
921 let mut to_evict = Vec::new();
922 let mut expired_count = 0usize;
923 let mut expired_size = 0usize;
924 for (key, (_, len)) in account_rec.expired_by_iter(current_time) {
925 to_evict.push(key.hash);
926 expired_count += 1;
927 expired_size += len;
928 }
929
930 let allowance = match (self.read_allowance_fn)(account, AllowanceBlock::Finalized) {
933 Ok(Some(allowance)) => allowance,
934 Ok(None) => {
935 log::debug!(
936 target: LOG_TARGET,
937 "No allowance found for account {:?}, treating as zero allowance",
938 HexDisplay::from(account)
939 );
940 StatementAllowance { max_count: 0, max_size: 0 }
941 },
942 Err(e) => {
943 log::error!(target: LOG_TARGET, "Error reading allowance: {:?}", e);
944 return to_evict;
946 },
947 };
948
949 let mut remaining_count = account_rec.by_priority.len() - expired_count;
951 let mut remaining_size = account_rec.data_size - expired_size;
952
953 if remaining_count > allowance.max_count as usize ||
955 remaining_size > allowance.max_size as usize
956 {
957 log::debug!(
958 target: LOG_TARGET,
959 "Account {:?} exceeds allowance: count={}/{}, size={}/{}",
960 HexDisplay::from(account),
961 remaining_count,
962 allowance.max_count,
963 remaining_size,
964 allowance.max_size
965 );
966
967 for (key, (_, len)) in account_rec.by_priority.iter().skip(expired_count) {
969 if remaining_count <= allowance.max_count as usize &&
970 remaining_size <= allowance.max_size as usize
971 {
972 break;
973 }
974 to_evict.push(key.hash);
975 remaining_count -= 1;
976 remaining_size -= len;
977 log::debug!(
978 target: LOG_TARGET,
979 "Evicting statement {:?} due to allowance enforcement",
980 HexDisplay::from(&key.hash)
981 );
982 }
983 }
984
985 to_evict
986 }
987
988 fn enforce_limits(&self) {
1005 let _start_check_expiration_timer = self.metrics.start_check_expiration_timer();
1006 let current_time = self.timestamp();
1007
1008 let (to_evict, num_accounts_checked) = {
1009 let submit_index = self.submit_index.upgradable_read();
1010 if submit_index.accounts_to_check_for_expiry_stmts.is_empty() {
1011 let existing_accounts = submit_index.accounts.keys().cloned().collect::<Vec<_>>();
1012 let mut submit_index = RwLockUpgradableReadGuard::upgrade(submit_index);
1013 submit_index.accounts_to_check_for_expiry_stmts = existing_accounts;
1014 return;
1015 }
1016
1017 let mut to_evict = Vec::new();
1018 let mut num_accounts_checked = 0;
1019 let start = Instant::now();
1020
1021 for account in submit_index.accounts_to_check_for_expiry_stmts.iter().rev() {
1022 num_accounts_checked += 1;
1023 if let Some(account_rec) = submit_index.accounts.get(account) {
1024 to_evict.extend(self.collect_evictions(account, account_rec, current_time));
1025 }
1026
1027 if to_evict.len() >= MAX_EXPIRY_STATEMENTS_PER_ITERATION ||
1028 num_accounts_checked >= MAX_EXPIRY_ACCOUNTS_PER_ITERATION ||
1029 start.elapsed() >= MAX_EXPIRY_TIME_PER_ITERATION
1030 {
1031 break;
1032 }
1033 }
1034
1035 (to_evict, num_accounts_checked)
1036 };
1037
1038 let mut expired = 0;
1039
1040 for hash in to_evict {
1041 if let Err(e) = self.remove(&hash) {
1042 log::debug!(
1043 target: LOG_TARGET,
1044 "Error marking statement {:?} as expired: {:?}",
1045 HexDisplay::from(&hash),
1046 e
1047 );
1048 } else {
1049 expired += 1;
1050 log::trace!(
1051 target: LOG_TARGET,
1052 "Marked statement {:?} as expired",
1053 HexDisplay::from(&hash)
1054 );
1055 }
1056 }
1057
1058 let mut submit_index = self.submit_index.write();
1059 let new_len = submit_index
1060 .accounts_to_check_for_expiry_stmts
1061 .len()
1062 .saturating_sub(num_accounts_checked);
1063 submit_index.accounts_to_check_for_expiry_stmts.truncate(new_len);
1064
1065 drop(_start_check_expiration_timer);
1066
1067 self.metrics.report(|metrics| {
1068 metrics.statements_expired_total.inc_by(expired);
1069 });
1070 }
1071
1072 pub fn maintain(&self) {
1074 log::trace!(target: LOG_TARGET, "Started store maintenance");
1075 let (
1076 deleted,
1077 active_count,
1078 expired_count,
1079 total_size,
1080 accounts_count,
1081 capacity_statements,
1082 capacity_bytes,
1083 ): (Vec<_>, usize, usize, usize, usize, usize, usize) = {
1084 let mut submit_index = self.submit_index.write();
1085 let deleted = submit_index.maintain(self.timestamp());
1086 (
1087 deleted,
1088 submit_index.entries.len(),
1089 submit_index.evicted.len(),
1090 submit_index.total_size,
1091 submit_index.accounts.len(),
1092 submit_index.config.max_total_statements,
1093 submit_index.config.max_total_size,
1094 )
1095 };
1096 let deleted: Vec<_> =
1097 deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
1098 let deleted_count = deleted.len() as u64;
1099 if let Err(e) = self.db.commit(deleted) {
1100 log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
1101 } else {
1102 self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
1103 }
1104
1105 self.metrics.report(|metrics| {
1106 metrics.statements_total.set(active_count as u64);
1107 metrics.bytes_total.set(total_size as u64);
1108 metrics.accounts_total.set(accounts_count as u64);
1109 metrics.expired_total.set(expired_count as u64);
1110 metrics.capacity_statements.set(capacity_statements as u64);
1111 metrics.capacity_bytes.set(capacity_bytes as u64);
1112 });
1113
1114 log::trace!(
1115 target: LOG_TARGET,
1116 "Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
1117 deleted_count,
1118 active_count,
1119 expired_count
1120 );
1121 }
1122
1123 fn timestamp(&self) -> u64 {
1124 self.time_override.unwrap_or_else(|| {
1125 std::time::SystemTime::now()
1126 .duration_since(std::time::UNIX_EPOCH)
1127 .unwrap_or_default()
1128 .as_secs()
1129 })
1130 }
1131
1132 #[cfg(test)]
1133 fn set_time(&mut self, time: u64) {
1134 self.time_override = Some(time);
1135 }
1136
1137 pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
1139 StatementStoreExt::new(self)
1140 }
1141
1142 fn posted_clear_inner<R>(
1145 &self,
1146 match_all_topics: &[Topic],
1147 dest: [u8; 32],
1148 mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
1150 ) -> Result<Vec<R>> {
1151 self.collect_statements(
1152 Some(dest),
1153 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1154 |statement| {
1155 if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
1156 let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
1157 let public: sp_statement_store::ed25519::Public = public.into();
1158 match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
1159 Err(e) => {
1160 log::debug!(
1161 target: LOG_TARGET,
1162 "Keystore error: {:?}, for statement {:?}",
1163 e,
1164 HexDisplay::from(&statement.hash())
1165 );
1166 None
1167 },
1168 Ok(None) => {
1169 log::debug!(
1170 target: LOG_TARGET,
1171 "Keystore is missing key for statement {:?}",
1172 HexDisplay::from(&statement.hash())
1173 );
1174 None
1175 },
1176 Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
1177 Ok(r) => r.map(|data| map_f(statement, data)),
1178 Err(e) => {
1179 log::debug!(
1180 target: LOG_TARGET,
1181 "Decryption error: {:?}, for statement {:?}",
1182 e,
1183 HexDisplay::from(&statement.hash())
1184 );
1185 None
1186 },
1187 },
1188 }
1189 } else {
1190 None
1191 }
1192 },
1193 )
1194 }
1195}
1196
1197impl StatementStore for Store {
1198 fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
1200 let query_index = self.query_index.read();
1201 let mut result = Vec::with_capacity(query_index.topics_and_keys.len());
1202 for hash in query_index.topics_and_keys.keys().cloned() {
1203 let Some(encoded) =
1204 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
1205 else {
1206 continue;
1207 };
1208 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
1209 result.push((hash, statement));
1210 }
1211 }
1212 Ok(result)
1213 }
1214
1215 fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
1216 let mut query_index = self.query_index.write();
1217 let recent = query_index.take_recent();
1218 let mut result = Vec::with_capacity(recent.len());
1219 for hash in recent {
1220 let Some(encoded) =
1221 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
1222 else {
1223 continue;
1224 };
1225 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
1226 result.push((hash, statement));
1227 }
1228 }
1229 Ok(result)
1230 }
1231
1232 fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
1234 Ok(
1235 match self
1236 .db
1237 .get(col::STATEMENTS, hash.as_slice())
1238 .map_err(|e| Error::Db(e.to_string()))?
1239 {
1240 Some(entry) => {
1241 log::trace!(
1242 target: LOG_TARGET,
1243 "Queried statement {:?}",
1244 HexDisplay::from(hash)
1245 );
1246 Some(
1247 Statement::decode(&mut entry.as_slice())
1248 .map_err(|e| Error::Decode(e.to_string()))?,
1249 )
1250 },
1251 None => {
1252 log::trace!(
1253 target: LOG_TARGET,
1254 "Queried missing statement {:?}",
1255 HexDisplay::from(hash)
1256 );
1257 None
1258 },
1259 },
1260 )
1261 }
1262
1263 fn has_statement(&self, hash: &Hash) -> bool {
1264 self.query_index.read().topics_and_keys.contains_key(hash)
1265 }
1266
1267 fn statement_hashes(&self) -> Vec<Hash> {
1268 self.query_index.read().topics_and_keys.keys().cloned().collect()
1269 }
1270
1271 fn statements_by_hashes(
1272 &self,
1273 hashes: &[Hash],
1274 filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
1275 ) -> Result<(Vec<(Hash, Statement)>, usize)> {
1276 let mut result = Vec::new();
1277 let mut processed = 0;
1278 for hash in hashes {
1279 processed += 1;
1280 let Some(encoded) =
1281 self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))?
1282 else {
1283 continue;
1284 };
1285 let Ok(statement) = Statement::decode(&mut encoded.as_slice()) else { continue };
1286 match filter(hash, &encoded, &statement) {
1287 FilterDecision::Skip => {},
1288 FilterDecision::Take => {
1289 result.push((*hash, statement));
1290 },
1291 FilterDecision::Abort => {
1292 processed -= 1;
1294 break;
1295 },
1296 }
1297 }
1298
1299 Ok((result, processed))
1300 }
1301
1302 fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
1305 self.collect_statements(
1306 None,
1307 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1308 |statement| statement.into_data(),
1309 )
1310 }
1311
1312 fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1316 self.collect_statements(
1317 Some(dest),
1318 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1319 |statement| statement.into_data(),
1320 )
1321 }
1322
1323 fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1326 self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
1327 }
1328
1329 fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
1332 self.collect_statements(
1333 None,
1334 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1335 |statement| Some(statement.encode()),
1336 )
1337 }
1338
1339 fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1343 self.collect_statements(
1344 Some(dest),
1345 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1346 |statement| Some(statement.encode()),
1347 )
1348 }
1349
1350 fn posted_clear_stmt(
1353 &self,
1354 match_all_topics: &[Topic],
1355 dest: [u8; 32],
1356 ) -> Result<Vec<Vec<u8>>> {
1357 self.posted_clear_inner(match_all_topics, dest, |statement, data| {
1358 let mut res = Vec::with_capacity(statement.size_hint() + data.len());
1359 statement.encode_to(&mut res);
1360 res.extend_from_slice(&data);
1361 res
1362 })
1363 }
1364
1365 fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
1367 let _histogram_submit_start_timer = self.metrics.start_submit_timer();
1368 let hash = statement.hash();
1369 if self.timestamp() >= statement.get_expiration_timestamp_secs().into() {
1371 log::debug!(
1372 target: LOG_TARGET,
1373 "Statement is already expired: {:?}",
1374 HexDisplay::from(&hash),
1375 );
1376 let reason = InvalidReason::AlreadyExpired;
1377 self.metrics.report(|metrics| {
1378 metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1379 });
1380 return SubmitResult::Invalid(reason);
1381 }
1382 let encoded_size = statement.encoded_size();
1383 if encoded_size > MAX_STATEMENT_SIZE {
1384 log::debug!(
1385 target: LOG_TARGET,
1386 "Statement is too big for propogation: {:?} ({}/{} bytes)",
1387 HexDisplay::from(&hash),
1388 statement.encoded_size(),
1389 MAX_STATEMENT_SIZE
1390 );
1391 let reason = InvalidReason::EncodingTooLarge {
1392 submitted_size: encoded_size,
1393 max_size: MAX_STATEMENT_SIZE,
1394 };
1395 self.metrics.report(|metrics| {
1396 metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1397 });
1398 return SubmitResult::Invalid(reason);
1399 }
1400
1401 match self.submit_index.read().query(&hash) {
1402 IndexQuery::Expired => {
1403 if !source.can_be_resubmitted() {
1404 self.metrics.report(|metrics| {
1405 metrics.known_statements.with_label_values(&["known_expired"]).inc();
1406 });
1407 return SubmitResult::KnownExpired;
1408 }
1409 },
1410 IndexQuery::Exists => {
1411 if !source.can_be_resubmitted() {
1412 self.metrics.report(|metrics| {
1413 metrics.known_statements.with_label_values(&["known"]).inc();
1414 });
1415 return SubmitResult::Known;
1416 }
1417 },
1418 IndexQuery::Unknown => {},
1419 }
1420
1421 let Some(account_id) = statement.account_id() else {
1422 log::debug!(
1423 target: LOG_TARGET,
1424 "Statement validation failed: Missing proof ({:?})",
1425 HexDisplay::from(&hash),
1426 );
1427 let reason = InvalidReason::NoProof;
1428 self.metrics.report(|metrics| {
1429 metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1430 });
1431 return SubmitResult::Invalid(reason);
1432 };
1433
1434 match statement.verify_signature() {
1435 SignatureVerificationResult::Valid(_) => {},
1436 SignatureVerificationResult::Invalid => {
1437 log::debug!(
1438 target: LOG_TARGET,
1439 "Statement validation failed: BadProof, {:?}",
1440 HexDisplay::from(&hash),
1441 );
1442 let reason = InvalidReason::BadProof;
1443 self.metrics.report(|metrics| {
1444 metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1445 });
1446 return SubmitResult::Invalid(reason);
1447 },
1448 SignatureVerificationResult::NoSignature => {
1449 log::debug!(
1450 target: LOG_TARGET,
1451 "Statement validation failed: NoProof, {:?}",
1452 HexDisplay::from(&hash),
1453 );
1454 let reason = InvalidReason::NoProof;
1455 self.metrics.report(|metrics| {
1456 metrics.validations_invalid.with_label_values(&[reason.label()]).inc();
1457 });
1458 return SubmitResult::Invalid(reason);
1459 },
1460 };
1461
1462 let validation = match (self.read_allowance_fn)(&account_id, AllowanceBlock::Best) {
1469 Ok(Some(allowance)) => allowance,
1470 Ok(None) => {
1471 log::debug!(
1472 target: LOG_TARGET,
1473 "Account {} has no statement allowance set",
1474 HexDisplay::from(&account_id),
1475 );
1476 let reason = RejectionReason::NoAllowance;
1477 self.metrics.report(|metrics| {
1478 metrics.rejections.with_label_values(&[reason.label()]).inc();
1479 });
1480 return SubmitResult::Rejected(reason);
1481 },
1482 Err(e) => {
1483 log::debug!(
1484 target: LOG_TARGET,
1485 "Reading statement allowance for account {} failed",
1486 HexDisplay::from(&account_id),
1487 );
1488 self.metrics.report(|metrics| {
1489 metrics.internal_errors.with_label_values(&["read_allowance"]).inc();
1490 });
1491 return SubmitResult::InternalError(e);
1492 },
1493 };
1494
1495 let current_time = self.timestamp();
1496 let evicted = {
1497 let mut submit_index = self.submit_index.write();
1498
1499 let evicted =
1500 match submit_index.insert(hash, &statement, &account_id, &validation, current_time)
1501 {
1502 Ok(evicted) => evicted,
1503 Err(reason) => {
1504 self.metrics.report(|metrics| {
1505 metrics.rejections.with_label_values(&[reason.label()]).inc();
1506 });
1507 return SubmitResult::Rejected(reason);
1508 },
1509 };
1510
1511 let mut commit = Vec::new();
1512 commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
1513 for h in &evicted {
1514 commit.push((col::STATEMENTS, h.to_vec(), None));
1515 if submit_index.evicted.contains(h) {
1516 commit.push((col::EXPIRED, h.to_vec(), Some((h, current_time).encode())));
1517 }
1518 }
1519 if let Err(e) = self.db.commit(commit) {
1520 log::debug!(
1521 target: LOG_TARGET,
1522 "Statement validation failed: database error {}, {:?}",
1523 e,
1524 statement
1525 );
1526 self.metrics.report(|metrics| {
1527 metrics.internal_errors.with_label_values(&["db_commit"]).inc();
1528 });
1529 return SubmitResult::InternalError(Error::Db(e.to_string()));
1530 }
1531 evicted
1532 }; {
1534 let mut query_index = self.query_index.write();
1535 for h in &evicted {
1536 query_index.remove(h);
1537 }
1538 query_index.insert(hash, &statement);
1539 query_index.recent.insert(hash);
1540 self.subscription_manager.notify(statement);
1541 } self.metrics.report(|metrics| metrics.submitted_statements.inc());
1543 log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
1544 SubmitResult::New
1545 }
1546
1547 fn remove(&self, hash: &Hash) -> Result<()> {
1549 let current_time = self.timestamp();
1550 let was_expired = {
1551 let mut submit_index = self.submit_index.write();
1552 if submit_index.make_expired(hash, current_time) {
1553 let mut commit = vec![(col::STATEMENTS, hash.to_vec(), None)];
1554 if submit_index.evicted.contains(hash) {
1555 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1556 }
1557 if let Err(e) = self.db.commit(commit) {
1558 log::debug!(
1559 target: LOG_TARGET,
1560 "Error removing statement: database error {}, {:?}",
1561 e,
1562 HexDisplay::from(hash),
1563 );
1564 return Err(Error::Db(e.to_string()));
1565 }
1566 true
1567 } else {
1568 false
1569 }
1570 };
1571 if was_expired {
1572 let mut query_index = self.query_index.write();
1573 query_index.remove(hash);
1574 }
1575 Ok(())
1576 }
1577
1578 fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1580 let evicted = {
1581 let mut submit_index = self.submit_index.write();
1582 let mut evicted = Vec::new();
1583 if let Some(account_rec) = submit_index.accounts.get(&who) {
1584 evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1585 }
1586
1587 let current_time = self.timestamp();
1588 let mut commit = Vec::new();
1589 for hash in &evicted {
1590 submit_index.make_expired(hash, current_time);
1591 commit.push((col::STATEMENTS, hash.to_vec(), None));
1592 if submit_index.evicted.contains(hash) {
1593 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1594 }
1595 }
1596 self.db.commit(commit).map_err(|e| {
1597 log::debug!(
1598 target: LOG_TARGET,
1599 "Error removing statement: database error {}, remove by {:?}",
1600 e,
1601 HexDisplay::from(&who),
1602 );
1603
1604 Error::Db(e.to_string())
1605 })?;
1606 evicted
1607 };
1608 if !evicted.is_empty() {
1609 let mut query_index = self.query_index.write();
1610 for hash in &evicted {
1611 query_index.remove(hash);
1612 }
1613 }
1614 Ok(())
1615 }
1616}
1617
1618impl StatementStoreSubscriptionApi for Store {
1619 fn subscribe_statement(
1620 &self,
1621 topic_filter: OptimizedTopicFilter,
1622 ) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>
1623 {
1624 let mut existing_statements = Vec::new();
1627 let query_index = self.query_index.read();
1628 self.collect_statements_locked(
1629 None,
1630 &topic_filter,
1631 &query_index,
1632 &mut existing_statements,
1633 |statement| Some(statement.encode()),
1634 )?;
1635 let (subscription_sender, subscription_stream) =
1636 self.subscription_manager.subscribe(topic_filter);
1637 if existing_statements.is_empty() {
1638 subscription_sender
1639 .send_blocking(StatementEvent::NewStatements {
1640 statements: vec![],
1641 remaining: Some(0),
1642 })
1643 .ok();
1644 }
1645 Ok((existing_statements, subscription_sender, subscription_stream))
1646 }
1647}
1648
1649#[cfg(test)]
1650mod tests {
1651
1652 use crate::{col, Store};
1653 use sc_keystore::Keystore;
1654 use sp_core::{Decode, Encode, Pair};
1655 use sp_statement_store::{
1656 AccountId, Channel, DecryptionKey, InvalidReason, Proof, RejectionReason, Statement,
1657 StatementSource, StatementStore, SubmitResult, Topic,
1658 };
1659
1660 type Extrinsic = sp_runtime::OpaqueExtrinsic;
1661 type Hash = sp_core::H256;
1662 type Hashing = sp_runtime::traits::BlakeTwo256;
1663 type BlockNumber = u64;
1664 type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
1665 type Block = sp_runtime::generic::Block<Header, Extrinsic>;
1666
1667 const TEST_BEST_BLOCK_HASH: [u8; 32] = [1u8; 32];
1668
1669 const MAX_TEST_ACCOUNT_SEED: u64 = 64;
1674
1675 fn account_seed_table() -> &'static std::collections::BTreeMap<AccountId, u64> {
1680 use std::sync::OnceLock;
1681 static TABLE: OnceLock<std::collections::BTreeMap<AccountId, u64>> = OnceLock::new();
1682 TABLE.get_or_init(|| {
1683 let mut t = std::collections::BTreeMap::new();
1684 for seed in 0..=MAX_TEST_ACCOUNT_SEED {
1685 t.insert(account_keypair(seed).public().0, seed);
1686 }
1687 t
1688 })
1689 }
1690
1691 #[derive(Clone)]
1692 pub(crate) struct TestClient;
1693
1694 pub(crate) type TestBackend = sc_client_api::in_mem::Backend<Block>;
1695
1696 impl sc_client_api::StorageProvider<Block, TestBackend> for TestClient {
1697 fn storage(
1698 &self,
1699 _hash: Hash,
1700 key: &sc_client_api::StorageKey,
1701 ) -> sp_blockchain::Result<Option<sc_client_api::StorageData>> {
1702 use sp_statement_store::StatementAllowance;
1703
1704 assert_eq!(&key.0[0..21], b":statement_allowance:" as &[u8],);
1705
1706 let account_bytes: AccountId = key.0[21..53].try_into().unwrap();
1709 let seed = account_seed_table().get(&account_bytes).copied();
1710 let allowance = match seed {
1711 Some(0) => return Ok(None),
1713 Some(1) => StatementAllowance::new(1, 1000),
1714 Some(2) => StatementAllowance::new(2, 1000),
1715 Some(3) => StatementAllowance::new(3, 1000),
1716 Some(4) => StatementAllowance::new(4, 1000),
1717 Some(42) => StatementAllowance::new(42, (42 * crate::MAX_STATEMENT_SIZE) as u32),
1718 Some(_) | None => StatementAllowance::new(100, 1000),
1719 };
1720 Ok(Some(sc_client_api::StorageData(allowance.encode())))
1721 }
1722
1723 fn storage_hash(
1724 &self,
1725 _hash: Hash,
1726 _key: &sc_client_api::StorageKey,
1727 ) -> sp_blockchain::Result<Option<Hash>> {
1728 unimplemented!()
1729 }
1730
1731 fn storage_keys(
1732 &self,
1733 _hash: Hash,
1734 _prefix: Option<&sc_client_api::StorageKey>,
1735 _start_key: Option<&sc_client_api::StorageKey>,
1736 ) -> sp_blockchain::Result<
1737 sc_client_api::backend::KeysIter<
1738 <TestBackend as sc_client_api::Backend<Block>>::State,
1739 Block,
1740 >,
1741 > {
1742 unimplemented!()
1743 }
1744
1745 fn storage_pairs(
1746 &self,
1747 _hash: Hash,
1748 _prefix: Option<&sc_client_api::StorageKey>,
1749 _start_key: Option<&sc_client_api::StorageKey>,
1750 ) -> sp_blockchain::Result<
1751 sc_client_api::backend::PairsIter<
1752 <TestBackend as sc_client_api::Backend<Block>>::State,
1753 Block,
1754 >,
1755 > {
1756 unimplemented!()
1757 }
1758
1759 fn child_storage(
1760 &self,
1761 _hash: Hash,
1762 _child_info: &sc_client_api::ChildInfo,
1763 _key: &sc_client_api::StorageKey,
1764 ) -> sp_blockchain::Result<Option<sc_client_api::StorageData>> {
1765 unimplemented!()
1766 }
1767
1768 fn child_storage_keys(
1769 &self,
1770 _hash: Hash,
1771 _child_info: sc_client_api::ChildInfo,
1772 _prefix: Option<&sc_client_api::StorageKey>,
1773 _start_key: Option<&sc_client_api::StorageKey>,
1774 ) -> sp_blockchain::Result<
1775 sc_client_api::backend::KeysIter<
1776 <TestBackend as sc_client_api::Backend<Block>>::State,
1777 Block,
1778 >,
1779 > {
1780 unimplemented!()
1781 }
1782
1783 fn child_storage_hash(
1784 &self,
1785 _hash: Hash,
1786 _child_info: &sc_client_api::ChildInfo,
1787 _key: &sc_client_api::StorageKey,
1788 ) -> sp_blockchain::Result<Option<Hash>> {
1789 unimplemented!()
1790 }
1791
1792 fn closest_merkle_value(
1793 &self,
1794 _hash: Hash,
1795 _key: &sc_client_api::StorageKey,
1796 ) -> sp_blockchain::Result<Option<sc_client_api::MerkleValue<Hash>>> {
1797 unimplemented!()
1798 }
1799
1800 fn child_closest_merkle_value(
1801 &self,
1802 _hash: Hash,
1803 _child_info: &sc_client_api::ChildInfo,
1804 _key: &sc_client_api::StorageKey,
1805 ) -> sp_blockchain::Result<Option<sc_client_api::MerkleValue<Hash>>> {
1806 unimplemented!()
1807 }
1808 }
1809
1810 impl sp_blockchain::HeaderBackend<Block> for TestClient {
1811 fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1812 unimplemented!()
1813 }
1814 fn info(&self) -> sp_blockchain::Info<Block> {
1815 sp_blockchain::Info {
1816 best_hash: TEST_BEST_BLOCK_HASH.into(),
1817 best_number: 0,
1818 genesis_hash: Default::default(),
1819 finalized_hash: TEST_BEST_BLOCK_HASH.into(),
1820 finalized_number: 1,
1821 finalized_state: None,
1822 number_leaves: 0,
1823 block_gap: None,
1824 }
1825 }
1826 fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1827 unimplemented!()
1828 }
1829 fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1830 unimplemented!()
1831 }
1832 fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1833 unimplemented!()
1834 }
1835 }
1836
1837 fn test_store() -> (Store, tempfile::TempDir) {
1838 sp_tracing::init_for_tests();
1839 let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1840
1841 let client = std::sync::Arc::new(TestClient);
1842 let mut path: std::path::PathBuf = temp_dir.path().into();
1843 path.push("db");
1844 let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1845 let store = Store::new::<Block, TestClient, TestBackend>(
1846 &path,
1847 Default::default(),
1848 client,
1849 keystore,
1850 None,
1851 Box::new(sp_core::testing::TaskExecutor::new()),
1852 )
1853 .unwrap();
1854 (store, temp_dir) }
1856
1857 pub fn signed_statement(data: u8) -> Statement {
1858 signed_statement_with_topics(data, &[], None)
1859 }
1860
1861 fn signed_statement_with_topics(
1862 data: u8,
1863 topics: &[Topic],
1864 dec_key: Option<DecryptionKey>,
1865 ) -> Statement {
1866 let mut statement = Statement::new();
1867 statement.set_plain_data(vec![data]);
1868 statement.set_expiry(u64::MAX);
1869
1870 for i in 0..topics.len() {
1871 statement.set_topic(i, topics[i]);
1872 }
1873 if let Some(key) = dec_key {
1874 statement.set_decryption_key(key);
1875 }
1876 let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1877 statement.sign_ed25519_private(&kp);
1878 statement
1879 }
1880
1881 fn topic(data: u64) -> Topic {
1882 let mut bytes = [0u8; 32];
1883 bytes[0..8].copy_from_slice(&data.to_le_bytes());
1884 Topic::from(bytes)
1885 }
1886
1887 fn dec_key(data: u64) -> DecryptionKey {
1888 let mut dec_key: DecryptionKey = Default::default();
1889 dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1890 dec_key
1891 }
1892
1893 fn account_keypair(seed: u64) -> sp_core::ed25519::Pair {
1901 sp_core::ed25519::Pair::from_string(&format!("//StatementAccount{seed}"), None)
1902 .expect("Derivation path is valid; qed")
1903 }
1904
1905 fn account(id: u64) -> AccountId {
1906 account_keypair(id).public().0
1907 }
1908
1909 fn sign_with(stmt: &mut Statement, account_id: u64) {
1912 stmt.sign_ed25519_private(&account_keypair(account_id));
1913 }
1914
1915 fn channel(id: u64) -> Channel {
1916 let mut channel: Channel = Default::default();
1917 channel[0..8].copy_from_slice(&id.to_le_bytes());
1918 channel
1919 }
1920
1921 fn unsigned_statement(
1925 account_id: u64,
1926 priority: u32,
1927 c: Option<u64>,
1928 data_len: usize,
1929 ) -> Statement {
1930 assert!(
1931 account_id <= MAX_TEST_ACCOUNT_SEED,
1932 "account_id {account_id} exceeds MAX_TEST_ACCOUNT_SEED ({MAX_TEST_ACCOUNT_SEED}); \
1933 raise the constant if you need a wider range",
1934 );
1935 let mut statement = Statement::new();
1936 let mut data = Vec::new();
1937 data.resize(data_len, 0);
1938 statement.set_plain_data(data);
1939 statement.set_expiry_from_parts(u32::MAX, priority);
1940 if let Some(c) = c {
1941 statement.set_channel(channel(c));
1942 }
1943 statement
1944 }
1945
1946 fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1947 let mut statement = unsigned_statement(account_id, priority, c, data_len);
1948 sign_with(&mut statement, account_id);
1949 statement
1950 }
1951
1952 #[test]
1953 fn submit_one() {
1954 let (store, _temp) = test_store();
1955 let statement0 = signed_statement(0);
1956 assert_eq!(store.submit(statement0, StatementSource::Network), SubmitResult::New);
1957 let statement1 = statement(1, 1, None, 0);
1958 assert_eq!(store.submit(statement1, StatementSource::Network), SubmitResult::New);
1959 }
1960
1961 #[test]
1962 fn save_and_load_statements() {
1963 let (store, temp) = test_store();
1964 let statement0 = signed_statement(0);
1965 let statement1 = signed_statement(1);
1966 let statement2 = signed_statement(2);
1967 assert_eq!(store.submit(statement0.clone(), StatementSource::Network), SubmitResult::New);
1968 assert_eq!(store.submit(statement1.clone(), StatementSource::Network), SubmitResult::New);
1969 assert_eq!(store.submit(statement2.clone(), StatementSource::Network), SubmitResult::New);
1970 assert_eq!(store.statements().unwrap().len(), 3);
1971 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1972 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1973 let keystore = store.keystore.clone();
1974 drop(store);
1975
1976 let client = std::sync::Arc::new(TestClient);
1977 let mut path: std::path::PathBuf = temp.path().into();
1978 path.push("db");
1979 let store = Store::new::<Block, TestClient, TestBackend>(
1980 &path,
1981 Default::default(),
1982 client,
1983 keystore,
1984 None,
1985 Box::new(sp_core::testing::TaskExecutor::new()),
1986 )
1987 .unwrap();
1988 assert_eq!(store.statements().unwrap().len(), 3);
1989 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1990 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1991 }
1992
1993 #[test]
1994 fn take_recent_statements_clears_index() {
1995 let (store, _temp) = test_store();
1996 let statement0 = signed_statement(0);
1997 let statement1 = signed_statement(1);
1998 let statement2 = signed_statement(2);
1999 let statement3 = signed_statement(3);
2000
2001 let _ = store.submit(statement0.clone(), StatementSource::Local);
2002 let _ = store.submit(statement1.clone(), StatementSource::Local);
2003 let _ = store.submit(statement2.clone(), StatementSource::Local);
2004
2005 let recent1 = store.take_recent_statements().unwrap();
2006 let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
2007 let expected1 = vec![statement0, statement1, statement2];
2008 assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash())));
2009 assert!(expected1.iter().all(|s| recent1_statements.contains(s)));
2010
2011 let recent2 = store.take_recent_statements().unwrap();
2013 assert_eq!(recent2.len(), 0);
2014
2015 store.submit(statement3.clone(), StatementSource::Network);
2016
2017 let recent3 = store.take_recent_statements().unwrap();
2018 let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
2019 let expected3 = vec![statement3];
2020 assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash())));
2021 assert!(expected3.iter().all(|s| recent3_statements.contains(s)));
2022
2023 assert_eq!(store.statements().unwrap().len(), 4);
2025 }
2026
2027 #[test]
2028 fn search_by_topic_and_key() {
2029 let (store, _temp) = test_store();
2030 let statement0 = signed_statement(0);
2031 let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
2032 let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
2033 let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
2034 let statement4 =
2035 signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
2036 let statements = vec![statement0, statement1, statement2, statement3, statement4];
2037 for s in &statements {
2038 store.submit(s.clone(), StatementSource::Network);
2039 }
2040
2041 let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
2042 let key = key.map(dec_key);
2043 let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
2044 let mut got_vals: Vec<_> = if let Some(key) = key {
2045 store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
2046 } else {
2047 store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
2048 };
2049 got_vals.sort();
2050 assert_eq!(expected.to_vec(), got_vals);
2051 };
2052
2053 assert_topics(&[], None, &[0, 1, 3, 4]);
2054 assert_topics(&[], Some(2), &[2]);
2055 assert_topics(&[0], None, &[1, 3, 4]);
2056 assert_topics(&[1], None, &[3]);
2057 assert_topics(&[2], None, &[3, 4]);
2058 assert_topics(&[3], None, &[4]);
2059 assert_topics(&[42], None, &[4]);
2060
2061 assert_topics(&[0, 1], None, &[3]);
2062 assert_topics(&[0, 1], Some(2), &[2]);
2063 assert_topics(&[0, 1, 99], Some(2), &[]);
2064 assert_topics(&[1, 2], None, &[3]);
2065 assert_topics(&[99], None, &[]);
2066 assert_topics(&[0, 99], None, &[]);
2067 assert_topics(&[0, 1, 2, 3, 42], None, &[]);
2068 }
2069
2070 #[test]
2071 fn constraints() {
2072 let (store, _temp) = test_store();
2073
2074 store.submit_index.write().config.max_total_size = 3000;
2075 let source = StatementSource::Network;
2076 let ok = SubmitResult::New;
2077
2078 assert!(matches!(
2082 store.submit(statement(1, 1, Some(1), 2000), source),
2083 SubmitResult::Rejected(_)
2084 ));
2085 assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
2086 assert!(matches!(
2088 store.submit(statement(1, 1, Some(1), 200), source),
2089 SubmitResult::Rejected(_)
2090 ));
2091 assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
2092 assert!(matches!(
2095 store.submit(statement(1, 1, Some(2), 100), source),
2096 SubmitResult::Rejected(_)
2097 ));
2098 assert_eq!(store.submit_index.read().evicted.len(), 1);
2099
2100 let s2_prio1 = statement(2, 1, None, 500);
2103 let s2_prio2 = statement(2, 2, None, 100);
2104 assert_eq!(store.submit(s2_prio1.clone(), source), ok);
2105 assert_eq!(store.submit(s2_prio2.clone(), source), ok);
2106 assert!(matches!(
2108 store.submit(statement(2, 1, None, 50), source),
2109 SubmitResult::Rejected(RejectionReason::AccountFull { .. })
2110 ));
2111 let s2_prio3 = statement(2, 3, None, 500);
2113 assert_eq!(store.submit(s2_prio3.clone(), source), ok);
2114 assert_eq!(store.submit_index.read().evicted.len(), 2);
2115 assert!(store.submit_index.read().evicted.contains(&s2_prio1.hash()));
2116 assert!(store.statement(&s2_prio1.hash()).unwrap().is_none());
2117 assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
2119 assert_eq!(store.submit_index.read().evicted.len(), 4);
2120 assert!(store.submit_index.read().evicted.contains(&s2_prio2.hash()));
2121 assert!(store.submit_index.read().evicted.contains(&s2_prio3.hash()));
2122
2123 let s3_prio2 = statement(3, 2, Some(1), 300);
2126 let s3_prio3 = statement(3, 3, Some(2), 300);
2127 assert_eq!(store.submit(s3_prio2.clone(), source), ok);
2128 assert_eq!(store.submit(s3_prio3.clone(), source), ok);
2129 assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
2130 assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
2132 assert_eq!(store.submit_index.read().evicted.len(), 6);
2133 assert!(store.submit_index.read().evicted.contains(&s3_prio2.hash()));
2134 assert!(store.submit_index.read().evicted.contains(&s3_prio3.hash()));
2135
2136 assert_eq!(store.submit_index.read().total_size, 2400);
2137 assert_eq!(store.submit_index.read().entries.len(), 4);
2138
2139 assert!(matches!(
2141 store.submit(statement(1, 1, None, 700), source),
2142 SubmitResult::Rejected(_)
2143 ));
2144 store.submit_index.write().config.max_total_statements = 4;
2146 assert!(matches!(
2147 store.submit(statement(1, 1, None, 100), source),
2148 SubmitResult::Rejected(_)
2149 ));
2150
2151 let mut expected_statements = vec![
2152 statement(1, 2, Some(1), 600).hash(),
2153 statement(2, 4, None, 1000).hash(),
2154 statement(3, 4, Some(3), 300).hash(),
2155 statement(3, 5, None, 500).hash(),
2156 ];
2157 expected_statements.sort();
2158 let mut statements: Vec<_> =
2159 store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
2160 statements.sort();
2161 assert_eq!(expected_statements, statements);
2162 }
2163
2164 #[test]
2165 fn max_statement_size_for_gossiping() {
2166 let (store, _temp) = test_store();
2167 store.submit_index.write().config.max_total_size = 42 * crate::MAX_STATEMENT_SIZE;
2168
2169 assert_eq!(
2170 store.submit(
2171 statement(42, 1, Some(1), crate::MAX_STATEMENT_SIZE - 500),
2172 StatementSource::Local
2173 ),
2174 SubmitResult::New
2175 );
2176
2177 assert!(matches!(
2178 store.submit(
2179 statement(42, 2, Some(1), 2 * crate::MAX_STATEMENT_SIZE),
2180 StatementSource::Local
2181 ),
2182 SubmitResult::Invalid(_)
2183 ));
2184 }
2185
2186 #[test]
2187 fn expired_statements_are_purged() {
2188 use super::DEFAULT_PURGE_AFTER_SEC;
2189 let (mut store, temp) = test_store();
2190 let mut statement = unsigned_statement(1, 1, Some(3), 100);
2191 store.set_time(0);
2192 statement.set_topic(0, topic(4));
2193 sign_with(&mut statement, 1);
2194 store.submit(statement.clone(), StatementSource::Network);
2195 assert_eq!(store.submit_index.read().entries.len(), 1);
2196 store.remove(&statement.hash()).unwrap();
2197 assert_eq!(store.submit_index.read().entries.len(), 0);
2198 assert_eq!(store.submit_index.read().accounts.len(), 0);
2199 store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
2200 store.maintain();
2201 assert_eq!(store.submit_index.read().evicted.len(), 0);
2202 let keystore = store.keystore.clone();
2203 drop(store);
2204
2205 let client = std::sync::Arc::new(TestClient);
2206 let mut path: std::path::PathBuf = temp.path().into();
2207 path.push("db");
2208 let store = Store::new::<Block, TestClient, TestBackend>(
2209 &path,
2210 Default::default(),
2211 client,
2212 keystore,
2213 None,
2214 Box::new(sp_core::testing::TaskExecutor::new()),
2215 )
2216 .unwrap();
2217 assert_eq!(store.statements().unwrap().len(), 0);
2218 assert_eq!(store.submit_index.read().evicted.len(), 0);
2219 }
2220
2221 #[test]
2222 fn posted_clear_decrypts() {
2223 let (store, _temp) = test_store();
2224 let public = store
2225 .keystore
2226 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2227 .unwrap();
2228 let statement1 = statement(1, 1, None, 100);
2229 let mut statement2 = unsigned_statement(1, 2, None, 0);
2230 let plain = b"The most valuable secret".to_vec();
2231 statement2.encrypt(&plain, &public).unwrap();
2232 sign_with(&mut statement2, 1);
2233 store.submit(statement1, StatementSource::Network);
2234 store.submit(statement2, StatementSource::Network);
2235 let posted_clear = store.posted_clear(&[], public.into()).unwrap();
2236 assert_eq!(posted_clear, vec![plain]);
2237 }
2238
2239 #[test]
2240 fn broadcasts_stmt_returns_encoded_statements() {
2241 let (store, _tmp) = test_store();
2242
2243 let s0 = signed_statement_with_topics(0, &[], None);
2245 let s1 = signed_statement_with_topics(1, &[topic(42)], None);
2247 let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
2249
2250 for s in [&s0, &s1, &s2] {
2251 store.submit(s.clone(), StatementSource::Network);
2252 }
2253
2254 let mut hashes: Vec<_> = store
2256 .broadcasts_stmt(&[])
2257 .unwrap()
2258 .into_iter()
2259 .map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
2260 .collect();
2261 hashes.sort();
2262 let expected_hashes = {
2263 let mut e = vec![s0.hash(), s1.hash()];
2264 e.sort();
2265 e
2266 };
2267 assert_eq!(hashes, expected_hashes);
2268
2269 let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
2271 assert_eq!(got.len(), 1);
2272 let st = Statement::decode(&mut &got[0][..]).unwrap();
2273 assert_eq!(st.hash(), s1.hash());
2274 }
2275
2276 #[test]
2277 fn posted_stmt_returns_encoded_statements_for_dest() {
2278 let (store, _tmp) = test_store();
2279
2280 let public1 = store
2281 .keystore
2282 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2283 .unwrap();
2284 let dest: [u8; 32] = public1.into();
2285
2286 let public2 = store
2287 .keystore
2288 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2289 .unwrap();
2290
2291 let mut s_with_key = unsigned_statement(1, 1, None, 0);
2293 let plain1 = b"The most valuable secret".to_vec();
2294 s_with_key.encrypt(&plain1, &public1).unwrap();
2295 sign_with(&mut s_with_key, 1);
2296
2297 let mut s_other_key = unsigned_statement(2, 2, None, 0);
2299 let plain2 = b"The second most valuable secret".to_vec();
2300 s_other_key.encrypt(&plain2, &public2).unwrap();
2301 sign_with(&mut s_other_key, 2);
2302
2303 for s in [&s_with_key, &s_other_key] {
2305 store.submit(s.clone(), StatementSource::Network);
2306 }
2307
2308 let retrieved = store.posted_stmt(&[], dest).unwrap();
2310 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
2311
2312 let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
2314 assert_eq!(
2315 returned_stmt.hash(),
2316 s_with_key.hash(),
2317 "Returned statement must match s_with_key"
2318 );
2319 }
2320
2321 #[test]
2322 fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
2323 let (store, _tmp) = test_store();
2324
2325 let public1 = store
2326 .keystore
2327 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2328 .unwrap();
2329 let dest: [u8; 32] = public1.into();
2330
2331 let public2 = store
2332 .keystore
2333 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2334 .unwrap();
2335
2336 let mut s_with_key = unsigned_statement(1, 1, None, 0);
2338 let plain1 = b"The most valuable secret".to_vec();
2339 s_with_key.encrypt(&plain1, &public1).unwrap();
2340 sign_with(&mut s_with_key, 1);
2341
2342 let mut s_other_key = unsigned_statement(2, 2, None, 0);
2344 let plain2 = b"The second most valuable secret".to_vec();
2345 s_other_key.encrypt(&plain2, &public2).unwrap();
2346 sign_with(&mut s_other_key, 2);
2347
2348 for s in [&s_with_key, &s_other_key] {
2350 store.submit(s.clone(), StatementSource::Network);
2351 }
2352
2353 let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
2355 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
2356
2357 let encoded_stmt = s_with_key.encode();
2359 let stmt_len = encoded_stmt.len();
2360
2361 assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
2363
2364 let trailing = &retrieved[0][stmt_len..];
2366 assert_eq!(trailing, &plain1[..]);
2367 }
2368
2369 #[test]
2370 fn posted_clear_returns_plain_data_for_dest_and_topics() {
2371 let (store, _tmp) = test_store();
2372
2373 let public_dest = store
2375 .keystore
2376 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2377 .unwrap();
2378 let dest: [u8; 32] = public_dest.into();
2379
2380 let public_other = store
2381 .keystore
2382 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
2383 .unwrap();
2384
2385 let mut s_good = unsigned_statement(1, 1, None, 0);
2387 let plaintext_good = b"The most valuable secret".to_vec();
2388 s_good.encrypt(&plaintext_good, &public_dest).unwrap();
2389 s_good.set_topic(0, topic(42));
2390 sign_with(&mut s_good, 1);
2391
2392 let mut s_wrong_topic = unsigned_statement(2, 2, None, 0);
2394 s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
2395 s_wrong_topic.set_topic(0, topic(99));
2396 sign_with(&mut s_wrong_topic, 2);
2397
2398 let mut s_other_dest = unsigned_statement(3, 3, None, 0);
2400 s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
2401 s_other_dest.set_topic(0, topic(42));
2402 sign_with(&mut s_other_dest, 3);
2403
2404 for s in [&s_good, &s_wrong_topic, &s_other_dest] {
2406 store.submit(s.clone(), StatementSource::Network);
2407 }
2408
2409 let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
2411
2412 assert_eq!(retrieved, vec![plaintext_good]);
2414 }
2415
2416 #[test]
2417 fn already_expired_statement_is_rejected() {
2418 let (mut store, _temp) = test_store();
2419
2420 store.set_time(1000);
2422
2423 let mut expired_statement = unsigned_statement(1, 1, None, 100);
2426 expired_statement.set_expiry_from_parts(500, 1);
2428 sign_with(&mut expired_statement, 1);
2429
2430 assert_eq!(
2432 store.submit(expired_statement, StatementSource::Network),
2433 SubmitResult::Invalid(InvalidReason::AlreadyExpired)
2434 );
2435
2436 assert_eq!(store.statements().unwrap().len(), 0);
2438
2439 let mut valid_statement = unsigned_statement(1, 1, None, 100);
2442 valid_statement.set_expiry_from_parts(2000, 1);
2443 sign_with(&mut valid_statement, 1);
2444
2445 assert_eq!(store.submit(valid_statement, StatementSource::Network), SubmitResult::New);
2447 assert_eq!(store.statements().unwrap().len(), 1);
2448 }
2449
2450 #[test]
2451 fn remove_by_covers_various_situations() {
2452 use sp_statement_store::{StatementSource, StatementStore, SubmitResult};
2453
2454 let (mut store, _temp) = test_store();
2456 store.set_time(0);
2457
2458 let t42 = topic(42);
2460 let k7 = dec_key(7);
2461
2462 let mut s_a1 = unsigned_statement(4, 10, Some(100), 100);
2465 s_a1.set_topic(0, t42);
2466 sign_with(&mut s_a1, 4);
2467 let h_a1 = s_a1.hash();
2468
2469 let mut s_a2 = unsigned_statement(4, 20, Some(200), 150);
2470 s_a2.set_decryption_key(k7);
2471 sign_with(&mut s_a2, 4);
2472 let h_a2 = s_a2.hash();
2473
2474 let s_a3 = statement(4, 30, None, 50);
2475 let h_a3 = s_a3.hash();
2476
2477 let s_b1 = statement(3, 10, None, 100);
2479 let h_b1 = s_b1.hash();
2480
2481 let mut s_b2 = unsigned_statement(3, 15, Some(300), 100);
2482 s_b2.set_topic(0, t42);
2483 s_b2.set_decryption_key(k7);
2484 sign_with(&mut s_b2, 3);
2485 let h_b2 = s_b2.hash();
2486
2487 for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
2489 assert_eq!(store.submit(s.clone(), StatementSource::Network), SubmitResult::New);
2490 }
2491
2492 {
2494 let submit_idx = store.submit_index.read();
2495 assert_eq!(submit_idx.entries.len(), 5, "all 5 should be present");
2496 assert!(submit_idx.accounts.contains_key(&account(4)));
2497 assert!(submit_idx.accounts.contains_key(&account(3)));
2498 assert_eq!(submit_idx.total_size, 100 + 150 + 50 + 100 + 100);
2499
2500 let query_idx = store.query_index.read();
2501 let set_t = query_idx.by_topic.get(&t42).expect("topic set exists");
2503 assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
2504
2505 let set_k = query_idx.by_dec_key.get(&Some(k7)).expect("key set exists");
2506 assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
2507 }
2508
2509 store.remove_by(account(4)).expect("remove_by should succeed");
2511
2512 {
2514 for h in [h_a1, h_a2, h_a3] {
2516 assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
2517 }
2518
2519 for h in [h_b1, h_b2] {
2521 assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
2522 }
2523
2524 let submit_idx = store.submit_index.read();
2525
2526 assert!(!submit_idx.accounts.contains_key(&account(4)), "Account A must be gone");
2528 assert!(submit_idx.accounts.contains_key(&account(3)), "Account B must remain");
2529
2530 assert!(submit_idx.evicted.contains(&h_a1));
2532 assert!(submit_idx.evicted.contains(&h_a2));
2533 assert!(submit_idx.evicted.contains(&h_a3));
2534 assert_eq!(submit_idx.evicted.len(), 3);
2535
2536 assert_eq!(submit_idx.entries.len(), 2);
2538 assert_eq!(submit_idx.total_size, 100 + 100);
2539
2540 let query_idx = store.query_index.read();
2541 let set_t = query_idx.by_topic.get(&t42).expect("topic set exists");
2543 assert!(set_t.contains(&h_b2));
2544 assert!(!set_t.contains(&h_a1));
2545
2546 let set_k = query_idx.by_dec_key.get(&Some(k7)).expect("key set exists");
2548 assert!(set_k.contains(&h_b2));
2549 assert!(!set_k.contains(&h_a2));
2550 }
2551
2552 store.remove_by(account(4)).expect("second remove_by should be a no-op");
2554
2555 let purge_after = store.submit_index.read().config.purge_after_sec;
2557 store.set_time(purge_after + 1);
2558 store.maintain();
2559 assert_eq!(store.submit_index.read().evicted.len(), 0, "expired entries should be purged");
2560
2561 let s_new = statement(4, 40, None, 10);
2563 assert_eq!(store.submit(s_new, StatementSource::Network), SubmitResult::New);
2564 }
2565
2566 #[test]
2567 fn check_expiration_repopulates_account_list_when_empty() {
2568 let (mut store, _temp) = test_store();
2569 store.set_time(1000);
2570
2571 let s1 = statement(1, 1, None, 100);
2575 let s2 = statement(2, 1, None, 100);
2576 let s3 = statement(3, 1, None, 100);
2577
2578 for s in [&s1, &s2, &s3] {
2579 store.submit(s.clone(), StatementSource::Network);
2580 }
2581
2582 assert!(store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty());
2584
2585 store.enforce_limits();
2587
2588 let accounts = store.submit_index.read().accounts_to_check_for_expiry_stmts.clone();
2590 assert_eq!(accounts.len(), 3, "Should have 3 accounts to check");
2591 assert!(accounts.contains(&account(1)));
2592 assert!(accounts.contains(&account(2)));
2593 assert!(accounts.contains(&account(3)));
2594
2595 assert_eq!(store.submit_index.read().evicted.len(), 0);
2597 assert_eq!(store.submit_index.read().entries.len(), 3);
2598 }
2599
2600 #[test]
2601 fn check_expiration_expires_statements_past_current_time() {
2602 let (mut store, _temp) = test_store();
2603
2604 store.set_time(100);
2609
2610 let mut expired_stmt = unsigned_statement(1, 1, None, 100);
2612 expired_stmt.set_expiry_from_parts(500, 1);
2613 sign_with(&mut expired_stmt, 1);
2614 let expired_hash = expired_stmt.hash();
2615 store.submit(expired_stmt, StatementSource::Network);
2616
2617 let valid_stmt = statement(2, 1, None, 100); let valid_hash = valid_stmt.hash();
2620 store.submit(valid_stmt, StatementSource::Network);
2621
2622 assert_eq!(store.submit_index.read().entries.len(), 2);
2624
2625 store.enforce_limits();
2627 assert!(!store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty());
2628
2629 store.set_time(1000);
2631
2632 store.enforce_limits();
2634
2635 let index = store.submit_index.read();
2638 assert!(
2639 !index.evicted.contains(&expired_hash),
2640 "Naturally expired statement must not be added to the expired map"
2641 );
2642 assert!(
2643 !index.entries.contains_key(&expired_hash),
2644 "Expired statement should be removed from entries"
2645 );
2646
2647 assert!(
2649 index.entries.contains_key(&valid_hash),
2650 "Valid statement should still be in entries"
2651 );
2652 assert!(!index.evicted.contains(&valid_hash), "Valid statement should not be expired");
2653 }
2654
2655 #[test]
2656 fn check_expiration_removes_checked_accounts_from_list_when_expiring() {
2657 let (mut store, _temp) = test_store();
2658 store.set_time(100);
2659
2660 let mut stmt1 = unsigned_statement(1, 1, None, 100);
2662 stmt1.set_expiry_from_parts(200, 1);
2663 sign_with(&mut stmt1, 1);
2664 store.submit(stmt1, StatementSource::Network);
2665
2666 let mut stmt2 = unsigned_statement(2, 1, None, 100);
2667 stmt2.set_expiry_from_parts(200, 1);
2668 sign_with(&mut stmt2, 2);
2669 store.submit(stmt2, StatementSource::Network);
2670
2671 let mut stmt3 = unsigned_statement(3, 1, None, 100);
2672 stmt3.set_expiry_from_parts(200, 1);
2673 sign_with(&mut stmt3, 3);
2674 store.submit(stmt3, StatementSource::Network);
2675
2676 store.enforce_limits();
2678 assert_eq!(
2679 store.submit_index.read().accounts_to_check_for_expiry_stmts.len(),
2680 3,
2681 "Should have 3 accounts to check"
2682 );
2683
2684 store.set_time(300);
2686
2687 store.enforce_limits();
2689
2690 assert!(
2692 store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty(),
2693 "All accounts should have been checked and removed after expiration"
2694 );
2695
2696 assert_eq!(store.submit_index.read().evicted.len(), 0);
2699 assert_eq!(store.submit_index.read().entries.len(), 0);
2700 }
2701
2702 #[test]
2703 fn check_expiration_truncates_list_even_when_nothing_expires() {
2704 let (mut store, _temp) = test_store();
2705 store.set_time(1000);
2706
2707 for acc_id in 1..=5u64 {
2711 let stmt = statement(acc_id, 1, None, 100);
2712 store.submit(stmt, StatementSource::Network);
2713 }
2714
2715 store.enforce_limits();
2717 assert_eq!(store.submit_index.read().accounts_to_check_for_expiry_stmts.len(), 5);
2718
2719 store.enforce_limits();
2721
2722 assert!(
2724 store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty(),
2725 "List should be empty after all accounts have been checked"
2726 );
2727
2728 assert_eq!(store.submit_index.read().evicted.len(), 0);
2730 assert_eq!(store.submit_index.read().entries.len(), 5);
2731 }
2732
2733 #[test]
2734 fn check_expiration_handles_multiple_statements_per_account() {
2735 let (mut store, _temp) = test_store();
2736 store.set_time(100);
2737
2738 let mut stmt1 = unsigned_statement(42, 1, Some(1), 100);
2741 stmt1.set_expiry_from_parts(200, 1); sign_with(&mut stmt1, 42);
2743 let hash1 = stmt1.hash();
2744 store.submit(stmt1, StatementSource::Network);
2745
2746 let mut stmt2 = unsigned_statement(42, 2, Some(2), 100);
2747 stmt2.set_expiry_from_parts(300, 2); sign_with(&mut stmt2, 42);
2749 let hash2 = stmt2.hash();
2750 store.submit(stmt2, StatementSource::Network);
2751
2752 let mut stmt3 = unsigned_statement(42, 3, Some(3), 100);
2753 stmt3.set_expiry_from_parts(500, 3); sign_with(&mut stmt3, 42);
2755 let hash3 = stmt3.hash();
2756 store.submit(stmt3, StatementSource::Network);
2757
2758 assert_eq!(store.submit_index.read().entries.len(), 3);
2760
2761 store.enforce_limits();
2763
2764 store.set_time(250);
2766 store.enforce_limits();
2767
2768 {
2769 let index = store.submit_index.read();
2770 assert!(!index.evicted.contains(&hash1), "stmt1 naturally expired, not in map");
2772 assert!(!index.evicted.contains(&hash2), "stmt2 should not be expired yet");
2773 assert!(!index.evicted.contains(&hash3), "stmt3 should not be expired yet");
2774 assert_eq!(index.entries.len(), 2);
2775 }
2776
2777 store.enforce_limits();
2779
2780 store.set_time(400);
2782 store.enforce_limits();
2783
2784 {
2785 let index = store.submit_index.read();
2786 assert!(!index.evicted.contains(&hash1));
2787 assert!(!index.evicted.contains(&hash2), "stmt2 naturally expired, not in map");
2788 assert!(!index.evicted.contains(&hash3), "stmt3 should not be expired yet");
2789 assert_eq!(index.entries.len(), 1);
2790 }
2791
2792 store.enforce_limits();
2794 store.set_time(600);
2795 store.enforce_limits();
2796
2797 {
2798 let index = store.submit_index.read();
2799 assert!(!index.evicted.contains(&hash1));
2800 assert!(!index.evicted.contains(&hash2));
2801 assert!(!index.evicted.contains(&hash3), "stmt3 naturally expired, not in map");
2802 assert_eq!(index.entries.len(), 0);
2803 }
2804 }
2805
2806 #[test]
2807 fn check_expiration_does_nothing_when_no_expired_statements() {
2808 let (mut store, _temp) = test_store();
2809 store.set_time(1000);
2810
2811 let stmt = statement(1, 1, None, 100);
2814 let hash = stmt.hash();
2815 store.submit(stmt, StatementSource::Network);
2816
2817 store.enforce_limits();
2819
2820 store.enforce_limits();
2822
2823 let index = store.submit_index.read();
2825 assert!(index.entries.contains_key(&hash));
2826 assert!(!index.evicted.contains(&hash));
2827 assert_eq!(index.entries.len(), 1);
2828 assert_eq!(index.evicted.len(), 0);
2829 }
2830
2831 #[test]
2832 fn check_expiration_correctly_updates_account_data() {
2833 let (mut store, _temp) = test_store();
2834 store.set_time(100);
2835
2836 let mut stmt = unsigned_statement(1, 1, Some(1), 100);
2838 stmt.set_expiry_from_parts(200, 1);
2839 sign_with(&mut stmt, 1);
2840 let hash = stmt.hash();
2841 store.submit(stmt, StatementSource::Network);
2842
2843 {
2845 let index = store.submit_index.read();
2846 assert!(index.accounts.contains_key(&account(1)));
2847 assert_eq!(index.total_size, 100);
2848 }
2849
2850 store.enforce_limits();
2852 store.set_time(300);
2853 store.enforce_limits();
2854
2855 {
2857 let index = store.submit_index.read();
2858 assert!(
2859 !index.accounts.contains_key(&account(1)),
2860 "Account should be removed when all its statements expire"
2861 );
2862 assert_eq!(index.total_size, 0, "Total size should be zero");
2863 assert!(!index.evicted.contains(&hash), "Naturally expired, not in map");
2864 }
2865 }
2866
2867 #[test]
2868 fn check_expiration_clears_topic_and_key_indexes() {
2869 let (mut store, _temp) = test_store();
2870 store.set_time(100);
2871
2872 let mut stmt = unsigned_statement(1, 1, Some(1), 100);
2874 stmt.set_expiry_from_parts(200, 1);
2875 stmt.set_topic(0, topic(42));
2876 stmt.set_decryption_key(dec_key(7));
2877 sign_with(&mut stmt, 1);
2878 let hash = stmt.hash();
2879 store.submit(stmt, StatementSource::Network);
2880
2881 {
2883 let query_index = store.query_index.read();
2884 assert!(query_index.by_topic.get(&topic(42)).map_or(false, |s| s.contains(&hash)));
2885 assert!(query_index
2886 .by_dec_key
2887 .get(&Some(dec_key(7)))
2888 .map_or(false, |s| s.contains(&hash)));
2889 }
2890
2891 store.enforce_limits();
2893 store.set_time(300);
2894 store.enforce_limits();
2895
2896 {
2898 let query_index = store.query_index.read();
2899 assert!(
2901 query_index.by_topic.get(&topic(42)).map_or(true, |s| s.is_empty()),
2902 "Topic index should be cleared"
2903 );
2904 assert!(
2906 query_index.by_dec_key.get(&Some(dec_key(7))).map_or(true, |s| s.is_empty()),
2907 "Decryption key index should be cleared"
2908 );
2909 assert!(
2910 !store.submit_index.read().evicted.contains(&hash),
2911 "Naturally expired, not in map"
2912 );
2913 }
2914 }
2915
2916 #[test]
2917 fn check_expiration_handles_empty_store() {
2918 let (mut store, _temp) = test_store();
2919 store.set_time(1000);
2920
2921 store.enforce_limits();
2923
2924 store.enforce_limits();
2926
2927 assert!(store.submit_index.read().accounts_to_check_for_expiry_stmts.is_empty());
2928 assert_eq!(store.submit_index.read().entries.len(), 0);
2929 assert_eq!(store.submit_index.read().evicted.len(), 0);
2930 }
2931
2932 #[test]
2933 fn check_expiration_expires_properly_formatted_statements() {
2934 let (mut store, _temp) = test_store();
2938 store.set_time(1000);
2939
2940 let mut stmt = unsigned_statement(1, 1, None, 100);
2942 stmt.set_expiry_from_parts(1001, 1); sign_with(&mut stmt, 1);
2944 let hash = stmt.hash();
2945 store.submit(stmt, StatementSource::Network);
2946
2947 assert_eq!(store.submit_index.read().entries.len(), 1);
2948
2949 store.enforce_limits();
2951
2952 store.set_time(2000);
2954 store.enforce_limits();
2955
2956 let index = store.submit_index.read();
2960 assert!(
2961 !index.entries.contains_key(&hash),
2962 "Statement should be removed from entries after expiration"
2963 );
2964 assert!(!index.evicted.contains(&hash), "Naturally expired, not in map");
2966 }
2967
2968 #[test]
2969 fn check_expiration_updates_database_columns() {
2970 let (mut store, _temp) = test_store();
2972 store.set_time(100);
2973
2974 let mut stmt = unsigned_statement(1, 1, None, 100);
2976 stmt.set_expiry_from_parts(200, 1);
2977 sign_with(&mut stmt, 1);
2978 let hash = stmt.hash();
2979 store.submit(stmt.clone(), StatementSource::Network);
2980
2981 let db_entry = store.db.get(col::STATEMENTS, &hash).unwrap();
2983 assert!(db_entry.is_some(), "Statement should be in col::STATEMENTS after submit");
2984
2985 store.enforce_limits();
2987
2988 store.set_time(300);
2990 store.enforce_limits();
2991
2992 {
2994 let index = store.submit_index.read();
2995 assert!(
2996 !index.entries.contains_key(&hash),
2997 "Statement should be removed from in-memory entries"
2998 );
2999 assert!(
3001 !index.evicted.contains(&hash),
3002 "Naturally expired statement must not be in the expired map"
3003 );
3004 }
3005
3006 let db_entry = store.db.get(col::STATEMENTS, &hash).unwrap();
3007 assert!(
3008 db_entry.is_none(),
3009 "Statement should be removed from col::STATEMENTS after expiration"
3010 );
3011
3012 let expired_entry = store.db.get(col::EXPIRED, &hash).unwrap();
3015 assert!(expired_entry.is_none(), "Naturally expired: not written to col::EXPIRED");
3016 }
3017
3018 #[test]
3019 fn enforce_allowances_evicts_excess_statements() {
3020 let (mut store, _temp) = test_store();
3025 store.set_time(0);
3026
3027 let s1 = statement(4, 10, None, 100); let s2 = statement(4, 20, None, 100);
3030 let s3 = statement(4, 30, None, 100);
3031 let s4 = statement(4, 40, None, 100);
3032 let s5 = statement(4, 50, None, 100); let h1 = s1.hash();
3035 let h5 = s5.hash();
3036
3037 {
3039 let mut submit_index = store.submit_index.write();
3040 let mut query_index = store.query_index.write();
3041 for s in [&s1, &s2, &s3, &s4, &s5] {
3042 submit_index.insert_new(s.hash(), account(4), s);
3043 query_index.insert(s.hash(), s);
3044 }
3045 }
3046
3047 assert_eq!(store.submit_index.read().entries.len(), 5);
3049 assert_eq!(store.submit_index.read().total_size, 500);
3050
3051 store.enforce_limits();
3055 store.enforce_limits();
3056
3057 let index = store.submit_index.read();
3059 assert_eq!(index.entries.len(), 4, "Should have 4 statements after eviction");
3060 assert!(!index.entries.contains_key(&h1), "Lowest priority should be evicted");
3061 assert!(index.entries.contains_key(&h5), "Highest priority should remain");
3062 assert_eq!(index.total_size, 400);
3063
3064 assert!(index.evicted.contains(&h1));
3066 }
3067
3068 #[test]
3069 fn enforce_allowances_evicts_all_when_no_allowance_found() {
3070 let (mut store, _temp) = test_store();
3071 store.set_time(0);
3072
3073 let s1 = statement(0, 10, None, 100);
3075 let s2 = statement(0, 20, None, 150);
3076
3077 let h1 = s1.hash();
3078 let h2 = s2.hash();
3079
3080 {
3082 let mut submit_index = store.submit_index.write();
3083 let mut query_index = store.query_index.write();
3084 submit_index.insert_new(h1, account(0), &s1);
3085 query_index.insert(h1, &s1);
3086 submit_index.insert_new(h2, account(0), &s2);
3087 query_index.insert(h2, &s2);
3088 }
3089
3090 assert_eq!(store.submit_index.read().entries.len(), 2);
3091
3092 store.enforce_limits();
3095 store.enforce_limits();
3096
3097 let index = store.submit_index.read();
3098 assert_eq!(index.entries.len(), 0, "All statements should be evicted");
3099 assert!(!index.accounts.contains_key(&account(0)), "Account should be removed");
3100 assert!(index.evicted.contains(&h1));
3101 assert!(index.evicted.contains(&h2));
3102 }
3103
3104 #[test]
3105 fn enforce_allowances_based_on_size() {
3106 let (mut store, _temp) = test_store();
3108 store.set_time(0);
3109
3110 let s1 = statement(2, 10, None, 600); let s2 = statement(2, 20, None, 600); let h1 = s1.hash();
3116 let h2 = s2.hash();
3117
3118 {
3120 let mut submit_index = store.submit_index.write();
3121 let mut query_index = store.query_index.write();
3122 submit_index.insert_new(h1, account(2), &s1);
3123 query_index.insert(h1, &s1);
3124 submit_index.insert_new(h2, account(2), &s2);
3125 query_index.insert(h2, &s2);
3126 }
3127
3128 assert_eq!(store.submit_index.read().total_size, 1200);
3129
3130 store.enforce_limits();
3133 store.enforce_limits();
3134
3135 let index = store.submit_index.read();
3136 assert_eq!(index.entries.len(), 1);
3137 assert!(index.entries.contains_key(&h2), "Higher priority should remain");
3138 assert!(!index.entries.contains_key(&h1), "Lower priority should be evicted");
3139 assert_eq!(index.total_size, 600);
3140 }
3141
3142 #[test]
3143 fn channel_replacement_only_higher_priority_succeeds() {
3144 let (store, _temp) = test_store();
3145 let source = StatementSource::Network;
3146
3147 let s1 = statement(1, 5, Some(1), 100);
3150 let h1 = s1.hash();
3151 assert_eq!(store.submit(s1, source), SubmitResult::New);
3152
3153 let result = store.submit(statement(1, 3, Some(1), 100), source);
3155 assert!(
3156 matches!(result, SubmitResult::Rejected(RejectionReason::ChannelPriorityTooLow { .. })),
3157 "Lower priority should be rejected with ChannelPriorityTooLow, got: {result:?}"
3158 );
3159
3160 let result = store.submit(statement(1, 5, Some(1), 101), source);
3163 assert!(
3164 matches!(result, SubmitResult::Rejected(RejectionReason::ChannelPriorityTooLow { .. })),
3165 "Equal priority should be rejected with ChannelPriorityTooLow, got: {result:?}"
3166 );
3167
3168 let s2 = statement(1, 10, Some(1), 200);
3170 let h2 = s2.hash();
3171 assert_eq!(store.submit(s2, source), SubmitResult::New);
3172
3173 {
3174 let index = store.submit_index.read();
3175 assert_eq!(index.entries.len(), 1);
3176 assert!(!index.entries.contains_key(&h1), "Old channel message should be gone");
3177 assert!(index.entries.contains_key(&h2), "New channel message should exist");
3178 assert!(index.evicted.contains(&h1), "Old should be in expired");
3179 assert_eq!(index.total_size, 200);
3180 }
3181 }
3182
3183 #[test]
3184 fn submit_rejects_malformed_statements() {
3185 let (store, _temp) = test_store();
3186
3187 let mut base = Statement::new();
3188 base.set_expiry(u64::MAX);
3189 base.set_plain_data(vec![1]);
3190
3191 let ed_kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
3192 let sr_kp = sp_core::sr25519::Pair::from_string("//Alice", None).unwrap();
3193 let ecdsa_kp = sp_core::ecdsa::Pair::from_string("//Alice", None).unwrap();
3194
3195 assert_eq!(
3196 store.submit(base.clone(), StatementSource::Network),
3197 SubmitResult::Invalid(InvalidReason::NoProof)
3198 );
3199
3200 let bad_proofs = [
3201 Proof::Ed25519 { signature: [0xAB; 64], signer: ed_kp.public().0 },
3202 Proof::Sr25519 { signature: [0xCD; 64], signer: sr_kp.public().0 },
3203 Proof::Secp256k1Ecdsa { signature: [0xEF; 65], signer: ecdsa_kp.public().0 },
3204 ];
3205 for proof in bad_proofs {
3206 let mut s = base.clone();
3207 s.set_proof(proof);
3208 assert_eq!(
3209 store.submit(s, StatementSource::Network),
3210 SubmitResult::Invalid(InvalidReason::BadProof)
3211 );
3212 }
3213
3214 let mut wrong_signer = base.clone();
3215 wrong_signer.sign_ed25519_private(&ed_kp);
3216 let alice_sig = match wrong_signer.proof().unwrap() {
3217 Proof::Ed25519 { signature, .. } => *signature,
3218 _ => panic!("expected Ed25519 proof after sign_ed25519_private"),
3219 };
3220 let bob_kp = sp_core::ed25519::Pair::from_string("//Bob", None).unwrap();
3221 wrong_signer.set_proof(Proof::Ed25519 { signature: alice_sig, signer: bob_kp.public().0 });
3222 assert_eq!(
3223 store.submit(wrong_signer, StatementSource::Network),
3224 SubmitResult::Invalid(InvalidReason::BadProof)
3225 );
3226 }
3227
3228 #[test]
3229 fn channel_replacement_with_size_increase_evicts_others() {
3230 let (store, _temp) = test_store();
3231 let source = StatementSource::Network;
3232
3233 let s_ch = statement(3, 5, Some(1), 200);
3236 let s_low = statement(3, 2, None, 300);
3237 let s_mid = statement(3, 3, None, 300);
3238 let h_ch = s_ch.hash();
3239 let h_low = s_low.hash();
3240 let h_mid = s_mid.hash();
3241
3242 assert_eq!(store.submit(s_ch, source), SubmitResult::New);
3243 assert_eq!(store.submit(s_low, source), SubmitResult::New);
3244 assert_eq!(store.submit(s_mid, source), SubmitResult::New);
3245 assert_eq!(store.submit_index.read().total_size, 800);
3246
3247 let s_ch_big = statement(3, 10, Some(1), 600);
3250 let h_ch_big = s_ch_big.hash();
3251 assert_eq!(store.submit(s_ch_big, source), SubmitResult::New);
3252
3253 {
3254 let index = store.submit_index.read();
3255 assert_eq!(index.entries.len(), 2);
3256 assert!(!index.entries.contains_key(&h_ch), "Old channel message replaced");
3257 assert!(!index.entries.contains_key(&h_low), "Priority 2 evicted to fit size");
3258 assert!(index.entries.contains_key(&h_mid), "Priority 3 should remain");
3259 assert!(index.entries.contains_key(&h_ch_big), "New channel message added");
3260 assert_eq!(index.total_size, 900); }
3262 }
3263
3264 #[test]
3265 fn subscription_reconnect_receives_current_state() {
3266 use crate::StatementStoreSubscriptionApi;
3267 use sp_statement_store::OptimizedTopicFilter;
3268
3269 let (store, _temp) = test_store();
3270 let source = StatementSource::Local;
3271
3272 for i in 0..3u8 {
3274 let res = store.submit(signed_statement(i), source);
3275 assert_eq!(res, SubmitResult::New);
3276 }
3277
3278 let (existing, sender, stream) =
3280 store.subscribe_statement(OptimizedTopicFilter::Any).unwrap();
3281 assert_eq!(existing.len(), 3, "First subscribe should return 3 existing statements");
3282
3283 drop(stream);
3285 drop(sender);
3286
3287 for i in 3..5u8 {
3289 assert_eq!(store.submit(signed_statement(i), source), SubmitResult::New);
3290 }
3291 let (existing, sender, stream) =
3292 store.subscribe_statement(OptimizedTopicFilter::Any).unwrap();
3293 assert_eq!(existing.len(), 5, "Re-subscribe should return all 5 current statements");
3294
3295 drop(stream);
3297 drop(sender);
3298 let hash_to_remove = signed_statement(0).hash();
3299 store.remove(&hash_to_remove).unwrap();
3300
3301 let (existing, _sender, _stream) =
3303 store.subscribe_statement(OptimizedTopicFilter::Any).unwrap();
3304 assert_eq!(existing.len(), 4, "Re-subscribe after removal should return 4 statements");
3305 }
3306
3307 #[test]
3308 fn subscription_reconnect_with_topic_filter() {
3309 use crate::StatementStoreSubscriptionApi;
3310 use sp_statement_store::OptimizedTopicFilter;
3311
3312 let (store, _temp) = test_store();
3313 let source = StatementSource::Local;
3314 let topic_a = topic(1);
3315 let topic_b = topic(2);
3316
3317 let s1 = signed_statement_with_topics(1, &[topic_a], None);
3319 let s2 = signed_statement_with_topics(2, &[topic_b], None);
3321 let s3 = signed_statement_with_topics(3, &[topic_a, topic_b], None);
3323
3324 assert_eq!(store.submit(s1, source), SubmitResult::New);
3325 assert_eq!(store.submit(s2, source), SubmitResult::New);
3326 assert_eq!(store.submit(s3, source), SubmitResult::New);
3327
3328 let filter_a = OptimizedTopicFilter::MatchAll(std::collections::HashSet::from([topic_a]));
3330 let (existing, sender, stream) = store.subscribe_statement(filter_a.clone()).unwrap();
3331 assert_eq!(existing.len(), 2, "MatchAll([A]) should match s1 and s3");
3332
3333 drop(sender);
3335 drop(stream);
3336 let s4 = signed_statement_with_topics(4, &[topic_a], None);
3337 assert_eq!(store.submit(s4, source), SubmitResult::New);
3338 let (existing, sender, stream) = store.subscribe_statement(filter_a).unwrap();
3340 assert_eq!(existing.len(), 3, "Re-subscribe MatchAll([A]) should return s1, s3, s4");
3341
3342 drop(sender);
3344 drop(stream);
3345 let filter_b = OptimizedTopicFilter::MatchAll(std::collections::HashSet::from([topic_b]));
3346 let (existing, _sender, _stream) = store.subscribe_statement(filter_b).unwrap();
3347 assert_eq!(existing.len(), 2, "Re-subscribe MatchAll([B]) should return s2 and s3");
3348 }
3349}