1use crate::{
38 rate_limit::{RateLimitConfig, RateLimiter},
39 types::{
40 entry_accounted_size, promotion_backoff_blocks, signing_payload, HopBlockNumber,
41 HopEntryMeta, HopError, HopHash, PoolStatus, RecipientVec, SenderId, HOP_ACK_CONTEXT,
42 HOP_CLAIM_CONTEXT, HOP_META_VERSION, MAX_PROMOTION_ATTEMPTS,
43 },
44};
45use codec::{Decode, Encode};
46use parking_lot::{Mutex, RwLock};
47use sp_core::H256;
48use sp_crypto_hashing::blake2_256;
49use sp_runtime::{
50 traits::{IdentifyAccount, Verify},
51 MultiSignature, MultiSigner,
52};
53use std::{
54 collections::{BTreeSet, HashMap, HashSet},
55 fs,
56 path::{Path, PathBuf},
57 process,
58 sync::{
59 atomic::{AtomicU64, Ordering},
60 Arc,
61 },
62 time::{SystemTime, UNIX_EPOCH},
63};
64
/// Process-wide counter that gives every temporary file created by
/// `HopDataPool::write_atomic` a distinct name suffix.
static TMP_SEQ: AtomicU64 = AtomicU64::new(0);

/// Sub-directory of the data directory that holds blob files.
const BLOBS_DIR: &str = "blobs";
/// Sub-directory of the data directory that holds metadata files.
const META_DIR: &str = "meta";
/// File extension of blob files.
const BLOB_EXT: &str = "blob";
/// File extension of metadata files.
const META_EXT: &str = "meta";
/// Number of shard directories created under each of `BLOBS_DIR` and
/// `META_DIR`; shards are named with two lowercase hex digits (`00`..`ff`).
const SHARD_COUNT: u16 = 256;
77
/// Disk-backed pool of HOP entries with an in-memory metadata index.
///
/// Each entry is stored as a blob file plus a metadata file under `data_dir`;
/// the counters below track the accounted size of the entries in the index.
pub struct HopDataPool {
    /// Metadata of every live entry, keyed by the hash of its blob.
    index: Mutex<HashMap<HopHash, HopEntryMeta>>,
    /// Accounted bytes currently charged to each sender.
    user_usage: RwLock<HashMap<SenderId, AtomicU64>>,
    /// Upper bound on `current_size`; inserts that would exceed it are rejected.
    max_size: u64,
    /// Upper bound on the accounted bytes of a single sender.
    max_user_size: u64,
    /// Sum of the accounted sizes of all entries charged to the pool.
    current_size: AtomicU64,
    /// Seconds added to the insertion time to obtain an entry's expiry.
    retention_secs: u64,
    /// Root directory containing the blob and metadata shard directories.
    data_dir: PathBuf,
    /// Rate limiter consulted for the sender on every insert.
    rate_limiter: Arc<RateLimiter>,
}
103
104impl HopDataPool {
105 pub fn new(
110 max_size: u64,
111 max_user_size: u64,
112 retention_secs: u64,
113 data_dir: PathBuf,
114 rate_limit_cfg: RateLimitConfig,
115 ) -> Result<Self, HopError> {
116 for i in 0..SHARD_COUNT {
118 let shard = format!("{:02x}", i as u8);
119 fs::create_dir_all(data_dir.join(BLOBS_DIR).join(&shard))?;
120 fs::create_dir_all(data_dir.join(META_DIR).join(&shard))?;
121 }
122
123 let mut index = HashMap::new();
124 let mut user_usage: HashMap<SenderId, AtomicU64> = HashMap::new();
125 let mut current_size = 0u64;
126
127 for i in 0..SHARD_COUNT {
129 let shard = format!("{:02x}", i as u8);
130
131 let meta_shard_dir = data_dir.join(META_DIR).join(&shard);
133 if let Ok(entries) = fs::read_dir(&meta_shard_dir) {
134 for entry in entries.flatten() {
135 let path = entry.path();
136 if path.extension().and_then(|e| e.to_str()) != Some(META_EXT) {
137 if path
138 .file_name()
139 .and_then(|n| n.to_str())
140 .map_or(false, |n| n.contains(".tmp."))
141 {
142 let _ = fs::remove_file(&path);
143 }
144 continue;
145 }
146
147 let stem = match path.file_stem().and_then(|s| s.to_str()) {
148 Some(s) => s.to_string(),
149 None => continue,
150 };
151
152 let Some(hash) = parse_hex_hash(&stem) else {
153 tracing::warn!(target: "hop", path = ?path, "Removing .meta with invalid name");
154 let _ = fs::remove_file(&path);
155 continue;
156 };
157
158 let meta_bytes = match fs::read(&path) {
159 Ok(b) => b,
160 Err(e) => {
161 tracing::warn!(target: "hop", path = ?path, error = %e, "Removing unreadable .meta");
162 let _ = fs::remove_file(&path);
163 continue;
164 },
165 };
166 let meta = match HopEntryMeta::decode(&mut &meta_bytes[..]) {
167 Ok(m) => m,
168 Err(e) => {
169 tracing::warn!(target: "hop", path = ?path, error = %e, "Removing corrupt .meta");
170 let _ = fs::remove_file(&path);
171 continue;
172 },
173 };
174 if meta.version != HOP_META_VERSION {
175 tracing::warn!(
176 target: "hop",
177 path = ?path,
178 version = meta.version,
179 expected = HOP_META_VERSION,
180 "Removing .meta with unsupported on-disk version",
181 );
182 let _ = fs::remove_file(&path);
183 let _ = fs::remove_file(Self::entry_path(
184 &data_dir, &hash, BLOBS_DIR, BLOB_EXT,
185 ));
186 continue;
187 }
188
189 let blob_path = Self::entry_path(&data_dir, &hash, BLOBS_DIR, BLOB_EXT);
190 if !blob_path.exists() {
191 tracing::warn!(target: "hop", hash = ?stem, "Removing orphan .meta (no .blob)");
192 let _ = fs::remove_file(&path);
193 continue;
194 }
195
196 let accounted = entry_accounted_size(meta.size, meta.recipients.len());
197 current_size += accounted;
198 user_usage
199 .entry(meta.sender_id)
200 .or_default()
201 .fetch_add(accounted, Ordering::Relaxed);
202 index.insert(hash, meta);
203 }
204 }
205
206 let blob_shard_dir = data_dir.join(BLOBS_DIR).join(&shard);
208 if let Ok(entries) = fs::read_dir(&blob_shard_dir) {
209 for entry in entries.flatten() {
210 let path = entry.path();
211 if path.extension().and_then(|e| e.to_str()) != Some(BLOB_EXT) {
212 if path
213 .file_name()
214 .and_then(|n| n.to_str())
215 .map_or(false, |n| n.contains(".tmp."))
216 {
217 let _ = fs::remove_file(&path);
218 }
219 continue;
220 }
221 let stem = match path.file_stem().and_then(|s| s.to_str()) {
222 Some(s) => s.to_string(),
223 None => continue,
224 };
225 let is_orphan = match parse_hex_hash(&stem) {
231 Some(hash) => !index.contains_key(&hash),
232 None => true,
233 };
234 if is_orphan {
235 tracing::warn!(target: "hop", hash = ?stem, "Removing orphan .blob (no .meta)");
236 let _ = fs::remove_file(&path);
237 }
238 }
239 }
240 }
241
242 tracing::info!(
243 target: "hop",
244 entries = index.len(),
245 total_bytes = current_size,
246 "Recovered HOP pool from disk"
247 );
248
249 Ok(Self {
250 index: Mutex::new(index),
251 user_usage: RwLock::new(user_usage),
252 max_size,
253 max_user_size,
254 current_size: AtomicU64::new(current_size),
255 retention_secs,
256 data_dir,
257 rate_limiter: Arc::new(RateLimiter::new(rate_limit_cfg)),
258 })
259 }
260
261 fn charge_user(&self, sender_id: &SenderId, accounted: u64) -> Result<(), HopError> {
267 {
269 let usage = self.user_usage.read();
270 if let Some(counter) = usage.get(sender_id) {
271 return self.try_charge(counter, accounted);
272 }
273 }
274 let mut usage = self.user_usage.write();
276 let counter = usage.entry(*sender_id).or_default();
277 self.try_charge(counter, accounted)
278 }
279
280 fn try_charge(&self, counter: &AtomicU64, accounted: u64) -> Result<(), HopError> {
285 let previous = counter.fetch_add(accounted, Ordering::Relaxed);
286 if previous.saturating_add(accounted) > self.max_user_size {
287 counter.fetch_sub(accounted, Ordering::Relaxed);
288 return Err(HopError::UserQuotaExceeded { used: previous, limit: self.max_user_size });
289 }
290 Ok(())
291 }
292
293 fn release_user_quota(&self, sender_id: &SenderId, accounted: u64) {
297 if let Some(counter) = self.user_usage.read().get(sender_id) {
298 saturating_release(counter, accounted);
299 }
300 }
301
302 fn entry_path(data_dir: &Path, hash: &HopHash, subdir: &str, ext: &str) -> PathBuf {
304 let hex = hex::encode(hash);
305 data_dir.join(subdir).join(&hex[..2]).join(format!("{}.{}", hex, ext))
306 }
307
/// Path of the blob file for `hash` inside this pool's data directory.
fn blob_path(&self, hash: &HopHash) -> PathBuf {
    Self::entry_path(&self.data_dir, hash, BLOBS_DIR, BLOB_EXT)
}
312
/// Path of the metadata file for `hash` inside this pool's data directory.
fn meta_path(&self, hash: &HopHash) -> PathBuf {
    Self::entry_path(&self.data_dir, hash, META_DIR, META_EXT)
}
317
318 fn write_atomic(path: &Path, data: &[u8]) -> Result<(), HopError> {
325 let suffix = format!("tmp.{}.{}", process::id(), TMP_SEQ.fetch_add(1, Ordering::Relaxed));
326 let tmp_path = path.with_extension(suffix);
327 if let Err(e) = fs::write(&tmp_path, data) {
328 let _ = fs::remove_file(&tmp_path);
329 return Err(e.into());
330 }
331 if let Err(e) = fs::rename(&tmp_path, path) {
332 let _ = fs::remove_file(&tmp_path);
333 return Err(e.into());
334 }
335 Ok(())
336 }
337
338 pub fn insert(
342 &self,
343 data: Vec<u8>,
344 recipients: RecipientVec,
345 sender_id: SenderId,
346 signer: MultiSigner,
347 signature: MultiSignature,
348 submit_timestamp: u64,
349 ) -> Result<HopHash, HopError> {
350 if recipients.is_empty() {
351 return Err(HopError::NoRecipients);
352 }
353 let unique: BTreeSet<&MultiSigner> = recipients.iter().map(|r| &r.signer).collect();
354 if unique.len() != recipients.len() {
355 return Err(HopError::DuplicateRecipient);
356 }
357
358 if data.is_empty() {
359 return Err(HopError::EmptyData);
360 }
361
362 let data_len = data.len() as u64;
363
364 let accounted = entry_accounted_size(data_len, recipients.len());
372
373 if let Err(retry_after_secs) = self.rate_limiter.check(&sender_id, accounted) {
375 return Err(HopError::RateLimited { retry_after_secs });
376 }
377
378 let previous_size = self.current_size.fetch_add(accounted, Ordering::Relaxed);
379 if previous_size.saturating_add(accounted) > self.max_size {
380 self.current_size.fetch_sub(accounted, Ordering::Relaxed);
381 return Err(HopError::PoolFull(previous_size, self.max_size));
382 }
383
384 if let Err(e) = self.charge_user(&sender_id, accounted) {
385 self.current_size.fetch_sub(accounted, Ordering::Relaxed);
386 return Err(e);
387 }
388
389 let hash = H256(blake2_256(&data));
390
391 {
393 let index = self.index.lock();
394 if index.contains_key(&hash) {
395 self.release_user_quota(&sender_id, accounted);
396 self.current_size.fetch_sub(accounted, Ordering::Relaxed);
397 return Err(HopError::DuplicateEntry);
398 }
399 }
400
401 let blob_path = self.blob_path(&hash);
404 if let Err(e) = Self::write_atomic(&blob_path, &data) {
405 self.release_user_quota(&sender_id, accounted);
406 self.current_size.fetch_sub(accounted, Ordering::Relaxed);
407 return Err(e);
408 }
409
410 let expires_at = SystemTime::now()
411 .duration_since(UNIX_EPOCH)
412 .unwrap_or_default()
413 .as_secs()
414 .saturating_add(self.retention_secs);
415 let meta = HopEntryMeta::new(
416 data_len,
417 expires_at,
418 recipients,
419 sender_id,
420 signer,
421 signature,
422 submit_timestamp,
423 );
424 let meta_bytes = meta.encode();
425 let meta_path = self.meta_path(&hash);
426
427 {
432 let mut index = self.index.lock();
433 if index.contains_key(&hash) {
434 tracing::debug!(
435 target: "hop",
436 hash = ?hex::encode(hash),
437 "Duplicate insert race lost; keeping winner's files"
438 );
439 drop(index);
442 self.release_user_quota(&sender_id, accounted);
443 self.current_size.fetch_sub(accounted, Ordering::Relaxed);
444 return Err(HopError::DuplicateEntry);
445 }
446 if let Err(e) = Self::write_atomic(&meta_path, &meta_bytes) {
447 let _ = fs::remove_file(&blob_path);
450 drop(index);
451 self.release_user_quota(&sender_id, accounted);
452 self.current_size.fetch_sub(accounted, Ordering::Relaxed);
453 return Err(e);
454 }
455 index.insert(hash, meta);
456 }
457
458 tracing::info!(
459 target: "hop",
460 hash = ?hex::encode(hash),
461 size = data_len,
462 accounted,
463 expires_at,
464 "Data added to HOP pool"
465 );
466
467 Ok(hash)
468 }
469
470 fn read_and_verify_blob(&self, hash: &HopHash) -> Result<Vec<u8>, HopError> {
477 let blob_path = self.blob_path(hash);
478 let data = fs::read(&blob_path).map_err(|e| {
479 if e.kind() == std::io::ErrorKind::NotFound {
480 HopError::NotFound
481 } else {
482 HopError::IoError(e)
483 }
484 })?;
485 if H256(blake2_256(&data)) != *hash {
486 tracing::error!(
487 target: "hop",
488 hash = ?hex::encode(hash),
489 size = data.len(),
490 "Blob integrity check failed; purging entry"
491 );
492 self.purge_corrupt_entry(hash);
493 return Err(HopError::NotFound);
494 }
495 Ok(data)
496 }
497
498 fn purge_corrupt_entry(&self, hash: &HopHash) {
501 let removed = {
502 let mut index = self.index.lock();
503 index.remove(hash)
504 };
505 if let Some(meta) = removed {
506 let accounted = entry_accounted_size(meta.size, meta.recipients.len());
507 self.current_size.fetch_sub(accounted, Ordering::Relaxed);
508 self.release_user_quota(&meta.sender_id, accounted);
509 }
510 let _ = fs::remove_file(self.blob_path(hash));
511 let _ = fs::remove_file(self.meta_path(hash));
512 }
513
514 fn read_or_log(&self, hash: &HopHash) -> Option<Vec<u8>> {
517 match self.read_and_verify_blob(hash) {
518 Ok(data) => Some(data),
519 Err(HopError::NotFound) => None,
520 Err(e) => {
521 tracing::error!(
522 target: "hop",
523 hash = ?hex::encode(hash),
524 error = ?e,
525 "Failed to read blob from disk"
526 );
527 None
528 },
529 }
530 }
531
532 pub fn get(&self, hash: &HopHash) -> Option<Vec<u8>> {
534 {
535 let index = self.index.lock();
536 if !index.contains_key(hash) {
537 return None;
538 }
539 }
540 self.read_or_log(hash)
541 }
542
543 pub fn get_with_auth(
549 &self,
550 hash: &HopHash,
551 ) -> Option<(Vec<u8>, MultiSigner, MultiSignature, u64)> {
552 let (signer, signature, submit_timestamp) = {
553 let index = self.index.lock();
554 let meta = index.get(hash)?;
555 (meta.signer.clone(), meta.signature.clone(), meta.submit_timestamp)
556 };
557 let data = self.read_or_log(hash)?;
558 Some((data, signer, signature, submit_timestamp))
559 }
560
561 fn find_recipient_idx(
567 meta: &HopEntryMeta,
568 hash: &HopHash,
569 signature: &[u8],
570 context: &[u8],
571 ) -> Result<usize, HopError> {
572 let multi_sig =
573 MultiSignature::decode(&mut &signature[..]).map_err(|_| HopError::InvalidSignature)?;
574 let payload = signing_payload(context, hash);
575
576 meta.recipients
577 .iter()
578 .position(|r| multi_sig.verify(&payload[..], &r.signer.clone().into_account()))
579 .ok_or(HopError::NotRecipient)
580 }
581
582 pub fn claim(&self, hash: &HopHash, signature: &[u8]) -> Result<Vec<u8>, HopError> {
590 {
591 let index = self.index.lock();
592 let meta = index.get(hash).ok_or(HopError::NotFound)?;
593 let idx = Self::find_recipient_idx(meta, hash, signature, HOP_CLAIM_CONTEXT)
596 .map_err(|_| HopError::NotFound)?;
597
598 if meta.recipients[idx].claimed {
600 return Err(HopError::AlreadyClaimed);
601 }
602 }
603 self.read_and_verify_blob(hash)
606 }
607
608 pub fn ack(&self, hash: &HopHash, signature: &[u8]) -> Result<(), HopError> {
613 {
615 let index = self.index.lock();
616 let meta = index.get(hash).ok_or(HopError::NotFound)?;
617 let idx = Self::find_recipient_idx(meta, hash, signature, HOP_ACK_CONTEXT)
618 .map_err(|_| HopError::NotFound)?;
619 if meta.recipients[idx].claimed {
620 return Ok(());
621 }
622 }
623
624 let mut index = self.index.lock();
627 let meta = index.get_mut(hash).ok_or(HopError::NotFound)?;
628 let idx = Self::find_recipient_idx(meta, hash, signature, HOP_ACK_CONTEXT)
629 .map_err(|_| HopError::NotFound)?;
630
631 if meta.recipients[idx].claimed {
632 return Ok(());
633 }
634
635 meta.recipients[idx].claimed = true;
636
637 if meta.recipients.iter().all(|r| r.claimed) {
639 let accounted = entry_accounted_size(meta.size, meta.recipients.len());
640 let sender = meta.sender_id;
641 index.remove(hash);
642 self.current_size.fetch_sub(accounted, Ordering::Relaxed);
643 self.release_user_quota(&sender, accounted);
644 drop(index);
645
646 let _ = fs::remove_file(self.blob_path(hash));
648 let _ = fs::remove_file(self.meta_path(hash));
649
650 tracing::info!(
651 target: "hop",
652 hash = ?hex::encode(hash),
653 "All recipients acked, data removed"
654 );
655 } else {
656 let claimed_count = meta.recipients.iter().filter(|r| r.claimed).count();
657 let meta_bytes = meta.encode();
659 let meta_path = self.meta_path(hash);
660 if let Err(e) = Self::write_atomic(&meta_path, &meta_bytes) {
661 tracing::error!(target: "hop", hash = ?hex::encode(hash), error = %e, "Failed to persist ack state");
662 }
663 drop(index);
664
665 tracing::debug!(
666 target: "hop",
667 hash = ?hex::encode(hash),
668 claimed = claimed_count,
669 "Recipient acked"
670 );
671 }
672
673 Ok(())
674 }
675
/// Test-only: whether `hash` is present in the index.
#[cfg(test)]
pub fn has(&self, hash: &HopHash) -> bool {
    self.index.lock().contains_key(hash)
}
682
/// Test-only: removes `hash` from the index, refunds its accounted size to the
/// pool and the sender, and deletes its files.
///
/// # Errors
/// `NotFound` if the hash is not in the index.
#[cfg(test)]
pub fn remove(&self, hash: &HopHash) -> Result<(), HopError> {
    // The lock guard is a temporary and is released at the end of this statement.
    let meta = self.index.lock().remove(hash).ok_or(HopError::NotFound)?;

    let accounted = entry_accounted_size(meta.size, meta.recipients.len());
    self.current_size.fetch_sub(accounted, Ordering::Relaxed);
    self.release_user_quota(&meta.sender_id, accounted);

    let _ = fs::remove_file(self.blob_path(hash));
    let _ = fs::remove_file(self.meta_path(hash));

    tracing::debug!(
        target: "hop",
        hash = ?hex::encode(hash),
        "Data removed from pool"
    );

    Ok(())
}
711
712 pub fn status(&self) -> PoolStatus {
714 let index = self.index.lock();
715 PoolStatus {
716 entry_count: index.len(),
717 total_bytes: self.current_size.load(Ordering::Relaxed),
718 max_bytes: self.max_size,
719 }
720 }
721
722 pub fn cleanup_expired(&self) -> u64 {
729 const CLEANUP_BATCH_SIZE: usize = 10_000;
730 let mut total_freed: u64 = 0;
731 let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
732
733 loop {
734 let expired: Vec<(HopHash, HopEntryMeta)> = {
738 let mut index = self.index.lock();
739 let expired_keys: Vec<HopHash> = index
740 .iter()
741 .filter(|(_, m)| now_secs >= m.expires_at)
742 .map(|(h, _)| *h)
743 .take(CLEANUP_BATCH_SIZE)
744 .collect();
745
746 expired_keys
747 .into_iter()
748 .filter_map(|hash| index.remove(&hash).map(|meta| (hash, meta)))
749 .collect()
750 };
751
752 if expired.is_empty() {
753 break;
754 }
755
756 let freed: u64 = expired
758 .iter()
759 .map(|(_, meta)| entry_accounted_size(meta.size, meta.recipients.len()))
760 .sum();
761 self.current_size.fetch_sub(freed, Ordering::Relaxed);
762 total_freed = total_freed.saturating_add(freed);
763
764 {
765 let usage = self.user_usage.read();
766 for (_, meta) in &expired {
767 if let Some(counter) = usage.get(&meta.sender_id) {
768 let accounted = entry_accounted_size(meta.size, meta.recipients.len());
769 saturating_release(counter, accounted);
770 }
771 }
772 }
773
774 for (hash, _) in &expired {
776 let _ = fs::remove_file(self.blob_path(hash));
777 let _ = fs::remove_file(self.meta_path(hash));
778 }
779 }
780
781 {
789 let index = self.index.lock();
790 let mut usage = self.user_usage.write();
791 let live: HashSet<&SenderId> = index.values().map(|m| &m.sender_id).collect();
792 usage.retain(|sender_id, counter| {
793 counter.load(Ordering::Relaxed) > 0 || live.contains(sender_id)
794 });
795 }
796
797 self.rate_limiter.evict_stale();
799
800 total_freed
801 }
802
803 pub fn get_promotable(
807 &self,
808 current_block: HopBlockNumber,
809 buffer_secs: u64,
810 limit: usize,
811 ) -> Vec<HopHash> {
812 let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
813 let index = self.index.lock();
814 index
815 .iter()
816 .filter(|(_, meta)| {
817 !meta.promoted &&
818 now_secs.saturating_add(buffer_secs) >= meta.expires_at &&
819 meta.promotion_attempts < MAX_PROMOTION_ATTEMPTS &&
820 current_block >= meta.next_promotion_attempt_at
821 })
822 .map(|(h, _)| *h)
823 .take(limit)
824 .collect()
825 }
826
827 pub fn mark_promoted(&self, hash: &HopHash) {
830 let mut index = self.index.lock();
831 if let Some(meta) = index.get_mut(hash) {
832 meta.promoted = true;
833 let meta_bytes = meta.encode();
834 let meta_path = self.meta_path(hash);
835 drop(index);
836
837 if let Err(e) = Self::write_atomic(&meta_path, &meta_bytes) {
838 tracing::error!(
839 target: "hop",
840 hash = ?hex::encode(hash),
841 error = %e,
842 "Failed to persist promoted state"
843 );
844 }
845 }
846 }
847
848 pub fn record_promotion_attempt(
858 &self,
859 hash: &HopHash,
860 current_block: HopBlockNumber,
861 check_interval_blocks: u32,
862 ) {
863 let mut index = self.index.lock();
864 if let Some(meta) = index.get_mut(hash) {
865 meta.promotion_attempts = meta.promotion_attempts.saturating_add(1);
866 let backoff = promotion_backoff_blocks(meta.promotion_attempts, check_interval_blocks);
867 meta.next_promotion_attempt_at = current_block.saturating_add(backoff);
868 let meta_bytes = meta.encode();
869 let meta_path = self.meta_path(hash);
870 drop(index);
871
872 if let Err(e) = Self::write_atomic(&meta_path, &meta_bytes) {
873 tracing::error!(
874 target: "hop",
875 hash = ?hex::encode(hash),
876 error = %e,
877 "Failed to persist promotion-attempt state"
878 );
879 }
880 }
881 }
882}
883
884fn parse_hex_hash(stem: &str) -> Option<HopHash> {
887 let bytes = hex::decode(stem).ok()?;
888 let arr: [u8; 32] = bytes.try_into().ok()?;
889 Some(H256(arr))
890}
891
/// Atomically subtracts `accounted` from `counter`, clamping the result at zero
/// instead of wrapping.
fn saturating_release(counter: &AtomicU64, accounted: u64) {
    let mut observed = counter.load(Ordering::Relaxed);
    loop {
        let target = observed.saturating_sub(accounted);
        match counter.compare_exchange_weak(
            observed,
            target,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => return,
            // Lost a race (or spurious failure): retry against the fresh value.
            Err(current) => observed = current,
        }
    }
}
902
903#[cfg(test)]
904mod tests {
905 use super::*;
906 use crate::types::{Recipient, MAX_RECIPIENTS};
907 use sp_core::{crypto::Pair, ed25519, sr25519};
908 use sp_runtime::MultiSigner;
909 use tempfile::TempDir;
910
/// First fixed sender id used by the tests.
const SENDER_A: SenderId = [1u8; 32];
/// Second fixed sender id, distinct from `SENDER_A`.
const SENDER_B: SenderId = [2u8; 32];
913
/// Shorthand for the accounted size of `data_size` payload bytes addressed to
/// `num_recipients` recipients.
fn acct(data_size: u64, num_recipients: usize) -> u64 {
    entry_accounted_size(data_size, num_recipients)
}
918
919 fn make_pool(max_size: u64, retention_secs: u64) -> (HopDataPool, TempDir) {
920 let dir = TempDir::new().unwrap();
921 let pool = HopDataPool::new(
922 max_size,
923 max_size,
924 retention_secs,
925 dir.path().to_path_buf(),
926 RateLimitConfig::disabled(),
927 )
928 .unwrap();
929 (pool, dir)
930 }
931
932 fn make_pool_with_user_cap(
933 max_size: u64,
934 max_user_size: u64,
935 retention_secs: u64,
936 ) -> (HopDataPool, TempDir) {
937 let dir = TempDir::new().unwrap();
938 let pool = HopDataPool::new(
939 max_size,
940 max_user_size,
941 retention_secs,
942 dir.path().to_path_buf(),
943 RateLimitConfig::disabled(),
944 )
945 .unwrap();
946 (pool, dir)
947 }
948
/// Default test pool: 1 MiB capacity and 100 seconds of retention.
fn create_test_pool() -> (HopDataPool, TempDir) {
    make_pool(1024 * 1024, 100)
}
952
953 fn test_recipient() -> (ed25519::Pair, MultiSigner) {
954 let pair = ed25519::Pair::from_seed(&[1u8; 32]);
955 let signer = MultiSigner::Ed25519(pair.public());
956 (pair, signer)
957 }
958
959 fn dummy_auth() -> (MultiSigner, MultiSignature) {
963 let pair = ed25519::Pair::from_seed(&[7u8; 32]);
964 let signer = MultiSigner::Ed25519(pair.public());
965 let sig = MultiSignature::Ed25519(pair.sign(&[]));
966 (signer, sig)
967 }
968
969 fn sign_ed(pair: &ed25519::Pair, context: &[u8], hash: &HopHash) -> Vec<u8> {
970 let payload = signing_payload(context, hash);
971 MultiSignature::Ed25519(pair.sign(&payload)).encode()
972 }
973
974 fn sign_sr(pair: &sr25519::Pair, context: &[u8], hash: &HopHash) -> Vec<u8> {
975 let payload = signing_payload(context, hash);
976 MultiSignature::Sr25519(pair.sign(&payload)).encode()
977 }
978
979 fn user_usage(pool: &HopDataPool, sender: &SenderId) -> u64 {
980 pool.user_usage
981 .read()
982 .get(sender)
983 .map(|c| c.load(Ordering::Relaxed))
984 .unwrap_or(0)
985 }
986
987 fn bv(v: Vec<MultiSigner>) -> RecipientVec {
990 let recipients: Vec<Recipient> =
991 v.into_iter().map(|signer| Recipient { signer, claimed: false }).collect();
992 RecipientVec::try_from(recipients).expect("test recipient list exceeds MAX_RECIPIENTS")
993 }
994
/// Inserted data can be read back unchanged through `get`.
#[test]
fn test_insert_and_get() {
    let (pool, _dir) = create_test_pool();
    let (auth_signer, auth_sig) = dummy_auth();
    let recipients = bv(vec![test_recipient().1]);
    let payload: Vec<u8> = vec![1, 2, 3, 4, 5];

    let hash = pool
        .insert(payload.clone(), recipients, SENDER_A, auth_signer, auth_sig, 0)
        .unwrap();

    assert_eq!(payload, pool.get(&hash).unwrap());
}
1007
/// An empty recipient list is rejected with `NoRecipients`.
#[test]
fn test_insert_no_recipients() {
    let (pool, _dir) = create_test_pool();
    let (auth_signer, auth_sig) = dummy_auth();

    let outcome =
        pool.insert(vec![1, 2, 3, 4, 5], bv(Vec::new()), SENDER_A, auth_signer, auth_sig, 0);

    assert!(matches!(outcome, Err(HopError::NoRecipients)));
}
1015
/// Inserting the same content twice yields `DuplicateEntry` the second time.
#[test]
fn test_duplicate_insert() {
    let (pool, _dir) = create_test_pool();
    let (_, signer) = test_recipient();
    let payload: Vec<u8> = vec![1, 2, 3, 4, 5];
    let submit = || {
        let (auth_signer, auth_sig) = dummy_auth();
        pool.insert(payload.clone(), bv(vec![signer.clone()]), SENDER_A, auth_signer, auth_sig, 0)
    };

    submit().unwrap();

    assert!(matches!(submit(), Err(HopError::DuplicateEntry)));
}
1036
/// A list with `MAX_RECIPIENTS + 1` distinct recipients cannot be converted
/// into a `RecipientVec`.
#[test]
fn test_too_many_recipients_rejected_at_type_level() {
    let count = MAX_RECIPIENTS as usize + 1;
    let mut recipients: Vec<Recipient> = Vec::with_capacity(count);
    for i in 0..count as u64 {
        // Derive a distinct key per recipient from its index.
        let mut seed = [0u8; 32];
        seed[..8].copy_from_slice(&i.to_le_bytes());
        let public = ed25519::Pair::from_seed(&seed).public();
        recipients.push(Recipient { signer: MultiSigner::Ed25519(public), claimed: false });
    }

    assert_eq!(recipients.len(), MAX_RECIPIENTS as usize + 1);
    assert!(RecipientVec::try_from(recipients).is_err());
}
1055
/// Listing the same recipient twice is rejected with `DuplicateRecipient`.
#[test]
fn test_duplicate_recipient_rejected() {
    let (pool, _dir) = create_test_pool();
    let (_, signer) = test_recipient();
    let (auth_signer, auth_sig) = dummy_auth();
    let recipients = bv(vec![signer.clone(), signer]);

    let outcome = pool.insert(vec![1, 2, 3], recipients, SENDER_A, auth_signer, auth_sig, 0);

    assert!(matches!(outcome, Err(HopError::DuplicateRecipient)));
}
1070
/// Once the pool capacity is used up, further inserts fail with `PoolFull`.
#[test]
fn test_pool_full() {
    // Capacity fits exactly one 60-byte entry with a single recipient.
    let (pool, _dir) = make_pool(acct(60, 1), 100);
    let (_, signer) = test_recipient();
    let submit = |data: Vec<u8>| {
        let (auth_signer, auth_sig) = dummy_auth();
        pool.insert(data, bv(vec![signer.clone()]), SENDER_A, auth_signer, auth_sig, 0)
    };

    submit(vec![0u8; 60]).unwrap();

    assert!(matches!(submit(vec![1u8; 50]), Err(HopError::PoolFull(_, _))));
}
1087
/// `remove` drops the entry from the index and deletes both of its files.
#[test]
fn test_remove() {
    let (pool, _dir) = create_test_pool();
    let (auth_signer, auth_sig) = dummy_auth();
    let recipients = bv(vec![test_recipient().1]);
    let hash = pool
        .insert(vec![1, 2, 3, 4, 5], recipients, SENDER_A, auth_signer, auth_sig, 0)
        .unwrap();
    assert!(pool.has(&hash));

    pool.remove(&hash).unwrap();

    assert!(!pool.has(&hash));
    for path in [pool.blob_path(&hash), pool.meta_path(&hash)] {
        assert!(!path.exists());
    }
}
1105
/// `status` reports the entry count and the summed accounted size.
#[test]
fn test_status() {
    let (pool, _dir) = create_test_pool();
    let (_, signer) = test_recipient();
    let payloads: [Vec<u8>; 2] = [vec![1, 2, 3, 4, 5], vec![6, 7, 8]];

    for payload in &payloads {
        let (auth_signer, auth_sig) = dummy_auth();
        pool.insert(payload.clone(), bv(vec![signer.clone()]), SENDER_A, auth_signer, auth_sig, 0)
            .unwrap();
    }

    let expected_bytes: u64 = payloads.iter().map(|p| acct(p.len() as u64, 1)).sum();
    let status = pool.status();
    assert_eq!(status.entry_count, 2);
    assert_eq!(status.total_bytes, expected_bytes);
}
1129
/// A valid claim returns the data and leaves the entry in place; the ack of
/// the only recipient then removes it.
#[test]
fn test_claim_valid_signature() {
    let (pool, _dir) = create_test_pool();
    let (pair, signer) = test_recipient();
    let (auth_signer, auth_sig) = dummy_auth();
    let payload: Vec<u8> = vec![1, 2, 3, 4, 5];
    let hash = pool
        .insert(payload.clone(), bv(vec![signer]), SENDER_A, auth_signer, auth_sig, 0)
        .unwrap();

    let claim_sig = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
    let ack_sig = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);

    assert_eq!(payload, pool.claim(&hash, &claim_sig).unwrap());
    assert!(pool.has(&hash));

    pool.ack(&hash, &ack_sig).unwrap();
    assert!(!pool.has(&hash));
}
1150
/// A signature made for the claim context is not accepted by `ack`.
#[test]
fn test_claim_sig_rejected_on_ack() {
    let (pool, _dir) = create_test_pool();
    let (pair, signer) = test_recipient();
    let (auth_signer, auth_sig) = dummy_auth();
    let hash = pool
        .insert(vec![1, 2, 3], bv(vec![signer]), SENDER_A, auth_signer, auth_sig, 0)
        .unwrap();

    let claim_sig = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
    pool.claim(&hash, &claim_sig).unwrap();

    let ack_outcome = pool.ack(&hash, &claim_sig);
    assert!(matches!(ack_outcome, Err(HopError::NotFound)));
}
1164
/// A malformed signature is reported as `NotFound`.
#[test]
fn test_claim_invalid_signature() {
    let (pool, _dir) = create_test_pool();
    let (auth_signer, auth_sig) = dummy_auth();
    let recipients = bv(vec![test_recipient().1]);
    let hash = pool
        .insert(vec![1, 2, 3, 4, 5], recipients, SENDER_A, auth_signer, auth_sig, 0)
        .unwrap();

    let garbage = [0u8; 3];
    let outcome = pool.claim(&hash, &garbage);

    assert!(matches!(outcome, Err(HopError::NotFound)));
}
1178
/// A well-formed claim signed by a non-recipient key is reported as `NotFound`
/// and leaves the entry untouched.
#[test]
fn test_claim_wrong_key() {
    let (pool, _dir) = create_test_pool();
    let (auth_signer, auth_sig) = dummy_auth();
    let recipients = bv(vec![test_recipient().1]);
    let hash = pool
        .insert(vec![1, 2, 3, 4, 5], recipients, SENDER_A, auth_signer, auth_sig, 0)
        .unwrap();

    let outsider = ed25519::Pair::from_seed(&[99u8; 32]);
    let outsider_claim = sign_ed(&outsider, HOP_CLAIM_CONTEXT, &hash);

    let outcome = pool.claim(&hash, &outsider_claim);
    assert!(matches!(outcome, Err(HopError::NotFound)));
    assert!(pool.has(&hash));
}
1199
/// With two recipients the entry survives the first ack and is removed, with
/// all bytes released, after the second.
#[test]
fn test_claim_multi_recipient() {
    let (pool, _dir) = create_test_pool();
    let first = ed25519::Pair::from_seed(&[1u8; 32]);
    let second = ed25519::Pair::from_seed(&[2u8; 32]);
    let recipients = bv(vec![
        MultiSigner::Ed25519(first.public()),
        MultiSigner::Ed25519(second.public()),
    ]);
    let (auth_signer, auth_sig) = dummy_auth();

    let payload: Vec<u8> = vec![1, 2, 3, 4, 5];
    let hash = pool
        .insert(payload.clone(), recipients, SENDER_A, auth_signer, auth_sig, 0)
        .unwrap();

    // First recipient claims and acks: the entry must stay.
    let first_claim = sign_ed(&first, HOP_CLAIM_CONTEXT, &hash);
    let first_ack = sign_ed(&first, HOP_ACK_CONTEXT, &hash);
    assert_eq!(payload, pool.claim(&hash, &first_claim).unwrap());
    pool.ack(&hash, &first_ack).unwrap();
    assert!(pool.has(&hash));

    // Second recipient claims and acks: the entry is gone.
    let second_claim = sign_ed(&second, HOP_CLAIM_CONTEXT, &hash);
    let second_ack = sign_ed(&second, HOP_ACK_CONTEXT, &hash);
    assert_eq!(payload, pool.claim(&hash, &second_claim).unwrap());
    pool.ack(&hash, &second_ack).unwrap();
    assert!(!pool.has(&hash));
    assert_eq!(pool.status().total_bytes, 0);
}
1233
/// After a recipient has acked (while another recipient keeps the entry
/// alive), a further claim by that recipient yields `AlreadyClaimed`.
#[test]
fn test_claim_after_ack_returns_already_claimed() {
    let (pool, _dir) = create_test_pool();
    let (pair, signer) = test_recipient();
    let other = ed25519::Pair::from_seed(&[2u8; 32]);
    let recipients = bv(vec![signer, MultiSigner::Ed25519(other.public())]);
    let (auth_signer, auth_sig) = dummy_auth();

    let hash = pool
        .insert(vec![1, 2, 3, 4, 5], recipients, SENDER_A, auth_signer, auth_sig, 0)
        .unwrap();

    let claim_sig = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
    let ack_sig = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
    pool.claim(&hash, &claim_sig).unwrap();
    pool.ack(&hash, &ack_sig).unwrap();

    let second_claim = pool.claim(&hash, &claim_sig);
    assert!(matches!(second_claim, Err(HopError::AlreadyClaimed)));
}
1260
/// Claiming a hash that was never inserted yields `NotFound`.
#[test]
fn test_claim_not_found() {
    let (pool, _dir) = create_test_pool();
    let unknown = H256([0u8; 32]);
    let signature = [0u8; 64];

    let outcome = pool.claim(&unknown, &signature);

    assert!(matches!(outcome, Err(HopError::NotFound)));
}
1268
/// A sender at their cap is refused with `UserQuotaExceeded`, while another
/// sender can still insert.
#[test]
fn test_per_user_cap_is_hard_limit() {
    // The per-user cap fits exactly one 60-byte entry with a single recipient.
    let (pool, _dir) = make_pool_with_user_cap(10_000, acct(60, 1), 100);
    let (_, signer) = test_recipient();
    let submit = |data: Vec<u8>, sender: SenderId| {
        let (auth_signer, auth_sig) = dummy_auth();
        pool.insert(data, bv(vec![signer.clone()]), sender, auth_signer, auth_sig, 0)
    };

    submit(vec![0u8; 60], SENDER_A).unwrap();

    let over_cap = submit(vec![1u8; 10], SENDER_A);
    assert!(matches!(over_cap, Err(HopError::UserQuotaExceeded { .. })));

    submit(vec![2u8; 60], SENDER_B).unwrap();
}
1300
/// Once the only recipient acks, the sender's quota is free again.
#[test]
fn test_quota_released_after_ack() {
    let (pool, _dir) = make_pool_with_user_cap(10_000, acct(100, 1), 100);
    let (pair, signer) = test_recipient();
    let submit = |data: Vec<u8>| {
        let (auth_signer, auth_sig) = dummy_auth();
        pool.insert(data, bv(vec![signer.clone()]), SENDER_A, auth_signer, auth_sig, 0)
    };

    let hash = submit(vec![0u8; 100]).unwrap();

    let over_cap = submit(vec![1u8; 10]);
    assert!(matches!(over_cap, Err(HopError::UserQuotaExceeded { .. })));

    let claim_sig = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
    let ack_sig = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
    pool.claim(&hash, &claim_sig).unwrap();
    pool.ack(&hash, &ack_sig).unwrap();

    submit(vec![2u8; 100]).unwrap();
}
1337
/// `cleanup_expired` returns the accounted bytes it reclaimed and resets both
/// the pool total and the sender's usage to zero.
#[test]
fn test_cleanup_expired_releases_quota() {
    // Second argument is 0 so the entry is reapable immediately
    // (presumably the retention in seconds — see `make_pool`).
    let (pool, _dir) = make_pool(10_000, 0);
    let (_, recipient) = test_recipient();
    let charged = acct(100, 1);

    let inserted = pool.insert(
        vec![0u8; 100],
        bv(vec![recipient]),
        SENDER_A,
        dummy_auth().0,
        dummy_auth().1,
        0,
    );
    inserted.unwrap();
    assert_eq!(user_usage(&pool, &SENDER_A), charged);

    let freed = pool.cleanup_expired();
    assert_eq!(freed, charged);

    let total_after = pool.status().total_bytes;
    assert_eq!(total_after, 0);
    assert_eq!(user_usage(&pool, &SENDER_A), 0);
}
1353
/// An entry survives `cleanup_expired` right after insertion and is reaped
/// once more than the configured retention has passed in real time.
#[test]
fn test_cleanup_expired_honors_wall_clock_retention() {
    use std::{thread, time::Duration};

    // Second argument is 1 (presumably a one-second retention — see `make_pool`).
    let (pool, _dir) = make_pool(10_000, 1);
    let (_, recipient) = test_recipient();

    let hash = pool
        .insert(vec![0u8; 100], bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    // Immediately after insertion nothing should be reclaimed.
    let reclaimed_early = pool.cleanup_expired();
    assert_eq!(reclaimed_early, 0, "entry should still be live before retention elapses");
    assert!(pool.has(&hash));

    // Wait a little longer than the retention window.
    thread::sleep(Duration::from_millis(1_200));

    let reclaimed_late = pool.cleanup_expired();
    assert!(reclaimed_late > 0, "entry should be reaped once wall-clock retention elapses");
    assert!(!pool.has(&hash));
}
1381
/// Acking the last recipient drops the sender's usage to zero but leaves the
/// sender's slot in the `user_usage` map.
#[test]
fn test_user_counter_preserved_until_cleanup() {
    let (pool, _dir) = create_test_pool();
    let (keypair, recipient) = test_recipient();

    let hash = pool
        .insert(vec![0u8; 50], bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();
    let tracked_after_insert = pool.user_usage.read().contains_key(&SENDER_A);
    assert!(tracked_after_insert);

    // Deliver the entry to its only recipient.
    let claim_sig = sign_ed(&keypair, HOP_CLAIM_CONTEXT, &hash);
    pool.claim(&hash, &claim_sig).unwrap();
    let ack_sig = sign_ed(&keypair, HOP_ACK_CONTEXT, &hash);
    pool.ack(&hash, &ack_sig).unwrap();

    // Usage is back to zero, yet the map key is still there.
    assert_eq!(user_usage(&pool, &SENDER_A), 0);
    let tracked_after_ack = pool.user_usage.read().contains_key(&SENDER_A);
    assert!(tracked_after_ack);
}
1403
/// A sender whose only entry has been fully acked loses its `user_usage`
/// slot on the next `cleanup_expired` call.
#[test]
fn test_cleanup_expired_evicts_idle_user_counters() {
    let (pool, _dir) = make_pool(10_000, 10);
    let (keypair, recipient) = test_recipient();

    let hash = pool
        .insert(vec![0u8; 50], bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    // Deliver the entry so the sender no longer has anything stored.
    let claim_sig = sign_ed(&keypair, HOP_CLAIM_CONTEXT, &hash);
    pool.claim(&hash, &claim_sig).unwrap();
    let ack_sig = sign_ed(&keypair, HOP_ACK_CONTEXT, &hash);
    pool.ack(&hash, &ack_sig).unwrap();

    let tracked_before = pool.user_usage.read().contains_key(&SENDER_A);
    assert!(tracked_before);

    pool.cleanup_expired();

    let tracked_after = pool.user_usage.read().contains_key(&SENDER_A);
    assert!(!tracked_after);
}
1424
/// `cleanup_expired` must not drop the `user_usage` slot of a sender that
/// still has an entry in the pool.
#[test]
fn test_cleanup_expired_keeps_active_user_counters() {
    let (pool, _dir) = make_pool(10_000, 100);
    let (_, recipient) = test_recipient();

    let inserted = pool.insert(
        vec![0u8; 50],
        bv(vec![recipient]),
        SENDER_A,
        dummy_auth().0,
        dummy_auth().1,
        0,
    );
    inserted.unwrap();

    pool.cleanup_expired();

    let still_tracked = pool.user_usage.read().contains_key(&SENDER_A);
    assert!(still_tracked);
}
1441
/// A single `cleanup_expired` call empties a pool holding 20 002 expired
/// entries, each from a distinct sender, and clears every usage counter.
#[test]
fn test_cleanup_expired_processes_more_than_one_batch() {
    const BATCHES: u32 = 2;
    // NOTE(review): 10_000 presumably mirrors the cleanup batch size, with +1 to
    // spill into a further batch — confirm against `cleanup_expired`.
    const PER_BATCH: u32 = 10_000 + 1;
    let total = BATCHES * PER_BATCH;

    let dir = TempDir::new().unwrap();
    // Every payload is the little-endian encoding of a `u32` counter.
    let payload_len = std::mem::size_of::<u32>() as u64;
    let capacity = acct(payload_len, 1) * u64::from(total) + 1024;
    let pool = HopDataPool::new(
        capacity,
        u64::MAX,
        0,
        dir.path().to_path_buf(),
        RateLimitConfig::disabled(),
    )
    .unwrap();
    let (_, recipient) = test_recipient();

    for i in 0..total {
        let le = i.to_le_bytes();

        // Derive a distinct sender by overwriting the first three bytes with
        // the low three bytes of the counter.
        let mut sender = SENDER_A;
        for (slot, byte) in le.iter().take(3).enumerate() {
            sender[slot] = *byte;
        }

        let inserted = pool.insert(
            le.to_vec(),
            bv(vec![recipient.clone()]),
            sender,
            dummy_auth().0,
            dummy_auth().1,
            0,
        );
        inserted.unwrap();
    }
    assert_eq!(pool.status().entry_count, total as usize);

    pool.cleanup_expired();

    let after = pool.status();
    assert_eq!(after.entry_count, 0);
    assert_eq!(pool.status().total_bytes, 0);
    assert!(pool.user_usage.read().is_empty());
}
1483
/// A pool reopened on the same directory still reports the entry, its
/// accounted size, its payload and the sender's usage.
#[test]
fn test_restart_recovery() {
    let dir = TempDir::new().unwrap();
    let (_, recipient) = test_recipient();
    let expected_accounted = acct(100, 1);

    // Both "runs" open the pool with identical settings on the same directory.
    let open_pool = || {
        HopDataPool::new(
            1024 * 1024,
            1024 * 1024,
            100,
            dir.path().to_path_buf(),
            RateLimitConfig::disabled(),
        )
        .unwrap()
    };

    // First run: store one entry, then drop the pool at the end of the block.
    let hash = {
        let pool = open_pool();
        let stored = pool
            .insert(
                vec![42u8; 100],
                bv(vec![recipient]),
                SENDER_A,
                dummy_auth().0,
                dummy_auth().1,
                0,
            )
            .unwrap();
        assert!(pool.has(&stored));
        assert_eq!(pool.status().entry_count, 1);
        assert_eq!(pool.status().total_bytes, expected_accounted);
        stored
    };

    // Second run: everything must be restored from disk.
    {
        let pool = open_pool();
        assert!(pool.has(&hash));
        assert_eq!(pool.status().entry_count, 1);
        assert_eq!(pool.status().total_bytes, expected_accounted);

        let payload = pool.get(&hash).unwrap();
        assert_eq!(payload, vec![42u8; 100]);
        assert_eq!(user_usage(&pool, &SENDER_A), expected_accounted);
    }
}
1533
/// A `.blob` file with no matching `.meta` is deleted when the pool is opened.
#[test]
fn test_orphan_blob_cleanup() {
    let dir = TempDir::new().unwrap();
    let open_pool = || {
        HopDataPool::new(
            1024 * 1024,
            1024 * 1024,
            100,
            dir.path().to_path_buf(),
            RateLimitConfig::disabled(),
        )
        .unwrap()
    };

    // Open and immediately drop a pool so the shard directories exist.
    drop(open_pool());

    // Use a full-length name: 64 hex characters, i.e. the 32 bytes of an `H256`.
    // A shorter stem would not look like a hash at all, so the file could be
    // discarded for its name rather than for lacking metadata.
    // NOTE(review): assumes on-disk stems are the plain hex of the hash — confirm
    // against the loader.
    let orphan_hash = format!("aa{}", "bb".repeat(31));
    let blob_path = dir.path().join("blobs").join("aa").join(format!("{orphan_hash}.blob"));
    fs::write(&blob_path, b"orphan data").unwrap();
    assert!(blob_path.exists());

    // Reopening the pool must remove the orphan.
    let _pool = open_pool();
    assert!(!blob_path.exists());
}
1563
/// A `.meta` file whose contents cannot be decoded is deleted when the pool
/// is opened, and no entry is created for it.
#[test]
fn test_corrupt_meta_cleanup() {
    let dir = TempDir::new().unwrap();
    let open_pool = || {
        HopDataPool::new(
            1024 * 1024,
            1024 * 1024,
            100,
            dir.path().to_path_buf(),
            RateLimitConfig::disabled(),
        )
        .unwrap()
    };

    // Open and immediately drop a pool so the shard directories exist.
    drop(open_pool());

    // Use a full-length name: 64 hex characters, i.e. the 32 bytes of an `H256`.
    // With a shorter stem the file could be rejected for its name alone, and the
    // undecodable contents would never be looked at.
    // NOTE(review): assumes on-disk stems are the plain hex of the hash — confirm
    // against the loader.
    let fake_hash = format!("bb{}", "cc".repeat(31));
    let meta_path = dir.path().join("meta").join("bb").join(format!("{fake_hash}.meta"));
    fs::write(&meta_path, b"not valid SCALE data").unwrap();
    assert!(meta_path.exists());

    // Reopening must discard the corrupt file and load nothing.
    let pool = open_pool();
    assert!(!meta_path.exists());
    assert_eq!(pool.status().entry_count, 0);
}
1594
/// An sr25519 recipient can claim and ack an entry; the ack of the sole
/// recipient removes the entry.
#[test]
fn test_claim_sr25519() {
    let (pool, _dir) = create_test_pool();
    let keypair = sr25519::Pair::from_seed(&[3u8; 32]);
    let recipient = MultiSigner::Sr25519(keypair.public());

    let data = vec![10, 20, 30];
    let hash = pool
        .insert(data.clone(), bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    // Claim returns the stored payload.
    let claim_sig = sign_sr(&keypair, HOP_CLAIM_CONTEXT, &hash);
    assert_eq!(data, pool.claim(&hash, &claim_sig).unwrap());

    // Ack by the only recipient drops the entry.
    let ack_sig = sign_sr(&keypair, HOP_ACK_CONTEXT, &hash);
    pool.ack(&hash, &ack_sig).unwrap();
    assert!(!pool.has(&hash));
}
1612
/// An entry addressed to one ed25519 and one sr25519 recipient stays in the
/// pool until both have claimed and acked it.
#[test]
fn test_claim_mixed_key_types() {
    let (pool, _dir) = create_test_pool();
    let ed_pair = ed25519::Pair::from_seed(&[4u8; 32]);
    let sr_pair = sr25519::Pair::from_seed(&[5u8; 32]);
    let recipients = vec![
        MultiSigner::Ed25519(ed_pair.public()),
        MultiSigner::Sr25519(sr_pair.public()),
    ];

    let data = vec![42, 43, 44];
    let hash = pool
        .insert(data.clone(), bv(recipients), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    // The sr25519 recipient goes first; the entry must remain afterwards.
    let sr_claim = sign_sr(&sr_pair, HOP_CLAIM_CONTEXT, &hash);
    assert_eq!(data, pool.claim(&hash, &sr_claim).unwrap());
    let sr_ack = sign_sr(&sr_pair, HOP_ACK_CONTEXT, &hash);
    pool.ack(&hash, &sr_ack).unwrap();
    assert!(pool.has(&hash));

    // The ed25519 recipient is the last one; its ack removes the entry.
    let ed_claim = sign_ed(&ed_pair, HOP_CLAIM_CONTEXT, &hash);
    assert_eq!(data, pool.claim(&hash, &ed_claim).unwrap());
    let ed_ack = sign_ed(&ed_pair, HOP_ACK_CONTEXT, &hash);
    pool.ack(&hash, &ed_ack).unwrap();
    assert!(!pool.has(&hash));
}
1645
/// Claiming twice with the same signature returns the payload both times and
/// does not remove the entry.
#[test]
fn test_claim_is_repeatable() {
    let (pool, _dir) = create_test_pool();
    let (keypair, recipient) = test_recipient();

    let data = vec![1, 2, 3, 4, 5];
    let hash = pool
        .insert(data.clone(), bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();
    let claim_sig = sign_ed(&keypair, HOP_CLAIM_CONTEXT, &hash);

    // Two identical claims in a row.
    for _ in 0..2 {
        assert_eq!(data, pool.claim(&hash, &claim_sig).unwrap());
    }

    // No ack was sent, so the entry is still stored.
    assert!(pool.has(&hash));
}
1660
/// Sending the same ack twice succeeds both times; with a second recipient
/// still outstanding the entry stays in the pool.
#[test]
fn test_ack_idempotent() {
    let (pool, _dir) = create_test_pool();
    let (first_pair, first_recipient) = test_recipient();

    // A second recipient that never acks.
    let second_pair = ed25519::Pair::from_seed(&[2u8; 32]);
    let second_recipient = MultiSigner::Ed25519(second_pair.public());

    let hash = pool
        .insert(
            vec![1, 2, 3, 4, 5],
            bv(vec![first_recipient, second_recipient]),
            SENDER_A,
            dummy_auth().0,
            dummy_auth().1,
            0,
        )
        .unwrap();

    let ack_sig = sign_ed(&first_pair, HOP_ACK_CONTEXT, &hash);
    for _ in 0..2 {
        pool.ack(&hash, &ack_sig).unwrap();
    }

    assert!(pool.has(&hash));
}
1684
/// With two recipients, the entry survives the first ack and is removed,
/// with its bytes released, after the second.
#[test]
fn test_multi_recipient_partial_ack() {
    let (pool, _dir) = create_test_pool();
    let first_pair = ed25519::Pair::from_seed(&[1u8; 32]);
    let second_pair = ed25519::Pair::from_seed(&[2u8; 32]);
    let recipients = vec![
        MultiSigner::Ed25519(first_pair.public()),
        MultiSigner::Ed25519(second_pair.public()),
    ];

    let data = vec![1, 2, 3, 4, 5];
    let hash = pool
        .insert(data.clone(), bv(recipients), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    // Prepare every signature up front.
    let first_claim = sign_ed(&first_pair, HOP_CLAIM_CONTEXT, &hash);
    let first_ack = sign_ed(&first_pair, HOP_ACK_CONTEXT, &hash);
    let second_claim = sign_ed(&second_pair, HOP_CLAIM_CONTEXT, &hash);
    let second_ack = sign_ed(&second_pair, HOP_ACK_CONTEXT, &hash);

    // First recipient completes; one ack is still outstanding.
    assert_eq!(data, pool.claim(&hash, &first_claim).unwrap());
    pool.ack(&hash, &first_ack).unwrap();
    assert!(pool.has(&hash));

    // Second recipient completes; the entry and its bytes are gone.
    assert_eq!(data, pool.claim(&hash, &second_claim).unwrap());
    pool.ack(&hash, &second_ack).unwrap();
    assert!(!pool.has(&hash));
    assert_eq!(pool.status().total_bytes, 0);
}
1719
/// Ten threads race to insert into a pool sized for four entries; at most
/// four may succeed and the byte total must stay within capacity.
#[test]
fn test_concurrent_inserts_respect_capacity() {
    use std::{sync::Barrier, thread};

    const WORKERS: u8 = 10;

    let (_, recipient) = test_recipient();
    // Room for exactly four 50-byte, single-recipient entries.
    let capacity = acct(50, 1) * 4;
    let (pool, _dir) = make_pool(capacity, 100);
    let pool = Arc::new(pool);
    // All workers are released together to maximise contention.
    let start = Arc::new(Barrier::new(usize::from(WORKERS)));

    let mut workers = Vec::with_capacity(usize::from(WORKERS));
    for i in 0..WORKERS {
        let pool = Arc::clone(&pool);
        let start = Arc::clone(&start);
        let recipient = recipient.clone();
        workers.push(thread::spawn(move || {
            start.wait();
            // Each worker stores a distinct payload, so none is a duplicate.
            pool.insert(
                vec![i; 50],
                bv(vec![recipient]),
                SENDER_A,
                dummy_auth().0,
                dummy_auth().1,
                0,
            )
        }));
    }

    let mut successes = 0usize;
    for worker in workers {
        if worker.join().unwrap().is_ok() {
            successes += 1;
        }
    }

    assert!(successes <= 4, "Got {} successes, max should be 4", successes);
    assert!(pool.status().total_bytes <= capacity);
}
1755
/// Ten threads insert for the same sender under a per-user cap of three
/// entries; at most three may succeed and usage must stay within the cap.
#[test]
fn test_concurrent_inserts_respect_user_quota() {
    use std::{sync::Barrier, thread};

    const WORKERS: u8 = 10;

    let (_, recipient) = test_recipient();
    let per_entry = acct(100, 1);
    // The pool has room for twenty entries, so only the per-user cap of three
    // can be the limiting factor.
    let (pool, _dir) = make_pool_with_user_cap(per_entry * 20, per_entry * 3, 100);
    let pool = Arc::new(pool);
    let start = Arc::new(Barrier::new(usize::from(WORKERS)));

    let mut workers = Vec::with_capacity(usize::from(WORKERS));
    for i in 0..WORKERS {
        let pool = Arc::clone(&pool);
        let start = Arc::clone(&start);
        let recipient = recipient.clone();
        workers.push(thread::spawn(move || {
            start.wait();
            pool.insert(
                vec![i; 100],
                bv(vec![recipient]),
                SENDER_A,
                dummy_auth().0,
                dummy_auth().1,
                0,
            )
        }));
    }

    let mut successes = 0usize;
    for worker in workers {
        if worker.join().unwrap().is_ok() {
            successes += 1;
        }
    }

    assert!(successes <= 3, "hard per-user cap violated: {} successes", successes);
    assert!(user_usage(&pool, &SENDER_A) <= per_entry * 3);
}
1794
/// Five recipients claim and ack the same entry from separate threads; every
/// claim returns the payload and the entry is gone once all have acked.
#[test]
fn test_concurrent_claim_and_ack() {
    use std::{sync::Barrier, thread};

    let (pool, _dir) = create_test_pool();
    let pool = Arc::new(pool);

    // One ed25519 key pair per recipient, seeded 1..=5.
    let keypairs: Vec<_> = (1..=5u8).map(|seed| ed25519::Pair::from_seed(&[seed; 32])).collect();
    let recipients: Vec<_> =
        keypairs.iter().map(|kp| MultiSigner::Ed25519(kp.public())).collect();

    let data = vec![42u8; 100];
    let hash = pool
        .insert(data.clone(), bv(recipients), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    let start = Arc::new(Barrier::new(5));

    let mut workers = Vec::with_capacity(keypairs.len());
    for keypair in keypairs {
        let pool = Arc::clone(&pool);
        let start = Arc::clone(&start);
        let expected = data.clone();
        workers.push(thread::spawn(move || {
            start.wait();
            let claim_sig = sign_ed(&keypair, HOP_CLAIM_CONTEXT, &hash);
            let ack_sig = sign_ed(&keypair, HOP_ACK_CONTEXT, &hash);

            let claimed = pool.claim(&hash, &claim_sig).unwrap();
            assert_eq!(expected, claimed);
            pool.ack(&hash, &ack_sig).unwrap();
        }));
    }

    // A panic inside any worker surfaces here.
    for worker in workers {
        worker.join().unwrap();
    }

    assert!(!pool.has(&hash));
    assert_eq!(pool.status().total_bytes, 0);
}
1843
/// Two threads insert identical data at once: exactly one wins, the other
/// gets `DuplicateEntry`, and the winner's payload can still be claimed.
#[test]
fn test_concurrent_duplicate_insert_preserves_files() {
    use std::{sync::Barrier, thread};

    let (keypair, recipient) = test_recipient();
    let (pool, _dir) = make_pool(1024 * 1024, 100);
    let pool = Arc::new(pool);
    let data = vec![0xABu8; 4096];
    let start = Arc::new(Barrier::new(2));

    let mut workers = Vec::with_capacity(2);
    for _ in 0..2 {
        let pool = Arc::clone(&pool);
        let start = Arc::clone(&start);
        let recipient = recipient.clone();
        let payload = data.clone();
        workers.push(thread::spawn(move || {
            start.wait();
            pool.insert(payload, bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        }));
    }
    let results: Vec<_> = workers.into_iter().map(|w| w.join().unwrap()).collect();

    // Sort the outcomes into winners and duplicate rejections; any other error
    // is counted as neither and trips the assertions below.
    let mut winners = Vec::new();
    let mut dupes = 0usize;
    for outcome in &results {
        match outcome {
            Ok(hash) => winners.push(*hash),
            Err(HopError::DuplicateEntry) => dupes += 1,
            Err(_) => {},
        }
    }
    assert_eq!(winners.len(), 1, "exactly one insert must win the race");
    assert_eq!(dupes, 1, "the other must report DuplicateEntry");

    // The stored payload must be intact and claimable after the race.
    let hash = winners[0];
    let claim_sig = sign_ed(&keypair, HOP_CLAIM_CONTEXT, &hash);
    let claimed = pool.claim(&hash, &claim_sig).expect("claim must succeed");
    assert_eq!(claimed, data);
}
1881
/// Two senders race to insert identical data; after reopening the pool the
/// recovered entry must belong to the sender whose insert succeeded.
#[test]
fn test_concurrent_duplicate_insert_keeps_winner_meta_on_disk() {
    use std::{sync::Barrier, thread};

    let dir = TempDir::new().unwrap();
    let open_pool = || {
        HopDataPool::new(
            1024 * 1024,
            1024 * 1024,
            100,
            dir.path().to_path_buf(),
            RateLimitConfig::disabled(),
        )
        .unwrap()
    };
    let pool = Arc::new(open_pool());

    // Same payload, but a different recipient and sender per contender.
    let recipient_a = MultiSigner::Ed25519(ed25519::Pair::from_seed(&[11u8; 32]).public());
    let recipient_b = MultiSigner::Ed25519(ed25519::Pair::from_seed(&[22u8; 32]).public());
    let data = vec![0xCDu8; 4096];
    let start = Arc::new(Barrier::new(2));

    let contender_a = {
        let (pool, payload, start) = (Arc::clone(&pool), data.clone(), Arc::clone(&start));
        let recipient = recipient_a.clone();
        thread::spawn(move || {
            start.wait();
            pool.insert(payload, bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        })
    };
    let contender_b = {
        let (pool, payload, start) = (Arc::clone(&pool), data.clone(), Arc::clone(&start));
        let recipient = recipient_b.clone();
        thread::spawn(move || {
            start.wait();
            pool.insert(payload, bv(vec![recipient]), SENDER_B, dummy_auth().0, dummy_auth().1, 0)
        })
    };

    let r1 = contender_a.join().unwrap();
    let r2 = contender_b.join().unwrap();

    // Exactly one success paired with one `DuplicateEntry` is acceptable.
    let (winner_hash, winner_sender) = match (&r1, &r2) {
        (Ok(h), Err(HopError::DuplicateEntry)) => (*h, SENDER_A),
        (Err(HopError::DuplicateEntry), Ok(h)) => (*h, SENDER_B),
        other => panic!("expected exactly one winner and one DuplicateEntry, got {other:?}"),
    };

    // Drop the first pool, then reopen on the same directory.
    drop(pool);
    let reopened = open_pool();

    let recovered_sender = {
        let index = reopened.index.lock();
        let meta = index.get(&winner_hash).expect("winner's entry must survive restart");
        meta.sender_id
    };
    assert_eq!(
        recovered_sender, winner_sender,
        "on-disk meta diverged from the winning insert; loser's meta overwrote the winner's",
    );
}
1949
/// 32 threads each release 7 from a counter initialised to 32 * 7; the
/// counter must end at exactly zero and stay there on an oversized release.
#[test]
fn test_saturating_release_concurrent_no_underflow() {
    use std::{sync::Barrier, thread};

    const THREADS: u64 = 32;
    const RELEASE_PER_THREAD: u64 = 7;

    // The starting value is exactly what all threads together will release.
    let initial = THREADS * RELEASE_PER_THREAD;
    let counter = Arc::new(AtomicU64::new(initial));
    let start = Arc::new(Barrier::new(THREADS as usize));

    let mut workers = Vec::with_capacity(THREADS as usize);
    for _ in 0..THREADS {
        let counter = Arc::clone(&counter);
        let start = Arc::clone(&start);
        workers.push(thread::spawn(move || {
            start.wait();
            saturating_release(&counter, RELEASE_PER_THREAD);
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }

    let after_workers = counter.load(Ordering::Relaxed);
    assert_eq!(after_workers, 0, "counter underflowed or did not reach zero");

    // Releasing far more than is held must leave the counter at zero.
    saturating_release(&counter, u64::MAX);
    assert_eq!(counter.load(Ordering::Relaxed), 0);
}
1983
/// An entry is reported by `get_promotable` only when the supplied buffer is
/// large enough: absent for `(50, 180)`, present for `(0, 6000)`.
#[test]
fn test_get_promotable_within_buffer() {
    let (pool, _dir) = make_pool(1024 * 1024, 3600);
    let (_, recipient) = test_recipient();

    let hash = pool
        .insert(vec![1, 2, 3], bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    // With a small buffer the entry is not yet a candidate.
    let too_early = pool.get_promotable(50, 180, usize::MAX);
    assert!(too_early.is_empty());

    // With a large buffer it is the single candidate returned.
    let due = pool.get_promotable(0, 6000, usize::MAX);
    assert_eq!(due.len(), 1);
    assert_eq!(due[0], hash);
}
2004
/// Once `mark_promoted` has been called for an entry, `get_promotable` no
/// longer returns it.
#[test]
fn test_get_promotable_excludes_promoted() {
    let (pool, _dir) = make_pool(1024 * 1024, 100);
    let (_, recipient) = test_recipient();

    let hash = pool
        .insert(vec![1, 2, 3], bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    // Before promotion the entry is a candidate.
    let before = pool.get_promotable(80, 180, usize::MAX);
    assert_eq!(before.len(), 1);

    pool.mark_promoted(&hash);

    // The same query now yields nothing.
    let after = pool.get_promotable(80, 180, usize::MAX);
    assert!(after.is_empty());
}
2022
/// After reopening the pool, an entry marked as promoted still exists but is
/// not offered for promotion again.
#[test]
fn test_mark_promoted_persists_across_restart() {
    let dir = TempDir::new().unwrap();
    let (_, recipient) = test_recipient();

    // Both "runs" open the pool with identical settings on the same directory.
    let open_pool = || {
        HopDataPool::new(
            1024 * 1024,
            1024 * 1024,
            100,
            dir.path().to_path_buf(),
            RateLimitConfig::disabled(),
        )
        .unwrap()
    };

    // First run: insert and mark as promoted, then drop the pool.
    let hash = {
        let pool = open_pool();
        let stored = pool
            .insert(
                vec![42u8; 10],
                bv(vec![recipient]),
                SENDER_A,
                dummy_auth().0,
                dummy_auth().1,
                0,
            )
            .unwrap();
        pool.mark_promoted(&stored);
        stored
    };

    // Second run: the entry is back, still flagged as promoted.
    {
        let pool = open_pool();
        let promotable = pool.get_promotable(80, 180, usize::MAX);
        assert!(promotable.is_empty(), "promoted entry should not be promotable after restart");
        assert!(pool.has(&hash), "entry should still exist");
    }
}
2065
/// A promoted entry is still subject to expiry: `cleanup_expired` reclaims
/// its bytes and removes it.
#[test]
fn test_cleanup_expired_removes_promoted() {
    // Second argument is 0 so the entry is reapable immediately
    // (presumably the retention in seconds — see `make_pool`).
    let (pool, _dir) = make_pool(1024 * 1024, 0);
    let (_, recipient) = test_recipient();

    let hash = pool
        .insert(vec![1, 2, 3], bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    // Promotion alone does not remove the entry.
    pool.mark_promoted(&hash);
    assert!(pool.has(&hash));

    let freed = pool.cleanup_expired();
    assert!(freed > 0);
    assert!(!pool.has(&hash));
}
2081
/// With `submit_burst` set to 2, the third back-to-back insert from the same
/// sender is rejected with `RateLimited`.
#[test]
fn test_rate_limit_rejects_burst_overflow() {
    let dir = TempDir::new().unwrap();

    // Bandwidth limits are generous; only the submission burst of 2 should bite.
    let cfg = RateLimitConfig {
        enabled: true,
        submit_rate_per_min: 60,
        submit_burst: 2,
        bandwidth_per_min: 1024 * 1024 * 60,
        bandwidth_burst: 1024 * 1024,
    };
    let pool =
        HopDataPool::new(1024 * 1024, 1024 * 1024, 100, dir.path().to_path_buf(), cfg).unwrap();
    let (_, recipient) = test_recipient();

    // The first two submissions fit within the burst.
    let first = pool.insert(
        vec![1, 2, 3],
        bv(vec![recipient.clone()]),
        SENDER_A,
        dummy_auth().0,
        dummy_auth().1,
        0,
    );
    first.unwrap();
    let second = pool.insert(
        vec![4, 5, 6],
        bv(vec![recipient.clone()]),
        SENDER_A,
        dummy_auth().0,
        dummy_auth().1,
        0,
    );
    second.unwrap();

    // The third one exceeds it.
    let third = pool.insert(
        vec![7, 8, 9],
        bv(vec![recipient]),
        SENDER_A,
        dummy_auth().0,
        dummy_auth().1,
        0,
    );
    assert!(matches!(third, Err(HopError::RateLimited { .. })));
}
2129
/// A well-formed `.meta` file carrying version 0 is discarded on startup
/// together with its `.blob`, and no entry is loaded for it.
#[test]
fn test_meta_version_mismatch_rejected() {
    let dir = TempDir::new().unwrap();
    let (_, signer) = test_recipient();

    // Build otherwise valid metadata and overwrite its version with 0.
    // NOTE(review): relies on 0 differing from `HOP_META_VERSION` — confirm.
    let mut meta = HopEntryMeta::new(
        100,
        0,
        bv(vec![signer]),
        SENDER_A,
        dummy_auth().0,
        dummy_auth().1,
        0,
    );
    meta.version = 0;

    // Use a full-length name: 64 hex characters, i.e. the 32 bytes of an `H256`.
    // With a shorter stem the files could be rejected for their name alone, and
    // the version field would never be inspected.
    // NOTE(review): assumes on-disk stems are the plain hex of the hash — confirm
    // against the loader.
    let fake_hash = format!("ee{}", "ff".repeat(31));
    let meta_dir = dir.path().join("meta").join("ee");
    let blob_dir = dir.path().join("blobs").join("ee");
    fs::create_dir_all(&meta_dir).unwrap();
    fs::create_dir_all(&blob_dir).unwrap();
    let meta_path = meta_dir.join(format!("{fake_hash}.meta"));
    let blob_path = blob_dir.join(format!("{fake_hash}.blob"));
    fs::write(&meta_path, meta.encode()).unwrap();
    fs::write(&blob_path, b"x").unwrap();

    // Opening the pool runs the startup scan over the prepared directory.
    let pool = HopDataPool::new(
        1024 * 1024,
        1024 * 1024,
        100,
        dir.path().to_path_buf(),
        RateLimitConfig::disabled(),
    )
    .unwrap();
    assert!(!meta_path.exists(), "stale-version .meta should be removed");
    assert!(!blob_path.exists(), "matching .blob should also be removed");
    assert_eq!(pool.status().entry_count, 0);
}
2164
/// After a recorded promotion attempt the entry is skipped until its
/// back-off has elapsed, and it is no longer offered once
/// `MAX_PROMOTION_ATTEMPTS` attempts have been recorded.
#[test]
fn test_promotion_backoff_skips_until_due_then_gives_up() {
    use crate::types::MAX_PROMOTION_ATTEMPTS;

    let (pool, _dir) = make_pool(1024 * 1024, 100);
    let (_, recipient) = test_recipient();
    let hash = pool
        .insert(vec![1u8; 100], bv(vec![recipient]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
        .unwrap();

    let buffer = 300_u64;
    let current = 60;
    let check_interval_blocks: u32 = 10;

    // Before any attempt the entry is a promotion candidate.
    assert_eq!(pool.get_promotable(current, buffer, 10), vec![hash]);

    // First attempt: skipped at the same block, offered again 10 blocks later.
    pool.record_promotion_attempt(&hash, current, check_interval_blocks);
    assert!(
        pool.get_promotable(current, buffer, 10).is_empty(),
        "entry should be skipped until back-off elapses"
    );
    let first_retry = current + 10;
    assert_eq!(pool.get_promotable(first_retry, buffer, 10), vec![hash]);

    // Record the remaining attempts, advancing the clock each time by the
    // interval doubled per attempt, with the shift capped at 5.
    let mut now = first_retry;
    for attempt in 2..=MAX_PROMOTION_ATTEMPTS {
        pool.record_promotion_attempt(&hash, now, check_interval_blocks);
        now += check_interval_blocks << (attempt - 1).min(5);
    }

    // Even far in the future the entry must not come back.
    assert!(
        pool.get_promotable(now + 10_000, buffer, 10).is_empty(),
        "entry should give up after MAX_PROMOTION_ATTEMPTS"
    );
}
2205}