1#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use prometheus_endpoint::Registry as PrometheusRegistry;
57use sc_keystore::LocalKeystore;
58use sp_api::ProvideRuntimeApi;
59use sp_blockchain::HeaderBackend;
60use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
61use sp_runtime::traits::Block as BlockT;
62use sp_statement_store::{
63 runtime_api::{
64 InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
65 },
66 AccountId, BlockHash, Channel, DecryptionKey, Hash, InvalidReason, Proof, RejectionReason,
67 Result, Statement, SubmitResult, Topic,
68};
69use std::{
70 collections::{BTreeMap, HashMap, HashSet},
71 sync::Arc,
72};
73
74const KEY_VERSION: &[u8] = b"version".as_slice();
75const CURRENT_VERSION: u32 = 1;
76
77const LOG_TARGET: &str = "statement-store";
78
79pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; pub const MAX_STATEMENT_SIZE: usize =
89 sc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
90
91const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
92
93mod col {
94 pub const META: u8 = 0;
95 pub const STATEMENTS: u8 = 1;
96 pub const EXPIRED: u8 = 2;
97
98 pub const COUNT: u8 = 3;
99}
100
101#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
102struct Priority(u32);
103
104#[derive(PartialEq, Eq)]
105struct PriorityKey {
106 hash: Hash,
107 priority: Priority,
108}
109
110impl PartialOrd for PriorityKey {
111 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112 Some(self.cmp(other))
113 }
114}
115
116impl Ord for PriorityKey {
117 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
118 self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
119 }
120}
121
122#[derive(PartialEq, Eq)]
123struct ChannelEntry {
124 hash: Hash,
125 priority: Priority,
126}
127
128#[derive(Default)]
129struct StatementsForAccount {
130 by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
132 channels: HashMap<Channel, ChannelEntry>,
134 data_size: usize,
136}
137
138pub struct Options {
140 max_total_statements: usize,
143 max_total_size: usize,
146 purge_after_sec: u64,
148}
149
150impl Default for Options {
151 fn default() -> Self {
152 Options {
153 max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
154 max_total_size: DEFAULT_MAX_TOTAL_SIZE,
155 purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
156 }
157 }
158}
159
160#[derive(Default)]
161struct Index {
162 recent: HashSet<Hash>,
163 by_topic: HashMap<Topic, HashSet<Hash>>,
164 by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
165 topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
166 entries: HashMap<Hash, (AccountId, Priority, usize)>,
167 expired: HashMap<Hash, u64>, accounts: HashMap<AccountId, StatementsForAccount>,
169 options: Options,
170 total_size: usize,
171}
172
173struct ClientWrapper<Block, Client> {
174 client: Arc<Client>,
175 _block: std::marker::PhantomData<Block>,
176}
177
178impl<Block, Client> ClientWrapper<Block, Client>
179where
180 Block: BlockT,
181 Block::Hash: From<BlockHash>,
182 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
183 Client::Api: ValidateStatement<Block>,
184{
185 fn validate_statement(
186 &self,
187 block: Option<BlockHash>,
188 source: StatementSource,
189 statement: Statement,
190 ) -> std::result::Result<ValidStatement, InvalidStatement> {
191 let api = self.client.runtime_api();
192 let block = block.map(Into::into).unwrap_or_else(|| {
193 self.client.info().finalized_hash
195 });
196 api.validate_statement(block, source, statement)
197 .map_err(|_| InvalidStatement::InternalError)?
198 }
199}
200
201pub struct Store {
203 db: parity_db::Db,
204 index: RwLock<Index>,
205 validate_fn: Box<
206 dyn Fn(
207 Option<BlockHash>,
208 StatementSource,
209 Statement,
210 ) -> std::result::Result<ValidStatement, InvalidStatement>
211 + Send
212 + Sync,
213 >,
214 keystore: Arc<LocalKeystore>,
215 time_override: Option<u64>,
217 metrics: PrometheusMetrics,
218}
219
220enum IndexQuery {
221 Unknown,
222 Exists,
223 Expired,
224}
225
226impl Index {
227 fn new(options: Options) -> Index {
228 Index { options, ..Default::default() }
229 }
230
231 fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
232 let mut all_topics = [None; MAX_TOPICS];
233 let mut nt = 0;
234 while let Some(t) = statement.topic(nt) {
235 self.by_topic.entry(t).or_default().insert(hash);
236 all_topics[nt] = Some(t);
237 nt += 1;
238 }
239 let key = statement.decryption_key();
240 self.by_dec_key.entry(key).or_default().insert(hash);
241 if nt > 0 || key.is_some() {
242 self.topics_and_keys.insert(hash, (all_topics, key));
243 }
244 let priority = Priority(statement.priority().unwrap_or(0));
245 self.entries.insert(hash, (account, priority, statement.data_len()));
246 self.recent.insert(hash);
247 self.total_size += statement.data_len();
248 let account_info = self.accounts.entry(account).or_default();
249 account_info.data_size += statement.data_len();
250 if let Some(channel) = statement.channel() {
251 account_info.channels.insert(channel, ChannelEntry { hash, priority });
252 }
253 account_info
254 .by_priority
255 .insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
256 }
257
258 fn query(&self, hash: &Hash) -> IndexQuery {
259 if self.entries.contains_key(hash) {
260 return IndexQuery::Exists
261 }
262 if self.expired.contains_key(hash) {
263 return IndexQuery::Expired
264 }
265 IndexQuery::Unknown
266 }
267
268 fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
269 self.expired.insert(hash, timestamp);
270 }
271
272 fn iterate_with(
273 &self,
274 key: Option<DecryptionKey>,
275 match_all_topics: &[Topic],
276 mut f: impl FnMut(&Hash) -> Result<()>,
277 ) -> Result<()> {
278 let empty = HashSet::new();
279 let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [∅ MAX_TOPICS + 1];
280 if match_all_topics.len() > MAX_TOPICS {
281 return Ok(())
282 }
283 let key_set = self.by_dec_key.get(&key);
284 if key_set.map_or(0, |s| s.len()) == 0 {
285 return Ok(())
287 }
288 sets[0] = key_set.expect("Function returns if key_set is None");
289 for (i, t) in match_all_topics.iter().enumerate() {
290 let set = self.by_topic.get(t);
291 if set.map_or(0, |s| s.len()) == 0 {
292 return Ok(())
294 }
295 sets[i + 1] = set.expect("Function returns if set is None");
296 }
297 let sets = &mut sets[0..match_all_topics.len() + 1];
298 sets.sort_by_key(|s| s.len());
300 for item in sets[0] {
301 if sets[1..].iter().all(|set| set.contains(item)) {
302 log::trace!(
303 target: LOG_TARGET,
304 "Iterating by topic/key: statement {:?}",
305 HexDisplay::from(item)
306 );
307 f(item)?
308 }
309 }
310 Ok(())
311 }
312
313 fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
314 let mut purged = Vec::new();
316 self.expired.retain(|hash, timestamp| {
317 if *timestamp + self.options.purge_after_sec <= current_time {
318 purged.push(*hash);
319 log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
320 false
321 } else {
322 true
323 }
324 });
325 purged
326 }
327
328 fn take_recent(&mut self) -> HashSet<Hash> {
329 std::mem::take(&mut self.recent)
330 }
331
332 fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
333 if let Some((account, priority, len)) = self.entries.remove(hash) {
334 self.total_size -= len;
335 if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
336 for t in topics.into_iter().flatten() {
337 if let std::collections::hash_map::Entry::Occupied(mut set) =
338 self.by_topic.entry(t)
339 {
340 set.get_mut().remove(hash);
341 if set.get().is_empty() {
342 set.remove_entry();
343 }
344 }
345 }
346 if let std::collections::hash_map::Entry::Occupied(mut set) =
347 self.by_dec_key.entry(key)
348 {
349 set.get_mut().remove(hash);
350 if set.get().is_empty() {
351 set.remove_entry();
352 }
353 }
354 }
355 let _ = self.recent.remove(hash);
356 self.expired.insert(*hash, current_time);
357 if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
358 self.accounts.entry(account)
359 {
360 let key = PriorityKey { hash: *hash, priority };
361 if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
362 account_rec.get_mut().data_size -= len;
363 if let Some(channel) = channel {
364 account_rec.get_mut().channels.remove(&channel);
365 }
366 }
367 if account_rec.get().by_priority.is_empty() {
368 account_rec.remove_entry();
369 }
370 }
371 log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
372 true
373 } else {
374 false
375 }
376 }
377
378 fn insert(
379 &mut self,
380 hash: Hash,
381 statement: &Statement,
382 account: &AccountId,
383 validation: &ValidStatement,
384 current_time: u64,
385 ) -> std::result::Result<HashSet<Hash>, RejectionReason> {
386 let statement_len = statement.data_len();
387 if statement_len > validation.max_size as usize {
388 log::debug!(
389 target: LOG_TARGET,
390 "Ignored oversize message: {:?} ({} bytes)",
391 HexDisplay::from(&hash),
392 statement_len,
393 );
394 return Err(RejectionReason::DataTooLarge {
395 submitted_size: statement_len,
396 available_size: validation.max_size as usize,
397 });
398 }
399
400 let mut evicted = HashSet::new();
401 let mut would_free_size = 0;
402 let priority = Priority(statement.priority().unwrap_or(0));
403 let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
404 if let Some(account_rec) = self.accounts.get(account) {
408 if let Some(channel) = statement.channel() {
409 if let Some(channel_record) = account_rec.channels.get(&channel) {
410 if priority <= channel_record.priority {
411 log::debug!(
413 target: LOG_TARGET,
414 "Ignored lower priority channel message: {:?} {:?} <= {:?}",
415 HexDisplay::from(&hash),
416 priority,
417 channel_record.priority,
418 );
419 return Err(RejectionReason::ChannelPriorityTooLow {
420 submitted_priority: priority.0,
421 min_priority: channel_record.priority.0,
422 });
423 } else {
424 log::debug!(
427 target: LOG_TARGET,
428 "Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
429 HexDisplay::from(&hash),
430 priority,
431 HexDisplay::from(&channel_record.hash),
432 channel_record.priority,
433 );
434 let key = PriorityKey {
435 hash: channel_record.hash,
436 priority: channel_record.priority,
437 };
438 if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
439 would_free_size += *len;
440 evicted.insert(channel_record.hash);
441 }
442 }
443 }
444 }
445 for (entry, (_, len)) in account_rec.by_priority.iter() {
447 if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
448 account_rec.by_priority.len() + 1 - evicted.len() <= max_count
449 {
450 break
452 }
453 if evicted.contains(&entry.hash) {
454 continue
456 }
457 if entry.priority >= priority {
458 log::debug!(
459 target: LOG_TARGET,
460 "Ignored message due to constraints {:?} {:?} < {:?}",
461 HexDisplay::from(&hash),
462 priority,
463 entry.priority,
464 );
465 return Err(RejectionReason::AccountFull {
466 submitted_priority: priority.0,
467 min_priority: entry.priority.0,
468 });
469 }
470 evicted.insert(entry.hash);
471 would_free_size += len;
472 }
473 }
474 if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size) &&
476 self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
477 {
478 log::debug!(
479 target: LOG_TARGET,
480 "Ignored statement {} because the store is full (size={}, count={})",
481 HexDisplay::from(&hash),
482 self.total_size,
483 self.entries.len(),
484 );
485 return Err(RejectionReason::StoreFull);
486 }
487
488 for h in &evicted {
489 self.make_expired(h, current_time);
490 }
491 self.insert_new(hash, *account, statement);
492 Ok(evicted)
493 }
494}
495
496impl Store {
497 pub fn new_shared<Block, Client>(
500 path: &std::path::Path,
501 options: Options,
502 client: Arc<Client>,
503 keystore: Arc<LocalKeystore>,
504 prometheus: Option<&PrometheusRegistry>,
505 task_spawner: &dyn SpawnNamed,
506 ) -> Result<Arc<Store>>
507 where
508 Block: BlockT,
509 Block::Hash: From<BlockHash>,
510 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
511 Client::Api: ValidateStatement<Block>,
512 {
513 let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
514
515 let worker_store = store.clone();
517 task_spawner.spawn(
518 "statement-store-maintenance",
519 Some("statement-store"),
520 Box::pin(async move {
521 let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
522 loop {
523 interval.tick().await;
524 worker_store.maintain();
525 }
526 }),
527 );
528
529 Ok(store)
530 }
531
532 #[doc(hidden)]
535 pub fn new<Block, Client>(
536 path: &std::path::Path,
537 options: Options,
538 client: Arc<Client>,
539 keystore: Arc<LocalKeystore>,
540 prometheus: Option<&PrometheusRegistry>,
541 ) -> Result<Store>
542 where
543 Block: BlockT,
544 Block::Hash: From<BlockHash>,
545 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
546 Client::Api: ValidateStatement<Block>,
547 {
548 let mut path: std::path::PathBuf = path.into();
549 path.push("statements");
550
551 let mut config = parity_db::Options::with_columns(&path, col::COUNT);
552
553 let statement_col = &mut config.columns[col::STATEMENTS as usize];
554 statement_col.ref_counted = false;
555 statement_col.preimage = true;
556 statement_col.uniform = true;
557 let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
558 match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
559 Some(version) => {
560 let version = u32::from_le_bytes(
561 version
562 .try_into()
563 .map_err(|_| Error::Db("Error reading database version".into()))?,
564 );
565 if version != CURRENT_VERSION {
566 return Err(Error::Db(format!("Unsupported database version: {version}")))
567 }
568 },
569 None => {
570 db.commit([(
571 col::META,
572 KEY_VERSION.to_vec(),
573 Some(CURRENT_VERSION.to_le_bytes().to_vec()),
574 )])
575 .map_err(|e| Error::Db(e.to_string()))?;
576 },
577 }
578
579 let validator = ClientWrapper { client, _block: Default::default() };
580 let validate_fn = Box::new(move |block, source, statement| {
581 validator.validate_statement(block, source, statement)
582 });
583
584 let store = Store {
585 db,
586 index: RwLock::new(Index::new(options)),
587 validate_fn,
588 keystore,
589 time_override: None,
590 metrics: PrometheusMetrics::new(prometheus),
591 };
592 store.populate()?;
593 Ok(store)
594 }
595
596 fn populate(&self) -> Result<()> {
601 {
602 let mut index = self.index.write();
603 self.db
604 .iter_column_while(col::STATEMENTS, |item| {
605 let statement = item.value;
606 if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
607 let hash = statement.hash();
608 log::trace!(
609 target: LOG_TARGET,
610 "Statement loaded {:?}",
611 HexDisplay::from(&hash)
612 );
613 if let Some(account_id) = statement.account_id() {
614 index.insert_new(hash, account_id, &statement);
615 } else {
616 log::debug!(
617 target: LOG_TARGET,
618 "Error decoding statement loaded from the DB: {:?}",
619 HexDisplay::from(&hash)
620 );
621 }
622 }
623 true
624 })
625 .map_err(|e| Error::Db(e.to_string()))?;
626 self.db
627 .iter_column_while(col::EXPIRED, |item| {
628 let expired_info = item.value;
629 if let Ok((hash, timestamp)) =
630 <(Hash, u64)>::decode(&mut expired_info.as_slice())
631 {
632 log::trace!(
633 target: LOG_TARGET,
634 "Statement loaded (expired): {:?}",
635 HexDisplay::from(&hash)
636 );
637 index.insert_expired(hash, timestamp);
638 }
639 true
640 })
641 .map_err(|e| Error::Db(e.to_string()))?;
642 }
643
644 self.maintain();
645 Ok(())
646 }
647
648 fn collect_statements<R>(
649 &self,
650 key: Option<DecryptionKey>,
651 match_all_topics: &[Topic],
652 mut f: impl FnMut(Statement) -> Option<R>,
653 ) -> Result<Vec<R>> {
654 let mut result = Vec::new();
655 let index = self.index.read();
656 index.iterate_with(key, match_all_topics, |hash| {
657 match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
658 Some(entry) => {
659 if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
660 if let Some(data) = f(statement) {
661 result.push(data);
662 }
663 } else {
664 log::warn!(
666 target: LOG_TARGET,
667 "Corrupt statement {:?}",
668 HexDisplay::from(hash)
669 );
670 }
671 },
672 None => {
673 log::warn!(
675 target: LOG_TARGET,
676 "Missing statement {:?}",
677 HexDisplay::from(hash)
678 );
679 },
680 }
681 Ok(())
682 })?;
683 Ok(result)
684 }
685
686 pub fn maintain(&self) {
688 log::trace!(target: LOG_TARGET, "Started store maintenance");
689 let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
690 let mut index = self.index.write();
691 let deleted = index.maintain(self.timestamp());
692 (deleted, index.entries.len(), index.expired.len())
693 };
694 let deleted: Vec<_> =
695 deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
696 let deleted_count = deleted.len() as u64;
697 if let Err(e) = self.db.commit(deleted) {
698 log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
699 } else {
700 self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
701 }
702 log::trace!(
703 target: LOG_TARGET,
704 "Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
705 deleted_count,
706 active_count,
707 expired_count
708 );
709 }
710
711 fn timestamp(&self) -> u64 {
712 self.time_override.unwrap_or_else(|| {
713 std::time::SystemTime::now()
714 .duration_since(std::time::UNIX_EPOCH)
715 .unwrap_or_default()
716 .as_secs()
717 })
718 }
719
720 #[cfg(test)]
721 fn set_time(&mut self, time: u64) {
722 self.time_override = Some(time);
723 }
724
725 pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
727 StatementStoreExt::new(self)
728 }
729
730 fn posted_clear_inner<R>(
733 &self,
734 match_all_topics: &[Topic],
735 dest: [u8; 32],
736 mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
738 ) -> Result<Vec<R>> {
739 self.collect_statements(Some(dest), match_all_topics, |statement| {
740 if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
741 let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
742 let public: sp_statement_store::ed25519::Public = public.into();
743 match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
744 Err(e) => {
745 log::debug!(
746 target: LOG_TARGET,
747 "Keystore error: {:?}, for statement {:?}",
748 e,
749 HexDisplay::from(&statement.hash())
750 );
751 None
752 },
753 Ok(None) => {
754 log::debug!(
755 target: LOG_TARGET,
756 "Keystore is missing key for statement {:?}",
757 HexDisplay::from(&statement.hash())
758 );
759 None
760 },
761 Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
762 Ok(r) => r.map(|data| map_f(statement, data)),
763 Err(e) => {
764 log::debug!(
765 target: LOG_TARGET,
766 "Decryption error: {:?}, for statement {:?}",
767 e,
768 HexDisplay::from(&statement.hash())
769 );
770 None
771 },
772 },
773 }
774 } else {
775 None
776 }
777 })
778 }
779}
780
781impl StatementStore for Store {
782 fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
784 let index = self.index.read();
785 let mut result = Vec::with_capacity(index.entries.len());
786 for hash in index.entries.keys().cloned() {
787 let Some(encoded) =
788 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
789 else {
790 continue
791 };
792 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
793 result.push((hash, statement));
794 }
795 }
796 Ok(result)
797 }
798
799 fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
800 let mut index = self.index.write();
801 let recent = index.take_recent();
802 let mut result = Vec::with_capacity(recent.len());
803 for hash in recent {
804 let Some(encoded) =
805 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
806 else {
807 continue
808 };
809 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
810 result.push((hash, statement));
811 }
812 }
813 Ok(result)
814 }
815
816 fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
818 Ok(
819 match self
820 .db
821 .get(col::STATEMENTS, hash.as_slice())
822 .map_err(|e| Error::Db(e.to_string()))?
823 {
824 Some(entry) => {
825 log::trace!(
826 target: LOG_TARGET,
827 "Queried statement {:?}",
828 HexDisplay::from(hash)
829 );
830 Some(
831 Statement::decode(&mut entry.as_slice())
832 .map_err(|e| Error::Decode(e.to_string()))?,
833 )
834 },
835 None => {
836 log::trace!(
837 target: LOG_TARGET,
838 "Queried missing statement {:?}",
839 HexDisplay::from(hash)
840 );
841 None
842 },
843 },
844 )
845 }
846
847 fn has_statement(&self, hash: &Hash) -> bool {
848 self.index.read().entries.contains_key(hash)
849 }
850
851 fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
854 self.collect_statements(None, match_all_topics, |statement| statement.into_data())
855 }
856
857 fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
861 self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
862 }
863
864 fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
867 self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
868 }
869
870 fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
873 self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
874 }
875
876 fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
880 self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
881 }
882
883 fn posted_clear_stmt(
886 &self,
887 match_all_topics: &[Topic],
888 dest: [u8; 32],
889 ) -> Result<Vec<Vec<u8>>> {
890 self.posted_clear_inner(match_all_topics, dest, |statement, data| {
891 let mut res = Vec::with_capacity(statement.size_hint() + data.len());
892 statement.encode_to(&mut res);
893 res.extend_from_slice(&data);
894 res
895 })
896 }
897
898 fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
900 let hash = statement.hash();
901 let encoded_size = statement.encoded_size();
902 if encoded_size > MAX_STATEMENT_SIZE {
903 log::debug!(
904 target: LOG_TARGET,
905 "Statement is too big for propogation: {:?} ({}/{} bytes)",
906 HexDisplay::from(&hash),
907 statement.encoded_size(),
908 MAX_STATEMENT_SIZE
909 );
910 return SubmitResult::Invalid(InvalidReason::EncodingTooLarge {
911 submitted_size: encoded_size,
912 max_size: MAX_STATEMENT_SIZE,
913 });
914 }
915
916 match self.index.read().query(&hash) {
917 IndexQuery::Expired =>
918 if !source.can_be_resubmitted() {
919 return SubmitResult::KnownExpired
920 },
921 IndexQuery::Exists =>
922 if !source.can_be_resubmitted() {
923 return SubmitResult::Known
924 },
925 IndexQuery::Unknown => {},
926 }
927
928 let Some(account_id) = statement.account_id() else {
929 log::debug!(
930 target: LOG_TARGET,
931 "Statement validation failed: Missing proof ({:?})",
932 HexDisplay::from(&hash),
933 );
934 self.metrics.report(|metrics| metrics.validations_invalid.inc());
935 return SubmitResult::Invalid(InvalidReason::NoProof);
936 };
937
938 let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
940 Some(*block_hash)
941 } else {
942 None
943 };
944 let validation_result = (self.validate_fn)(at_block, source, statement.clone());
945 let validation = match validation_result {
946 Ok(validation) => validation,
947 Err(InvalidStatement::BadProof) => {
948 log::debug!(
949 target: LOG_TARGET,
950 "Statement validation failed: BadProof, {:?}",
951 HexDisplay::from(&hash),
952 );
953 self.metrics.report(|metrics| metrics.validations_invalid.inc());
954 return SubmitResult::Invalid(InvalidReason::BadProof);
955 },
956 Err(InvalidStatement::NoProof) => {
957 log::debug!(
958 target: LOG_TARGET,
959 "Statement validation failed: NoProof, {:?}",
960 HexDisplay::from(&hash),
961 );
962 self.metrics.report(|metrics| metrics.validations_invalid.inc());
963 return SubmitResult::Invalid(InvalidReason::NoProof);
964 },
965 Err(InvalidStatement::InternalError) =>
966 return SubmitResult::InternalError(Error::Runtime),
967 };
968
969 let current_time = self.timestamp();
970 let mut commit = Vec::new();
971 {
972 let mut index = self.index.write();
973
974 let evicted =
975 match index.insert(hash, &statement, &account_id, &validation, current_time) {
976 Ok(evicted) => evicted,
977 Err(reason) => return SubmitResult::Rejected(reason),
978 };
979
980 commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
981 for hash in evicted {
982 commit.push((col::STATEMENTS, hash.to_vec(), None));
983 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
984 }
985 if let Err(e) = self.db.commit(commit) {
986 log::debug!(
987 target: LOG_TARGET,
988 "Statement validation failed: database error {}, {:?}",
989 e,
990 statement
991 );
992 return SubmitResult::InternalError(Error::Db(e.to_string()))
993 }
994 } self.metrics.report(|metrics| metrics.submitted_statements.inc());
996 log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
997 SubmitResult::New
998 }
999
1000 fn remove(&self, hash: &Hash) -> Result<()> {
1002 let current_time = self.timestamp();
1003 {
1004 let mut index = self.index.write();
1005 if index.make_expired(hash, current_time) {
1006 let commit = [
1007 (col::STATEMENTS, hash.to_vec(), None),
1008 (col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
1009 ];
1010 if let Err(e) = self.db.commit(commit) {
1011 log::debug!(
1012 target: LOG_TARGET,
1013 "Error removing statement: database error {}, {:?}",
1014 e,
1015 HexDisplay::from(hash),
1016 );
1017 return Err(Error::Db(e.to_string()))
1018 }
1019 }
1020 }
1021 Ok(())
1022 }
1023
1024 fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1026 let mut index = self.index.write();
1027 let mut evicted = Vec::new();
1028 if let Some(account_rec) = index.accounts.get(&who) {
1029 evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1030 }
1031
1032 let current_time = self.timestamp();
1033 let mut commit = Vec::new();
1034 for hash in evicted {
1035 index.make_expired(&hash, current_time);
1036 commit.push((col::STATEMENTS, hash.to_vec(), None));
1037 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1038 }
1039 self.db.commit(commit).map_err(|e| {
1040 log::debug!(
1041 target: LOG_TARGET,
1042 "Error removing statement: database error {}, remove by {:?}",
1043 e,
1044 HexDisplay::from(&who),
1045 );
1046
1047 Error::Db(e.to_string())
1048 })
1049 }
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054 use crate::Store;
1055 use sc_keystore::Keystore;
1056 use sp_core::{Decode, Encode, Pair};
1057 use sp_statement_store::{
1058 runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1059 AccountId, Channel, DecryptionKey, Proof, SignatureVerificationResult, Statement,
1060 StatementSource, StatementStore, SubmitResult, Topic,
1061 };
1062
1063 type Extrinsic = sp_runtime::OpaqueExtrinsic;
1064 type Hash = sp_core::H256;
1065 type Hashing = sp_runtime::traits::BlakeTwo256;
1066 type BlockNumber = u64;
1067 type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
1068 type Block = sp_runtime::generic::Block<Header, Extrinsic>;
1069
1070 const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1071
1072 #[derive(Clone)]
1073 pub(crate) struct TestClient;
1074
1075 pub(crate) struct RuntimeApi {
1076 _inner: TestClient,
1077 }
1078
1079 impl sp_api::ProvideRuntimeApi<Block> for TestClient {
1080 type Api = RuntimeApi;
1081 fn runtime_api(&self) -> sp_api::ApiRef<'_, Self::Api> {
1082 RuntimeApi { _inner: self.clone() }.into()
1083 }
1084 }
1085
1086 sp_api::mock_impl_runtime_apis! {
1087 impl ValidateStatement<Block> for RuntimeApi {
1088 fn validate_statement(
1089 _source: StatementSource,
1090 statement: Statement,
1091 ) -> std::result::Result<ValidStatement, InvalidStatement> {
1092 use crate::tests::account;
1093 match statement.verify_signature() {
1094 SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
1095 SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
1096 SignatureVerificationResult::NoSignature => {
1097 if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
1098 if block_hash == &CORRECT_BLOCK_HASH {
1099 let (max_count, max_size) = match statement.account_id() {
1100 Some(a) if a == account(1) => (1, 1000),
1101 Some(a) if a == account(2) => (2, 1000),
1102 Some(a) if a == account(3) => (3, 1000),
1103 Some(a) if a == account(4) => (4, 1000),
1104 Some(a) if a == account(42) => (42, 42 * crate::MAX_STATEMENT_SIZE as u32),
1105 _ => (2, 2000),
1106 };
1107 Ok(ValidStatement{ max_count, max_size })
1108 } else {
1109 Err(InvalidStatement::BadProof)
1110 }
1111 } else {
1112 Err(InvalidStatement::BadProof)
1113 }
1114 }
1115 }
1116 }
1117 }
1118 }
1119
1120 impl sp_blockchain::HeaderBackend<Block> for TestClient {
1121 fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1122 unimplemented!()
1123 }
1124 fn info(&self) -> sp_blockchain::Info<Block> {
1125 sp_blockchain::Info {
1126 best_hash: CORRECT_BLOCK_HASH.into(),
1127 best_number: 0,
1128 genesis_hash: Default::default(),
1129 finalized_hash: CORRECT_BLOCK_HASH.into(),
1130 finalized_number: 1,
1131 finalized_state: None,
1132 number_leaves: 0,
1133 block_gap: None,
1134 }
1135 }
1136 fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1137 unimplemented!()
1138 }
1139 fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1140 unimplemented!()
1141 }
1142 fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1143 unimplemented!()
1144 }
1145 }
1146
1147 fn test_store() -> (Store, tempfile::TempDir) {
1148 sp_tracing::init_for_tests();
1149 let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1150
1151 let client = std::sync::Arc::new(TestClient);
1152 let mut path: std::path::PathBuf = temp_dir.path().into();
1153 path.push("db");
1154 let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1155 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1156 (store, temp_dir) }
1158
1159 fn signed_statement(data: u8) -> Statement {
1160 signed_statement_with_topics(data, &[], None)
1161 }
1162
1163 fn signed_statement_with_topics(
1164 data: u8,
1165 topics: &[Topic],
1166 dec_key: Option<DecryptionKey>,
1167 ) -> Statement {
1168 let mut statement = Statement::new();
1169 statement.set_plain_data(vec![data]);
1170 for i in 0..topics.len() {
1171 statement.set_topic(i, topics[i]);
1172 }
1173 if let Some(key) = dec_key {
1174 statement.set_decryption_key(key);
1175 }
1176 let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1177 statement.sign_ed25519_private(&kp);
1178 statement
1179 }
1180
1181 fn topic(data: u64) -> Topic {
1182 let mut topic: Topic = Default::default();
1183 topic[0..8].copy_from_slice(&data.to_le_bytes());
1184 topic
1185 }
1186
1187 fn dec_key(data: u64) -> DecryptionKey {
1188 let mut dec_key: DecryptionKey = Default::default();
1189 dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1190 dec_key
1191 }
1192
1193 fn account(id: u64) -> AccountId {
1194 let mut account: AccountId = Default::default();
1195 account[0..8].copy_from_slice(&id.to_le_bytes());
1196 account
1197 }
1198
1199 fn channel(id: u64) -> Channel {
1200 let mut channel: Channel = Default::default();
1201 channel[0..8].copy_from_slice(&id.to_le_bytes());
1202 channel
1203 }
1204
1205 fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1206 let mut statement = Statement::new();
1207 let mut data = Vec::new();
1208 data.resize(data_len, 0);
1209 statement.set_plain_data(data);
1210 statement.set_priority(priority);
1211 if let Some(c) = c {
1212 statement.set_channel(channel(c));
1213 }
1214 statement.set_proof(Proof::OnChain {
1215 block_hash: CORRECT_BLOCK_HASH,
1216 who: account(account_id),
1217 event_index: 0,
1218 });
1219 statement
1220 }
1221
1222 #[test]
1223 fn submit_one() {
1224 let (store, _temp) = test_store();
1225 let statement0 = signed_statement(0);
1226 assert_eq!(store.submit(statement0, StatementSource::Network), SubmitResult::New);
1227 let unsigned = statement(0, 1, None, 0);
1228 assert_eq!(store.submit(unsigned, StatementSource::Network), SubmitResult::New);
1229 }
1230
1231 #[test]
1232 fn save_and_load_statements() {
1233 let (store, temp) = test_store();
1234 let statement0 = signed_statement(0);
1235 let statement1 = signed_statement(1);
1236 let statement2 = signed_statement(2);
1237 assert_eq!(store.submit(statement0.clone(), StatementSource::Network), SubmitResult::New);
1238 assert_eq!(store.submit(statement1.clone(), StatementSource::Network), SubmitResult::New);
1239 assert_eq!(store.submit(statement2.clone(), StatementSource::Network), SubmitResult::New);
1240 assert_eq!(store.statements().unwrap().len(), 3);
1241 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1242 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1243 let keystore = store.keystore.clone();
1244 drop(store);
1245
1246 let client = std::sync::Arc::new(TestClient);
1247 let mut path: std::path::PathBuf = temp.path().into();
1248 path.push("db");
1249 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1250 assert_eq!(store.statements().unwrap().len(), 3);
1251 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1252 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1253 }
1254
1255 #[test]
1256 fn take_recent_statements_clears_index() {
1257 let (store, _temp) = test_store();
1258 let statement0 = signed_statement(0);
1259 let statement1 = signed_statement(1);
1260 let statement2 = signed_statement(2);
1261 let statement3 = signed_statement(3);
1262
1263 let _ = store.submit(statement0.clone(), StatementSource::Local);
1264 let _ = store.submit(statement1.clone(), StatementSource::Local);
1265 let _ = store.submit(statement2.clone(), StatementSource::Local);
1266
1267 let recent1 = store.take_recent_statements().unwrap();
1268 let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
1269 let expected1 = vec![statement0, statement1, statement2];
1270 assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash())));
1271 assert!(expected1.iter().all(|s| recent1_statements.contains(s)));
1272
1273 let recent2 = store.take_recent_statements().unwrap();
1275 assert_eq!(recent2.len(), 0);
1276
1277 store.submit(statement3.clone(), StatementSource::Network);
1278
1279 let recent3 = store.take_recent_statements().unwrap();
1280 let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
1281 let expected3 = vec![statement3];
1282 assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash())));
1283 assert!(expected3.iter().all(|s| recent3_statements.contains(s)));
1284
1285 assert_eq!(store.statements().unwrap().len(), 4);
1287 }
1288
1289 #[test]
1290 fn search_by_topic_and_key() {
1291 let (store, _temp) = test_store();
1292 let statement0 = signed_statement(0);
1293 let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1294 let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1295 let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1296 let statement4 =
1297 signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1298 let statements = vec![statement0, statement1, statement2, statement3, statement4];
1299 for s in &statements {
1300 store.submit(s.clone(), StatementSource::Network);
1301 }
1302
1303 let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1304 let key = key.map(dec_key);
1305 let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1306 let mut got_vals: Vec<_> = if let Some(key) = key {
1307 store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1308 } else {
1309 store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1310 };
1311 got_vals.sort();
1312 assert_eq!(expected.to_vec(), got_vals);
1313 };
1314
1315 assert_topics(&[], None, &[0, 1, 3, 4]);
1316 assert_topics(&[], Some(2), &[2]);
1317 assert_topics(&[0], None, &[1, 3, 4]);
1318 assert_topics(&[1], None, &[3]);
1319 assert_topics(&[2], None, &[3, 4]);
1320 assert_topics(&[3], None, &[4]);
1321 assert_topics(&[42], None, &[4]);
1322
1323 assert_topics(&[0, 1], None, &[3]);
1324 assert_topics(&[0, 1], Some(2), &[2]);
1325 assert_topics(&[0, 1, 99], Some(2), &[]);
1326 assert_topics(&[1, 2], None, &[3]);
1327 assert_topics(&[99], None, &[]);
1328 assert_topics(&[0, 99], None, &[]);
1329 assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1330 }
1331
1332 #[test]
1333 fn constraints() {
1334 let (store, _temp) = test_store();
1335
1336 store.index.write().options.max_total_size = 3000;
1337 let source = StatementSource::Network;
1338 let ok = SubmitResult::New;
1339
1340 assert!(matches!(
1344 store.submit(statement(1, 1, Some(1), 2000), source),
1345 SubmitResult::Rejected(_)
1346 ));
1347 assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
1348 assert!(matches!(
1350 store.submit(statement(1, 1, Some(1), 200), source),
1351 SubmitResult::Rejected(_)
1352 ));
1353 assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
1354 assert!(matches!(
1357 store.submit(statement(1, 1, Some(2), 100), source),
1358 SubmitResult::Rejected(_)
1359 ));
1360 assert_eq!(store.index.read().expired.len(), 1);
1361
1362 assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
1365 assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
1366 assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
1368 assert_eq!(store.index.read().expired.len(), 2);
1369 assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
1371 assert_eq!(store.index.read().expired.len(), 4);
1372
1373 assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
1376 assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
1377 assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
1378 assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
1380 assert_eq!(store.index.read().expired.len(), 6);
1381
1382 assert_eq!(store.index.read().total_size, 2400);
1383 assert_eq!(store.index.read().entries.len(), 4);
1384
1385 assert!(matches!(
1387 store.submit(statement(1, 1, None, 700), source),
1388 SubmitResult::Rejected(_)
1389 ));
1390 store.index.write().options.max_total_statements = 4;
1392 assert!(matches!(
1393 store.submit(statement(1, 1, None, 100), source),
1394 SubmitResult::Rejected(_)
1395 ));
1396
1397 let mut expected_statements = vec![
1398 statement(1, 2, Some(1), 600).hash(),
1399 statement(2, 4, None, 1000).hash(),
1400 statement(3, 4, Some(3), 300).hash(),
1401 statement(3, 5, None, 500).hash(),
1402 ];
1403 expected_statements.sort();
1404 let mut statements: Vec<_> =
1405 store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
1406 statements.sort();
1407 assert_eq!(expected_statements, statements);
1408 }
1409
1410 #[test]
1411 fn max_statement_size_for_gossiping() {
1412 let (store, _temp) = test_store();
1413 store.index.write().options.max_total_size = 42 * crate::MAX_STATEMENT_SIZE;
1414
1415 assert_eq!(
1416 store.submit(
1417 statement(42, 1, Some(1), crate::MAX_STATEMENT_SIZE - 500),
1418 StatementSource::Local
1419 ),
1420 SubmitResult::New
1421 );
1422
1423 assert!(matches!(
1424 store.submit(
1425 statement(42, 2, Some(1), 2 * crate::MAX_STATEMENT_SIZE),
1426 StatementSource::Local
1427 ),
1428 SubmitResult::Invalid(_)
1429 ));
1430 }
1431
1432 #[test]
1433 fn expired_statements_are_purged() {
1434 use super::DEFAULT_PURGE_AFTER_SEC;
1435 let (mut store, temp) = test_store();
1436 let mut statement = statement(1, 1, Some(3), 100);
1437 store.set_time(0);
1438 statement.set_topic(0, topic(4));
1439 store.submit(statement.clone(), StatementSource::Network);
1440 assert_eq!(store.index.read().entries.len(), 1);
1441 store.remove(&statement.hash()).unwrap();
1442 assert_eq!(store.index.read().entries.len(), 0);
1443 assert_eq!(store.index.read().accounts.len(), 0);
1444 store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1445 store.maintain();
1446 assert_eq!(store.index.read().expired.len(), 0);
1447 let keystore = store.keystore.clone();
1448 drop(store);
1449
1450 let client = std::sync::Arc::new(TestClient);
1451 let mut path: std::path::PathBuf = temp.path().into();
1452 path.push("db");
1453 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1454 assert_eq!(store.statements().unwrap().len(), 0);
1455 assert_eq!(store.index.read().expired.len(), 0);
1456 }
1457
1458 #[test]
1459 fn posted_clear_decrypts() {
1460 let (store, _temp) = test_store();
1461 let public = store
1462 .keystore
1463 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1464 .unwrap();
1465 let statement1 = statement(1, 1, None, 100);
1466 let mut statement2 = statement(1, 2, None, 0);
1467 let plain = b"The most valuable secret".to_vec();
1468 statement2.encrypt(&plain, &public).unwrap();
1469 store.submit(statement1, StatementSource::Network);
1470 store.submit(statement2, StatementSource::Network);
1471 let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1472 assert_eq!(posted_clear, vec![plain]);
1473 }
1474
1475 #[test]
1476 fn broadcasts_stmt_returns_encoded_statements() {
1477 let (store, _tmp) = test_store();
1478
1479 let s0 = signed_statement_with_topics(0, &[], None);
1481 let s1 = signed_statement_with_topics(1, &[topic(42)], None);
1483 let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
1485
1486 for s in [&s0, &s1, &s2] {
1487 store.submit(s.clone(), StatementSource::Network);
1488 }
1489
1490 let mut hashes: Vec<_> = store
1492 .broadcasts_stmt(&[])
1493 .unwrap()
1494 .into_iter()
1495 .map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
1496 .collect();
1497 hashes.sort();
1498 let expected_hashes = {
1499 let mut e = vec![s0.hash(), s1.hash()];
1500 e.sort();
1501 e
1502 };
1503 assert_eq!(hashes, expected_hashes);
1504
1505 let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
1507 assert_eq!(got.len(), 1);
1508 let st = Statement::decode(&mut &got[0][..]).unwrap();
1509 assert_eq!(st.hash(), s1.hash());
1510 }
1511
1512 #[test]
1513 fn posted_stmt_returns_encoded_statements_for_dest() {
1514 let (store, _tmp) = test_store();
1515
1516 let public1 = store
1517 .keystore
1518 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1519 .unwrap();
1520 let dest: [u8; 32] = public1.into();
1521
1522 let public2 = store
1523 .keystore
1524 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1525 .unwrap();
1526
1527 let mut s_with_key = statement(1, 1, None, 0);
1529 let plain1 = b"The most valuable secret".to_vec();
1530 s_with_key.encrypt(&plain1, &public1).unwrap();
1531
1532 let mut s_other_key = statement(2, 2, None, 0);
1534 let plain2 = b"The second most valuable secret".to_vec();
1535 s_other_key.encrypt(&plain2, &public2).unwrap();
1536
1537 for s in [&s_with_key, &s_other_key] {
1539 store.submit(s.clone(), StatementSource::Network);
1540 }
1541
1542 let retrieved = store.posted_stmt(&[], dest).unwrap();
1544 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1545
1546 let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
1548 assert_eq!(
1549 returned_stmt.hash(),
1550 s_with_key.hash(),
1551 "Returned statement must match s_with_key"
1552 );
1553 }
1554
1555 #[test]
1556 fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
1557 let (store, _tmp) = test_store();
1558
1559 let public1 = store
1560 .keystore
1561 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1562 .unwrap();
1563 let dest: [u8; 32] = public1.into();
1564
1565 let public2 = store
1566 .keystore
1567 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1568 .unwrap();
1569
1570 let mut s_with_key = statement(1, 1, None, 0);
1572 let plain1 = b"The most valuable secret".to_vec();
1573 s_with_key.encrypt(&plain1, &public1).unwrap();
1574
1575 let mut s_other_key = statement(2, 2, None, 0);
1577 let plain2 = b"The second most valuable secret".to_vec();
1578 s_other_key.encrypt(&plain2, &public2).unwrap();
1579
1580 for s in [&s_with_key, &s_other_key] {
1582 store.submit(s.clone(), StatementSource::Network);
1583 }
1584
1585 let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
1587 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1588
1589 let encoded_stmt = s_with_key.encode();
1591 let stmt_len = encoded_stmt.len();
1592
1593 assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
1595
1596 let trailing = &retrieved[0][stmt_len..];
1598 assert_eq!(trailing, &plain1[..]);
1599 }
1600
1601 #[test]
1602 fn posted_clear_returns_plain_data_for_dest_and_topics() {
1603 let (store, _tmp) = test_store();
1604
1605 let public_dest = store
1607 .keystore
1608 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1609 .unwrap();
1610 let dest: [u8; 32] = public_dest.into();
1611
1612 let public_other = store
1613 .keystore
1614 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1615 .unwrap();
1616
1617 let mut s_good = statement(1, 1, None, 0);
1619 let plaintext_good = b"The most valuable secret".to_vec();
1620 s_good.encrypt(&plaintext_good, &public_dest).unwrap();
1621 s_good.set_topic(0, topic(42));
1622
1623 let mut s_wrong_topic = statement(2, 2, None, 0);
1625 s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
1626 s_wrong_topic.set_topic(0, topic(99));
1627
1628 let mut s_other_dest = statement(3, 3, None, 0);
1630 s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
1631 s_other_dest.set_topic(0, topic(42));
1632
1633 for s in [&s_good, &s_wrong_topic, &s_other_dest] {
1635 store.submit(s.clone(), StatementSource::Network);
1636 }
1637
1638 let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
1640
1641 assert_eq!(retrieved, vec![plaintext_good]);
1643 }
1644
1645 #[test]
1646 fn remove_by_covers_various_situations() {
1647 use sp_statement_store::{StatementSource, StatementStore, SubmitResult};
1648
1649 let (mut store, _temp) = test_store();
1651 store.set_time(0);
1652
1653 let t42 = topic(42);
1655 let k7 = dec_key(7);
1656
1657 let mut s_a1 = statement(4, 10, Some(100), 100);
1660 s_a1.set_topic(0, t42);
1661 let h_a1 = s_a1.hash();
1662
1663 let mut s_a2 = statement(4, 20, Some(200), 150);
1664 s_a2.set_decryption_key(k7);
1665 let h_a2 = s_a2.hash();
1666
1667 let s_a3 = statement(4, 30, None, 50);
1668 let h_a3 = s_a3.hash();
1669
1670 let s_b1 = statement(3, 10, None, 100);
1672 let h_b1 = s_b1.hash();
1673
1674 let mut s_b2 = statement(3, 15, Some(300), 100);
1675 s_b2.set_topic(0, t42);
1676 s_b2.set_decryption_key(k7);
1677 let h_b2 = s_b2.hash();
1678
1679 for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
1681 assert_eq!(store.submit(s.clone(), StatementSource::Network), SubmitResult::New);
1682 }
1683
1684 {
1686 let idx = store.index.read();
1687 assert_eq!(idx.entries.len(), 5, "all 5 should be present");
1688 assert!(idx.accounts.contains_key(&account(4)));
1689 assert!(idx.accounts.contains_key(&account(3)));
1690 assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);
1691
1692 let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1694 assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
1695
1696 let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1697 assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
1698 }
1699
1700 store.remove_by(account(4)).expect("remove_by should succeed");
1702
1703 {
1705 for h in [h_a1, h_a2, h_a3] {
1707 assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
1708 }
1709
1710 for h in [h_b1, h_b2] {
1712 assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
1713 }
1714
1715 let idx = store.index.read();
1716
1717 assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
1719 assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");
1720
1721 assert!(idx.expired.contains_key(&h_a1));
1723 assert!(idx.expired.contains_key(&h_a2));
1724 assert!(idx.expired.contains_key(&h_a3));
1725 assert_eq!(idx.expired.len(), 3);
1726
1727 assert_eq!(idx.entries.len(), 2);
1729 assert_eq!(idx.total_size, 100 + 100);
1730
1731 let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1733 assert!(set_t.contains(&h_b2));
1734 assert!(!set_t.contains(&h_a1));
1735
1736 let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1738 assert!(set_k.contains(&h_b2));
1739 assert!(!set_k.contains(&h_a2));
1740 }
1741
1742 store.remove_by(account(4)).expect("second remove_by should be a no-op");
1744
1745 let purge_after = store.index.read().options.purge_after_sec;
1747 store.set_time(purge_after + 1);
1748 store.maintain();
1749 assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");
1750
1751 let s_new = statement(4, 40, None, 10);
1753 assert_eq!(store.submit(s_new, StatementSource::Network), SubmitResult::New);
1754 }
1755}