referrerpolicy=no-referrer-when-downgrade

sc_hop/
pool.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! HOP data pool: in-memory index backed by sharded on-disk storage.
18//!
19//! ## On-disk layout
20//!
21//! The pool root contains two subdirectories, `blobs/` and `meta/`, each
22//! sharded into 256 subdirectories named `00`–`ff` after the first byte of the
23//! content hash. An entry with hash `H` is stored as:
24//!
25//! - `blobs/<H[0:2]>/<H>.blob` — raw payload bytes
26//! - `meta/<H[0:2]>/<H>.meta` — SCALE-encoded [`HopEntryMeta`]
27//!
28//! ## Recovery
29//!
30//! On startup the pool scans every `meta/` shard, decodes each `.meta` file,
31//! and rebuilds the in-memory index. `.meta` files that are corrupt, have an
32//! unexpected version, or lack a sibling `.blob` are deleted. Then the
33//! corresponding `blobs/` shard is scanned and any `.blob` without an entry in
34//! the freshly-built index (orphan) is also deleted. Stale `.tmp.*` files left
35//! by a previous crash are removed during both scans.
36
37use crate::{
38	rate_limit::{RateLimitConfig, RateLimiter},
39	types::{
40		entry_accounted_size, promotion_backoff_blocks, signing_payload, HopBlockNumber,
41		HopEntryMeta, HopError, HopHash, PoolStatus, RecipientVec, SenderId, HOP_ACK_CONTEXT,
42		HOP_CLAIM_CONTEXT, HOP_META_VERSION, MAX_PROMOTION_ATTEMPTS,
43	},
44};
45use codec::{Decode, Encode};
46use parking_lot::{Mutex, RwLock};
47use sp_core::H256;
48use sp_crypto_hashing::blake2_256;
49use sp_runtime::{
50	traits::{IdentifyAccount, Verify},
51	MultiSignature, MultiSigner,
52};
53use std::{
54	collections::{BTreeSet, HashMap, HashSet},
55	fs,
56	path::{Path, PathBuf},
57	process,
58	sync::{
59		atomic::{AtomicU64, Ordering},
60		Arc,
61	},
62	time::{SystemTime, UNIX_EPOCH},
63};
64
/// Root subdirectory holding the raw payload files.
const BLOBS_DIR: &str = "blobs";
/// Root subdirectory holding the SCALE-encoded metadata files.
const META_DIR: &str = "meta";
/// File extension of payload files inside `BLOBS_DIR`.
const BLOB_EXT: &str = "blob";
/// File extension of metadata files inside `META_DIR`.
const META_EXT: &str = "meta";
/// Shard count shared by `blobs/` and `meta/`: one shard per possible value
/// of the content hash's first byte (`00`–`ff`).
const SHARD_COUNT: u16 = u8::MAX as u16 + 1;

/// Monotonic per-process sequence number that makes temporary file names
/// unique. Without it, two threads writing the same content-addressed path
/// would share a single `<path>.tmp` file and corrupt each other's bytes.
static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
77
78/// HOP data pool with disk-backed blob storage and in-memory metadata index.
79pub struct HopDataPool {
80	/// In-memory metadata index (no blobs).
81	index: Mutex<HashMap<HopHash, HopEntryMeta>>,
82	/// Per-user byte usage tracked by sender id.
83	///
84	/// Counters live directly in the map and are charged via `charge_user`
85	/// inside the read guard, so the reclamation pass in `cleanup_expired`
86	/// (which holds `user_usage.write()` together with `index.lock()`) cannot
87	/// interpose between a lookup and its `fetch_add`. Stale entries —
88	/// counter 0 and no live index entry — are reclaimed by the same pass.
89	user_usage: RwLock<HashMap<SenderId, AtomicU64>>,
90	/// Maximum pool size in bytes (counts both data and per-entry metadata overhead).
91	max_size: u64,
92	/// Fixed hard per-user quota in bytes.
93	max_user_size: u64,
94	/// Current pool size in bytes (accounted size — includes metadata overhead).
95	current_size: AtomicU64,
96	/// Data retention period in seconds.
97	retention_secs: u64,
98	/// Root data directory containing blobs/ and meta/ subdirectories.
99	data_dir: PathBuf,
100	/// Per-account submit rate limiter.
101	rate_limiter: Arc<RateLimiter>,
102}
103
104impl HopDataPool {
105	/// Create a new disk-backed data pool.
106	///
107	/// Creates shard directories under `data_dir` and rebuilds the in-memory index
108	/// from existing `.meta` files on disk (recovery after restart).
109	pub fn new(
110		max_size: u64,
111		max_user_size: u64,
112		retention_secs: u64,
113		data_dir: PathBuf,
114		rate_limit_cfg: RateLimitConfig,
115	) -> Result<Self, HopError> {
116		// Create shard directories (256 each for blobs/ and meta/).
117		for i in 0..SHARD_COUNT {
118			let shard = format!("{:02x}", i as u8);
119			fs::create_dir_all(data_dir.join(BLOBS_DIR).join(&shard))?;
120			fs::create_dir_all(data_dir.join(META_DIR).join(&shard))?;
121		}
122
123		let mut index = HashMap::new();
124		let mut user_usage: HashMap<SenderId, AtomicU64> = HashMap::new();
125		let mut current_size = 0u64;
126
127		// Rebuild index from .meta files and clean orphan .blobs in a single pass.
128		for i in 0..SHARD_COUNT {
129			let shard = format!("{:02x}", i as u8);
130
131			// Scan .meta files → rebuild index (removes corrupt/orphan .meta files).
132			let meta_shard_dir = data_dir.join(META_DIR).join(&shard);
133			if let Ok(entries) = fs::read_dir(&meta_shard_dir) {
134				for entry in entries.flatten() {
135					let path = entry.path();
136					if path.extension().and_then(|e| e.to_str()) != Some(META_EXT) {
137						if path
138							.file_name()
139							.and_then(|n| n.to_str())
140							.map_or(false, |n| n.contains(".tmp."))
141						{
142							let _ = fs::remove_file(&path);
143						}
144						continue;
145					}
146
147					let stem = match path.file_stem().and_then(|s| s.to_str()) {
148						Some(s) => s.to_string(),
149						None => continue,
150					};
151
152					let Some(hash) = parse_hex_hash(&stem) else {
153						tracing::warn!(target: "hop", path = ?path, "Removing .meta with invalid name");
154						let _ = fs::remove_file(&path);
155						continue;
156					};
157
158					let meta_bytes = match fs::read(&path) {
159						Ok(b) => b,
160						Err(e) => {
161							tracing::warn!(target: "hop", path = ?path, error = %e, "Removing unreadable .meta");
162							let _ = fs::remove_file(&path);
163							continue;
164						},
165					};
166					let meta = match HopEntryMeta::decode(&mut &meta_bytes[..]) {
167						Ok(m) => m,
168						Err(e) => {
169							tracing::warn!(target: "hop", path = ?path, error = %e, "Removing corrupt .meta");
170							let _ = fs::remove_file(&path);
171							continue;
172						},
173					};
174					if meta.version != HOP_META_VERSION {
175						tracing::warn!(
176							target: "hop",
177							path = ?path,
178							version = meta.version,
179							expected = HOP_META_VERSION,
180							"Removing .meta with unsupported on-disk version",
181						);
182						let _ = fs::remove_file(&path);
183						let _ = fs::remove_file(Self::entry_path(
184							&data_dir, &hash, BLOBS_DIR, BLOB_EXT,
185						));
186						continue;
187					}
188
189					let blob_path = Self::entry_path(&data_dir, &hash, BLOBS_DIR, BLOB_EXT);
190					if !blob_path.exists() {
191						tracing::warn!(target: "hop", hash = ?stem, "Removing orphan .meta (no .blob)");
192						let _ = fs::remove_file(&path);
193						continue;
194					}
195
196					let accounted = entry_accounted_size(meta.size, meta.recipients.len());
197					current_size += accounted;
198					user_usage
199						.entry(meta.sender_id)
200						.or_default()
201						.fetch_add(accounted, Ordering::Relaxed);
202					index.insert(hash, meta);
203				}
204			}
205
206			// Scan .blob files → remove orphans (blobs without corresponding .meta).
207			let blob_shard_dir = data_dir.join(BLOBS_DIR).join(&shard);
208			if let Ok(entries) = fs::read_dir(&blob_shard_dir) {
209				for entry in entries.flatten() {
210					let path = entry.path();
211					if path.extension().and_then(|e| e.to_str()) != Some(BLOB_EXT) {
212						if path
213							.file_name()
214							.and_then(|n| n.to_str())
215							.map_or(false, |n| n.contains(".tmp."))
216						{
217							let _ = fs::remove_file(&path);
218						}
219						continue;
220					}
221					let stem = match path.file_stem().and_then(|s| s.to_str()) {
222						Some(s) => s.to_string(),
223						None => continue,
224					};
225					// Any blob without a corresponding index entry is an orphan.
226					// The meta scan for this shard already populated `index`, so an
227					// in-memory lookup is sufficient and avoids a syscall per blob.
228					// Blobs with unparseable names have no possible index match and
229					// are always removed.
230					let is_orphan = match parse_hex_hash(&stem) {
231						Some(hash) => !index.contains_key(&hash),
232						None => true,
233					};
234					if is_orphan {
235						tracing::warn!(target: "hop", hash = ?stem, "Removing orphan .blob (no .meta)");
236						let _ = fs::remove_file(&path);
237					}
238				}
239			}
240		}
241
242		tracing::info!(
243			target: "hop",
244			entries = index.len(),
245			total_bytes = current_size,
246			"Recovered HOP pool from disk"
247		);
248
249		Ok(Self {
250			index: Mutex::new(index),
251			user_usage: RwLock::new(user_usage),
252			max_size,
253			max_user_size,
254			current_size: AtomicU64::new(current_size),
255			retention_secs,
256			data_dir,
257			rate_limiter: Arc::new(RateLimiter::new(rate_limit_cfg)),
258		})
259	}
260
261	/// Charge `accounted` bytes against `sender_id`'s per-user quota, creating
262	/// a zero-initialized counter if absent. The read guard held across the
263	/// `fetch_add` excludes the reclamation pass in `cleanup_expired` (which
264	/// takes `user_usage.write()`), so the counter cannot be reclaimed
265	/// between lookup and increment.
266	fn charge_user(&self, sender_id: &SenderId, accounted: u64) -> Result<(), HopError> {
267		// Fast path: sender already in map, a read guard is enough.
268		{
269			let usage = self.user_usage.read();
270			if let Some(counter) = usage.get(sender_id) {
271				return self.try_charge(counter, accounted);
272			}
273		}
274		// Cold path: first insert from this sender — take the write guard.
275		let mut usage = self.user_usage.write();
276		let counter = usage.entry(*sender_id).or_default();
277		self.try_charge(counter, accounted)
278	}
279
280	/// Atomically increment `counter` by `accounted`, rolling back on cap
281	/// overflow. `saturating_add` clamps to `u64::MAX` if concurrent failing
282	/// charges briefly inflate the previous value past the wrap point,
283	/// ensuring overflow always falls into the "exceeds cap" branch.
284	fn try_charge(&self, counter: &AtomicU64, accounted: u64) -> Result<(), HopError> {
285		let previous = counter.fetch_add(accounted, Ordering::Relaxed);
286		if previous.saturating_add(accounted) > self.max_user_size {
287			counter.fetch_sub(accounted, Ordering::Relaxed);
288			return Err(HopError::UserQuotaExceeded { used: previous, limit: self.max_user_size });
289		}
290		Ok(())
291	}
292
293	/// Decrement a user's usage counter. Counters are never removed by this
294	/// path; reclamation happens only in the per-sender pass at the end of
295	/// `cleanup_expired`.
296	fn release_user_quota(&self, sender_id: &SenderId, accounted: u64) {
297		if let Some(counter) = self.user_usage.read().get(sender_id) {
298			saturating_release(counter, accounted);
299		}
300	}
301
302	/// Path to a file within a shard subdirectory rooted at `data_dir`.
303	fn entry_path(data_dir: &Path, hash: &HopHash, subdir: &str, ext: &str) -> PathBuf {
304		let hex = hex::encode(hash);
305		data_dir.join(subdir).join(&hex[..2]).join(format!("{}.{}", hex, ext))
306	}
307
308	/// Path to the blob file for a given hash.
309	fn blob_path(&self, hash: &HopHash) -> PathBuf {
310		Self::entry_path(&self.data_dir, hash, BLOBS_DIR, BLOB_EXT)
311	}
312
313	/// Path to the meta file for a given hash.
314	fn meta_path(&self, hash: &HopHash) -> PathBuf {
315		Self::entry_path(&self.data_dir, hash, META_DIR, META_EXT)
316	}
317
318	/// Atomically write data to a file (write to a unique .tmp path, then rename).
319	///
320	/// The tmp suffix encodes process id + a per-process atomic counter so two
321	/// threads writing the same final path (i.e. same content-addressed hash)
322	/// do not race on a shared tmp file. Removes the tmp file on failure so a
323	/// failed write never leaves an orphan.
324	fn write_atomic(path: &Path, data: &[u8]) -> Result<(), HopError> {
325		let suffix = format!("tmp.{}.{}", process::id(), TMP_SEQ.fetch_add(1, Ordering::Relaxed));
326		let tmp_path = path.with_extension(suffix);
327		if let Err(e) = fs::write(&tmp_path, data) {
328			let _ = fs::remove_file(&tmp_path);
329			return Err(e.into());
330		}
331		if let Err(e) = fs::rename(&tmp_path, path) {
332			let _ = fs::remove_file(&tmp_path);
333			return Err(e.into());
334		}
335		Ok(())
336	}
337
338	/// Insert data into the pool.
339	///
340	/// Returns the hash of the data.
341	pub fn insert(
342		&self,
343		data: Vec<u8>,
344		recipients: RecipientVec,
345		sender_id: SenderId,
346		signer: MultiSigner,
347		signature: MultiSignature,
348		submit_timestamp: u64,
349	) -> Result<HopHash, HopError> {
350		if recipients.is_empty() {
351			return Err(HopError::NoRecipients);
352		}
353		let unique: BTreeSet<&MultiSigner> = recipients.iter().map(|r| &r.signer).collect();
354		if unique.len() != recipients.len() {
355			return Err(HopError::DuplicateRecipient);
356		}
357
358		if data.is_empty() {
359			return Err(HopError::EmptyData);
360		}
361
362		let data_len = data.len() as u64;
363
364		// Total accounted size includes bounded per-recipient metadata overhead so
365		// a submitter cannot inflate memory via large recipient lists while the
366		// capacity counter only tracks `data.len()`. Charge the rate limiter the
367		// same accounted size, otherwise a 1-byte payload with 256 recipients
368		// would cost ~10 KiB of pool capacity while only spending 1 byte of
369		// bandwidth tokens — making the bandwidth dimension non-functional for
370		// fan-out-heavy entries.
371		let accounted = entry_accounted_size(data_len, recipients.len());
372
373		// Rejected requests never reserve capacity — check before any atomic bump.
374		if let Err(retry_after_secs) = self.rate_limiter.check(&sender_id, accounted) {
375			return Err(HopError::RateLimited { retry_after_secs });
376		}
377
378		let previous_size = self.current_size.fetch_add(accounted, Ordering::Relaxed);
379		if previous_size.saturating_add(accounted) > self.max_size {
380			self.current_size.fetch_sub(accounted, Ordering::Relaxed);
381			return Err(HopError::PoolFull(previous_size, self.max_size));
382		}
383
384		if let Err(e) = self.charge_user(&sender_id, accounted) {
385			self.current_size.fetch_sub(accounted, Ordering::Relaxed);
386			return Err(e);
387		}
388
389		let hash = H256(blake2_256(&data));
390
391		// First duplicate check (read lock only).
392		{
393			let index = self.index.lock();
394			if index.contains_key(&hash) {
395				self.release_user_quota(&sender_id, accounted);
396				self.current_size.fetch_sub(accounted, Ordering::Relaxed);
397				return Err(HopError::DuplicateEntry);
398			}
399		}
400
401		// Blob write is outside the lock — content-addressed bytes, racers
402		// produce identical output, rename is atomic.
403		let blob_path = self.blob_path(&hash);
404		if let Err(e) = Self::write_atomic(&blob_path, &data) {
405			self.release_user_quota(&sender_id, accounted);
406			self.current_size.fetch_sub(accounted, Ordering::Relaxed);
407			return Err(e);
408		}
409
410		let expires_at = SystemTime::now()
411			.duration_since(UNIX_EPOCH)
412			.unwrap_or_default()
413			.as_secs()
414			.saturating_add(self.retention_secs);
415		let meta = HopEntryMeta::new(
416			data_len,
417			expires_at,
418			recipients,
419			sender_id,
420			signer,
421			signature,
422			submit_timestamp,
423		);
424		let meta_bytes = meta.encode();
425		let meta_path = self.meta_path(&hash);
426
427		// Meta write goes under the index lock: meta is not content-addressed
428		// (sender_id, signer, signature, recipients, submit_timestamp differ
429		// between submitters), so racing writers would otherwise leave the
430		// loser's bytes on disk, diverging from the winner held in memory.
431		{
432			let mut index = self.index.lock();
433			if index.contains_key(&hash) {
434				tracing::debug!(
435					target: "hop",
436					hash = ?hex::encode(hash),
437					"Duplicate insert race lost; keeping winner's files"
438				);
439				// Drop `index` before `release_user_quota` takes `user_usage.read()`
440				// to keep the outer-to-inner lock order matching `cleanup_expired`.
441				drop(index);
442				self.release_user_quota(&sender_id, accounted);
443				self.current_size.fetch_sub(accounted, Ordering::Relaxed);
444				return Err(HopError::DuplicateEntry);
445			}
446			if let Err(e) = Self::write_atomic(&meta_path, &meta_bytes) {
447				// Index doesn't contain this hash; remove the blob to avoid
448				// leaving an orphan.
449				let _ = fs::remove_file(&blob_path);
450				drop(index);
451				self.release_user_quota(&sender_id, accounted);
452				self.current_size.fetch_sub(accounted, Ordering::Relaxed);
453				return Err(e);
454			}
455			index.insert(hash, meta);
456		}
457
458		tracing::info!(
459			target: "hop",
460			hash = ?hex::encode(hash),
461			size = data_len,
462			accounted,
463			expires_at,
464			"Data added to HOP pool"
465		);
466
467		Ok(hash)
468	}
469
470	/// Read a blob from disk and verify its content hash.
471	///
472	/// Content addressing means `blake2_256(data) == *hash` is an invariant
473	/// — corruption (bit rot, partial write, local tampering) violates it.
474	/// On integrity failure the caller-facing result is the same as a missing
475	/// blob and the broken entry is purged so subsequent reads converge.
476	fn read_and_verify_blob(&self, hash: &HopHash) -> Result<Vec<u8>, HopError> {
477		let blob_path = self.blob_path(hash);
478		let data = fs::read(&blob_path).map_err(|e| {
479			if e.kind() == std::io::ErrorKind::NotFound {
480				HopError::NotFound
481			} else {
482				HopError::IoError(e)
483			}
484		})?;
485		if H256(blake2_256(&data)) != *hash {
486			tracing::error!(
487				target: "hop",
488				hash = ?hex::encode(hash),
489				size = data.len(),
490				"Blob integrity check failed; purging entry"
491			);
492			self.purge_corrupt_entry(hash);
493			return Err(HopError::NotFound);
494		}
495		Ok(data)
496	}
497
498	/// Remove a corrupt entry from the index and best-effort delete its files.
499	/// The accounted size is released back to the pool and the user quota.
500	fn purge_corrupt_entry(&self, hash: &HopHash) {
501		let removed = {
502			let mut index = self.index.lock();
503			index.remove(hash)
504		};
505		if let Some(meta) = removed {
506			let accounted = entry_accounted_size(meta.size, meta.recipients.len());
507			self.current_size.fetch_sub(accounted, Ordering::Relaxed);
508			self.release_user_quota(&meta.sender_id, accounted);
509		}
510		let _ = fs::remove_file(self.blob_path(hash));
511		let _ = fs::remove_file(self.meta_path(hash));
512	}
513
514	/// Read and verify a blob, returning `None` for missing entries and logging
515	/// any other failure. Shared by [`Self::get`] and [`Self::get_with_auth`].
516	fn read_or_log(&self, hash: &HopHash) -> Option<Vec<u8>> {
517		match self.read_and_verify_blob(hash) {
518			Ok(data) => Some(data),
519			Err(HopError::NotFound) => None,
520			Err(e) => {
521				tracing::error!(
522					target: "hop",
523					hash = ?hex::encode(hash),
524					error = ?e,
525					"Failed to read blob from disk"
526				);
527				None
528			},
529		}
530	}
531
532	/// Get data from the pool by content hash.
533	pub fn get(&self, hash: &HopHash) -> Option<Vec<u8>> {
534		{
535			let index = self.index.lock();
536			if !index.contains_key(hash) {
537				return None;
538			}
539		}
540		self.read_or_log(hash)
541	}
542
543	/// Get data alongside the submitter's `MultiSigner`, `hop_submit` signature,
544	/// and submit timestamp.
545	///
546	/// Used by the promoter so the unsigned promotion extrinsic can carry the
547	/// user's submit-time signature for runtime-side verification.
548	pub fn get_with_auth(
549		&self,
550		hash: &HopHash,
551	) -> Option<(Vec<u8>, MultiSigner, MultiSignature, u64)> {
552		let (signer, signature, submit_timestamp) = {
553			let index = self.index.lock();
554			let meta = index.get(hash)?;
555			(meta.signer.clone(), meta.signature.clone(), meta.submit_timestamp)
556		};
557		let data = self.read_or_log(hash)?;
558		Some((data, signer, signature, submit_timestamp))
559	}
560
561	/// Decode `signature` and return the index of the matching recipient in
562	/// `meta.recipients`. `context` is the operation's domain separator (claim
563	/// / ack). Returning an index keeps a single implementation for both
564	/// shared- and exclusive-borrow callers (`meta.recipients[idx]` works in
565	/// either case).
566	fn find_recipient_idx(
567		meta: &HopEntryMeta,
568		hash: &HopHash,
569		signature: &[u8],
570		context: &[u8],
571	) -> Result<usize, HopError> {
572		let multi_sig =
573			MultiSignature::decode(&mut &signature[..]).map_err(|_| HopError::InvalidSignature)?;
574		let payload = signing_payload(context, hash);
575
576		meta.recipients
577			.iter()
578			.position(|r| multi_sig.verify(&payload[..], &r.signer.clone().into_account()))
579			.ok_or(HopError::NotRecipient)
580	}
581
582	/// Claim data from the pool (read-only). Verifies the signature against recipient
583	/// public keys. Returns the data if the signature matches a recipient.
584	///
585	/// This does NOT mark the recipient as claimed — call `ack` after receiving the data
586	/// to confirm receipt.
587	///
588	/// Returns `AlreadyClaimed` if the recipient has already acked (data may be deleted).
589	pub fn claim(&self, hash: &HopHash, signature: &[u8]) -> Result<Vec<u8>, HopError> {
590		{
591			let index = self.index.lock();
592			let meta = index.get(hash).ok_or(HopError::NotFound)?;
593			// Map NotRecipient → NotFound so callers cannot probe whether a hash
594			// exists by observing different error codes.
595			let idx = Self::find_recipient_idx(meta, hash, signature, HOP_CLAIM_CONTEXT)
596				.map_err(|_| HopError::NotFound)?;
597
598			// If this recipient already acked, the data may be gone.
599			if meta.recipients[idx].claimed {
600				return Err(HopError::AlreadyClaimed);
601			}
602		}
603		// Read blob from disk and verify its content hash. May be gone if
604		// concurrently acked and deleted, in which case we surface NotFound.
605		self.read_and_verify_blob(hash)
606	}
607
608	/// Acknowledge receipt of claimed data. Marks the recipient as claimed and triggers
609	/// cleanup when all recipients have acked.
610	///
611	/// Idempotent: acking a recipient that already acked returns `Ok(())`.
612	pub fn ack(&self, hash: &HopHash, signature: &[u8]) -> Result<(), HopError> {
613		// Phase 1: idempotent fast path under read lock.
614		{
615			let index = self.index.lock();
616			let meta = index.get(hash).ok_or(HopError::NotFound)?;
617			let idx = Self::find_recipient_idx(meta, hash, signature, HOP_ACK_CONTEXT)
618				.map_err(|_| HopError::NotFound)?;
619			if meta.recipients[idx].claimed {
620				return Ok(());
621			}
622		}
623
624		// Phase 2: re-run the lookup against the current meta — the entry could
625		// have been removed and re-submitted with a different recipient list since Phase 1.
626		let mut index = self.index.lock();
627		let meta = index.get_mut(hash).ok_or(HopError::NotFound)?;
628		let idx = Self::find_recipient_idx(meta, hash, signature, HOP_ACK_CONTEXT)
629			.map_err(|_| HopError::NotFound)?;
630
631		if meta.recipients[idx].claimed {
632			return Ok(());
633		}
634
635		meta.recipients[idx].claimed = true;
636
637		// If all recipients have acked, remove the entry entirely.
638		if meta.recipients.iter().all(|r| r.claimed) {
639			let accounted = entry_accounted_size(meta.size, meta.recipients.len());
640			let sender = meta.sender_id;
641			index.remove(hash);
642			self.current_size.fetch_sub(accounted, Ordering::Relaxed);
643			self.release_user_quota(&sender, accounted);
644			drop(index);
645
646			// Delete files from disk (best-effort; orphans cleaned on restart).
647			let _ = fs::remove_file(self.blob_path(hash));
648			let _ = fs::remove_file(self.meta_path(hash));
649
650			tracing::info!(
651				target: "hop",
652				hash = ?hex::encode(hash),
653				"All recipients acked, data removed"
654			);
655		} else {
656			let claimed_count = meta.recipients.iter().filter(|r| r.claimed).count();
657			// Persist updated claimed state to disk.
658			let meta_bytes = meta.encode();
659			let meta_path = self.meta_path(hash);
660			if let Err(e) = Self::write_atomic(&meta_path, &meta_bytes) {
661				tracing::error!(target: "hop", hash = ?hex::encode(hash), error = %e, "Failed to persist ack state");
662			}
663			drop(index);
664
665			tracing::debug!(
666				target: "hop",
667				hash = ?hex::encode(hash),
668				claimed = claimed_count,
669				"Recipient acked"
670			);
671		}
672
673		Ok(())
674	}
675
676	/// Check if data exists in the pool.
677	#[cfg(test)]
678	pub fn has(&self, hash: &HopHash) -> bool {
679		let index = self.index.lock();
680		index.contains_key(hash)
681	}
682
683	/// Remove data from the pool.
684	#[cfg(test)]
685	pub fn remove(&self, hash: &HopHash) -> Result<(), HopError> {
686		let meta = {
687			let mut index = self.index.lock();
688			index.remove(hash)
689		};
690
691		if let Some(meta) = meta {
692			let accounted = entry_accounted_size(meta.size, meta.recipients.len());
693			self.current_size.fetch_sub(accounted, Ordering::Relaxed);
694			self.release_user_quota(&meta.sender_id, accounted);
695
696			// Delete files from disk (best-effort).
697			let _ = fs::remove_file(self.blob_path(hash));
698			let _ = fs::remove_file(self.meta_path(hash));
699
700			tracing::debug!(
701				target: "hop",
702				hash = ?hex::encode(hash),
703				"Data removed from pool"
704			);
705
706			Ok(())
707		} else {
708			Err(HopError::NotFound)
709		}
710	}
711
712	/// Get pool status.
713	pub fn status(&self) -> PoolStatus {
714		let index = self.index.lock();
715		PoolStatus {
716			entry_count: index.len(),
717			total_bytes: self.current_size.load(Ordering::Relaxed),
718			max_bytes: self.max_size,
719		}
720	}
721
722	/// Remove expired entries and release their user quotas.
723	/// Returns the total bytes freed.
724	///
725	/// Processes entries in bounded batches to keep the index write lock from
726	/// being held across the full HashMap on huge pools. After all batches the
727	/// per-sender `user_usage` map is GC'd in a single pass.
728	pub fn cleanup_expired(&self) -> u64 {
729		const CLEANUP_BATCH_SIZE: usize = 10_000;
730		let mut total_freed: u64 = 0;
731		let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
732
733		loop {
734			// Phase 1: Under index write lock — collect and remove up to one
735			// batch of expired entries. Bounded so the lock hold scales with
736			// batch size, not pool size.
737			let expired: Vec<(HopHash, HopEntryMeta)> = {
738				let mut index = self.index.lock();
739				let expired_keys: Vec<HopHash> = index
740					.iter()
741					.filter(|(_, m)| now_secs >= m.expires_at)
742					.map(|(h, _)| *h)
743					.take(CLEANUP_BATCH_SIZE)
744					.collect();
745
746				expired_keys
747					.into_iter()
748					.filter_map(|hash| index.remove(&hash).map(|meta| (hash, meta)))
749					.collect()
750			};
751
752			if expired.is_empty() {
753				break;
754			}
755
756			// Phase 2: Update counters and batch user-quota release.
757			let freed: u64 = expired
758				.iter()
759				.map(|(_, meta)| entry_accounted_size(meta.size, meta.recipients.len()))
760				.sum();
761			self.current_size.fetch_sub(freed, Ordering::Relaxed);
762			total_freed = total_freed.saturating_add(freed);
763
764			{
765				let usage = self.user_usage.read();
766				for (_, meta) in &expired {
767					if let Some(counter) = usage.get(&meta.sender_id) {
768						let accounted = entry_accounted_size(meta.size, meta.recipients.len());
769						saturating_release(counter, accounted);
770					}
771				}
772			}
773
774			// Phase 3: Delete files from disk (best-effort, no locks held).
775			for (hash, _) in &expired {
776				let _ = fs::remove_file(self.blob_path(hash));
777				let _ = fs::remove_file(self.meta_path(hash));
778			}
779		}
780
781		// Phase 4: Reclaim per-sender counters whose owners have no live
782		// entries. Holding `index.lock()` and `user_usage.write()` together
783		// closes the dominant TOCTOU race (concurrent writers cannot create a
784		// new index entry under our held index lock; concurrent
785		// `release_user_quota` only takes `user_usage.read()` which is
786		// excluded). Build a live-sender set in one index pass so retain is
787		// O(senders + entries) instead of O(senders × entries).
788		{
789			let index = self.index.lock();
790			let mut usage = self.user_usage.write();
791			let live: HashSet<&SenderId> = index.values().map(|m| &m.sender_id).collect();
792			usage.retain(|sender_id, counter| {
793				counter.load(Ordering::Relaxed) > 0 || live.contains(sender_id)
794			});
795		}
796
797		// Let the rate limiter shed stale per-sender state on the same cadence.
798		self.rate_limiter.evict_stale();
799
800		total_freed
801	}
802
803	/// Return hashes of entries within `buffer_secs` of expiry that have not yet been promoted.
804	/// Returns up to `limit` hashes. Use [`Self::get`] to read blob data when needed.
805	/// The maintenance task runs periodically, so remaining entries are picked up next cycle.
806	pub fn get_promotable(
807		&self,
808		current_block: HopBlockNumber,
809		buffer_secs: u64,
810		limit: usize,
811	) -> Vec<HopHash> {
812		let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
813		let index = self.index.lock();
814		index
815			.iter()
816			.filter(|(_, meta)| {
817				!meta.promoted &&
818					now_secs.saturating_add(buffer_secs) >= meta.expires_at &&
819					meta.promotion_attempts < MAX_PROMOTION_ATTEMPTS &&
820					current_block >= meta.next_promotion_attempt_at
821			})
822			.map(|(h, _)| *h)
823			.take(limit)
824			.collect()
825	}
826
827	/// Mark an entry as promoted to permanent on-chain storage.
828	/// Persists the updated metadata to disk.
829	pub fn mark_promoted(&self, hash: &HopHash) {
830		let mut index = self.index.lock();
831		if let Some(meta) = index.get_mut(hash) {
832			meta.promoted = true;
833			let meta_bytes = meta.encode();
834			let meta_path = self.meta_path(hash);
835			drop(index);
836
837			if let Err(e) = Self::write_atomic(&meta_path, &meta_bytes) {
838				tracing::error!(
839					target: "hop",
840					hash = ?hex::encode(hash),
841					error = %e,
842					"Failed to persist promoted state"
843				);
844			}
845		}
846	}
847
848	/// Record a promotion attempt: bumps the per-entry attempt counter and
849	/// schedules the next eligible block via exponential back-off. The
850	/// maintenance task will skip the entry until then. Once
851	/// `MAX_PROMOTION_ATTEMPTS` is reached the entry is left to expire.
852	///
853	/// Called on **both** an `Err` from `submit_local` (the tx pool rejected
854	/// us) and an `Ok` followed by a runtime check that the data is not yet
855	/// on-chain (the tx was accepted into the pool but never included). The
856	/// backoff schedule is identical for both cases.
857	pub fn record_promotion_attempt(
858		&self,
859		hash: &HopHash,
860		current_block: HopBlockNumber,
861		check_interval_blocks: u32,
862	) {
863		let mut index = self.index.lock();
864		if let Some(meta) = index.get_mut(hash) {
865			meta.promotion_attempts = meta.promotion_attempts.saturating_add(1);
866			let backoff = promotion_backoff_blocks(meta.promotion_attempts, check_interval_blocks);
867			meta.next_promotion_attempt_at = current_block.saturating_add(backoff);
868			let meta_bytes = meta.encode();
869			let meta_path = self.meta_path(hash);
870			drop(index);
871
872			if let Err(e) = Self::write_atomic(&meta_path, &meta_bytes) {
873				tracing::error!(
874					target: "hop",
875					hash = ?hex::encode(hash),
876					error = %e,
877					"Failed to persist promotion-attempt state"
878				);
879			}
880		}
881	}
882}
883
884/// Decode a 64-char hex stem into a `HopHash`. Returns `None` for any
885/// non-32-byte stem (corrupt name, wrong length, non-hex chars).
886fn parse_hex_hash(stem: &str) -> Option<HopHash> {
887	let bytes = hex::decode(stem).ok()?;
888	let arr: [u8; 32] = bytes.try_into().ok()?;
889	Some(H256(arr))
890}
891
/// Lower `counter` by `accounted`, flooring at zero instead of wrapping.
///
/// The clamp is computed inside `fetch_update`'s compare-and-swap retry loop,
/// so it is always applied to the value actually being replaced. A separate
/// `load` followed by `fetch_sub(accounted.min(loaded), …)` could race with a
/// concurrent release on the same counter and wrap to near `u64::MAX`.
fn saturating_release(counter: &AtomicU64, accounted: u64) {
	// The closure always returns `Some`, so `fetch_update` cannot fail and its
	// result carries nothing worth inspecting.
	let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
		Some(current.saturating_sub(accounted))
	});
}
902
903#[cfg(test)]
904mod tests {
905	use super::*;
906	use crate::types::{Recipient, MAX_RECIPIENTS};
907	use sp_core::{crypto::Pair, ed25519, sr25519};
908	use sp_runtime::MultiSigner;
909	use tempfile::TempDir;
910
	/// Sender identity used by most tests.
	const SENDER_A: SenderId = [1u8; 32];
	/// A second, distinct sender, used to check that per-user quotas are independent.
	const SENDER_B: SenderId = [2u8; 32];
913
	/// Accounted cost of an entry with `data_size` bytes and `num_recipients` recipients,
	/// i.e. what the tests expect to see charged against `status().total_bytes` and
	/// the sender's per-user usage. Thin shorthand for `entry_accounted_size`.
	fn acct(data_size: u64, num_recipients: usize) -> u64 {
		entry_accounted_size(data_size, num_recipients)
	}
918
919	fn make_pool(max_size: u64, retention_secs: u64) -> (HopDataPool, TempDir) {
920		let dir = TempDir::new().unwrap();
921		let pool = HopDataPool::new(
922			max_size,
923			max_size,
924			retention_secs,
925			dir.path().to_path_buf(),
926			RateLimitConfig::disabled(),
927		)
928		.unwrap();
929		(pool, dir)
930	}
931
932	fn make_pool_with_user_cap(
933		max_size: u64,
934		max_user_size: u64,
935		retention_secs: u64,
936	) -> (HopDataPool, TempDir) {
937		let dir = TempDir::new().unwrap();
938		let pool = HopDataPool::new(
939			max_size,
940			max_user_size,
941			retention_secs,
942			dir.path().to_path_buf(),
943			RateLimitConfig::disabled(),
944		)
945		.unwrap();
946		(pool, dir)
947	}
948
949	fn create_test_pool() -> (HopDataPool, TempDir) {
950		make_pool(1024 * 1024, 100)
951	}
952
953	fn test_recipient() -> (ed25519::Pair, MultiSigner) {
954		let pair = ed25519::Pair::from_seed(&[1u8; 32]);
955		let signer = MultiSigner::Ed25519(pair.public());
956		(pair, signer)
957	}
958
959	/// Deterministic placeholder `(MultiSigner, MultiSignature)` for tests that
960	/// don't exercise submit-signature semantics. The actual values are never
961	/// verified by these tests.
962	fn dummy_auth() -> (MultiSigner, MultiSignature) {
963		let pair = ed25519::Pair::from_seed(&[7u8; 32]);
964		let signer = MultiSigner::Ed25519(pair.public());
965		let sig = MultiSignature::Ed25519(pair.sign(&[]));
966		(signer, sig)
967	}
968
969	fn sign_ed(pair: &ed25519::Pair, context: &[u8], hash: &HopHash) -> Vec<u8> {
970		let payload = signing_payload(context, hash);
971		MultiSignature::Ed25519(pair.sign(&payload)).encode()
972	}
973
974	fn sign_sr(pair: &sr25519::Pair, context: &[u8], hash: &HopHash) -> Vec<u8> {
975		let payload = signing_payload(context, hash);
976		MultiSignature::Sr25519(pair.sign(&payload)).encode()
977	}
978
979	fn user_usage(pool: &HopDataPool, sender: &SenderId) -> u64 {
980		pool.user_usage
981			.read()
982			.get(sender)
983			.map(|c| c.load(Ordering::Relaxed))
984			.unwrap_or(0)
985	}
986
987	/// Convert a `Vec<MultiSigner>` into a `RecipientVec` (with `claimed=false` for
988	/// each) for test ergonomics; panics only if a test exceeds `MAX_RECIPIENTS`.
989	fn bv(v: Vec<MultiSigner>) -> RecipientVec {
990		let recipients: Vec<Recipient> =
991			v.into_iter().map(|signer| Recipient { signer, claimed: false }).collect();
992		RecipientVec::try_from(recipients).expect("test recipient list exceeds MAX_RECIPIENTS")
993	}
994
995	#[test]
996	fn test_insert_and_get() {
997		let (pool, _dir) = create_test_pool();
998		let (_, signer) = test_recipient();
999		let data = vec![1, 2, 3, 4, 5];
1000		let hash = pool
1001			.insert(data.clone(), bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1002			.unwrap();
1003
1004		let retrieved = pool.get(&hash).unwrap();
1005		assert_eq!(data, retrieved);
1006	}
1007
1008	#[test]
1009	fn test_insert_no_recipients() {
1010		let (pool, _dir) = create_test_pool();
1011		let data = vec![1, 2, 3, 4, 5];
1012		let result = pool.insert(data, bv(vec![]), SENDER_A, dummy_auth().0, dummy_auth().1, 0);
1013		assert!(matches!(result, Err(HopError::NoRecipients)));
1014	}
1015
1016	#[test]
1017	fn test_duplicate_insert() {
1018		let (pool, _dir) = create_test_pool();
1019		let (_, signer) = test_recipient();
1020		let data = vec![1, 2, 3, 4, 5];
1021
1022		pool.insert(
1023			data.clone(),
1024			bv(vec![signer.clone()]),
1025			SENDER_A,
1026			dummy_auth().0,
1027			dummy_auth().1,
1028			0,
1029		)
1030		.unwrap();
1031		let result =
1032			pool.insert(data, bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0);
1033
1034		assert!(matches!(result, Err(HopError::DuplicateEntry)));
1035	}
1036
1037	#[test]
1038	fn test_too_many_recipients_rejected_at_type_level() {
1039		// Construction of a `RecipientVec` with more than `MAX_RECIPIENTS` entries
1040		// fails at `try_from`; callers (like the RPC) turn that into a
1041		// `TooManyRecipients` error before reaching the pool.
1042		let recipients: Vec<Recipient> = (0..=MAX_RECIPIENTS as u64)
1043			.map(|i| {
1044				let mut seed = [0u8; 32];
1045				seed[..8].copy_from_slice(&i.to_le_bytes());
1046				Recipient {
1047					signer: MultiSigner::Ed25519(ed25519::Pair::from_seed(&seed).public()),
1048					claimed: false,
1049				}
1050			})
1051			.collect();
1052		assert_eq!(recipients.len(), MAX_RECIPIENTS as usize + 1);
1053		assert!(RecipientVec::try_from(recipients).is_err());
1054	}
1055
1056	#[test]
1057	fn test_duplicate_recipient_rejected() {
1058		let (pool, _dir) = create_test_pool();
1059		let (_, signer) = test_recipient();
1060		let result = pool.insert(
1061			vec![1, 2, 3],
1062			bv(vec![signer.clone(), signer]),
1063			SENDER_A,
1064			dummy_auth().0,
1065			dummy_auth().1,
1066			0,
1067		);
1068		assert!(matches!(result, Err(HopError::DuplicateRecipient)));
1069	}
1070
1071	#[test]
1072	fn test_pool_full() {
1073		// Capacity exactly holds one 60-byte entry with one recipient (60 + 40 = 100).
1074		let (pool, _dir) = make_pool(acct(60, 1), 100);
1075		let (_, signer) = test_recipient();
1076
1077		let data1 = vec![0u8; 60];
1078		let data2 = vec![1u8; 50];
1079
1080		pool.insert(data1, bv(vec![signer.clone()]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1081			.unwrap();
1082		let result =
1083			pool.insert(data2, bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0);
1084
1085		assert!(matches!(result, Err(HopError::PoolFull(_, _))));
1086	}
1087
1088	#[test]
1089	fn test_remove() {
1090		let (pool, _dir) = create_test_pool();
1091		let (_, signer) = test_recipient();
1092		let data = vec![1, 2, 3, 4, 5];
1093		let hash = pool
1094			.insert(data, bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1095			.unwrap();
1096
1097		assert!(pool.has(&hash));
1098		pool.remove(&hash).unwrap();
1099		assert!(!pool.has(&hash));
1100
1101		// Files should be cleaned up.
1102		assert!(!pool.blob_path(&hash).exists());
1103		assert!(!pool.meta_path(&hash).exists());
1104	}
1105
1106	#[test]
1107	fn test_status() {
1108		let (pool, _dir) = create_test_pool();
1109		let (_, signer) = test_recipient();
1110		let data1 = vec![1, 2, 3, 4, 5];
1111		let data2 = vec![6, 7, 8];
1112
1113		pool.insert(
1114			data1.clone(),
1115			bv(vec![signer.clone()]),
1116			SENDER_A,
1117			dummy_auth().0,
1118			dummy_auth().1,
1119			0,
1120		)
1121		.unwrap();
1122		pool.insert(data2.clone(), bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1123			.unwrap();
1124
1125		let status = pool.status();
1126		assert_eq!(status.entry_count, 2);
1127		assert_eq!(status.total_bytes, acct(data1.len() as u64, 1) + acct(data2.len() as u64, 1));
1128	}
1129
1130	#[test]
1131	fn test_claim_valid_signature() {
1132		let (pool, _dir) = create_test_pool();
1133		let (pair, signer) = test_recipient();
1134		let data = vec![1, 2, 3, 4, 5];
1135		let hash = pool
1136			.insert(data.clone(), bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1137			.unwrap();
1138
1139		let claim = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
1140		let ack = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
1141		let result = pool.claim(&hash, &claim).unwrap();
1142		assert_eq!(data, result);
1143
1144		// Entry still exists until ack.
1145		assert!(pool.has(&hash));
1146
1147		pool.ack(&hash, &ack).unwrap();
1148		assert!(!pool.has(&hash));
1149	}
1150
1151	#[test]
1152	fn test_claim_sig_rejected_on_ack() {
1153		// Domain separation: a claim signature cannot be replayed as an ack.
1154		let (pool, _dir) = create_test_pool();
1155		let (pair, signer) = test_recipient();
1156		let hash = pool
1157			.insert(vec![1, 2, 3], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1158			.unwrap();
1159
1160		let claim = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
1161		pool.claim(&hash, &claim).unwrap();
1162		assert!(matches!(pool.ack(&hash, &claim), Err(HopError::NotFound)));
1163	}
1164
1165	#[test]
1166	fn test_claim_invalid_signature() {
1167		let (pool, _dir) = create_test_pool();
1168		let (_, signer) = test_recipient();
1169		let data = vec![1, 2, 3, 4, 5];
1170		let hash = pool
1171			.insert(data, bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1172			.unwrap();
1173
1174		// Use invalid SCALE bytes — cannot decode as MultiSignature
1175		let result = pool.claim(&hash, &[0u8; 3]);
1176		assert!(matches!(result, Err(HopError::NotFound)));
1177	}
1178
1179	#[test]
1180	fn test_claim_wrong_key() {
1181		let (pool, _dir) = create_test_pool();
1182		let (_, signer) = test_recipient();
1183		let hash = pool
1184			.insert(
1185				vec![1, 2, 3, 4, 5],
1186				bv(vec![signer]),
1187				SENDER_A,
1188				dummy_auth().0,
1189				dummy_auth().1,
1190				0,
1191			)
1192			.unwrap();
1193
1194		let wrong_pair = ed25519::Pair::from_seed(&[99u8; 32]);
1195		let wrong_claim = sign_ed(&wrong_pair, HOP_CLAIM_CONTEXT, &hash);
1196		assert!(matches!(pool.claim(&hash, &wrong_claim), Err(HopError::NotFound)));
1197		assert!(pool.has(&hash));
1198	}
1199
1200	#[test]
1201	fn test_claim_multi_recipient() {
1202		let (pool, _dir) = create_test_pool();
1203		let pair1 = ed25519::Pair::from_seed(&[1u8; 32]);
1204		let pair2 = ed25519::Pair::from_seed(&[2u8; 32]);
1205		let signer1 = MultiSigner::Ed25519(pair1.public());
1206		let signer2 = MultiSigner::Ed25519(pair2.public());
1207
1208		let data = vec![1, 2, 3, 4, 5];
1209		let hash = pool
1210			.insert(
1211				data.clone(),
1212				bv(vec![signer1, signer2]),
1213				SENDER_A,
1214				dummy_auth().0,
1215				dummy_auth().1,
1216				0,
1217			)
1218			.unwrap();
1219
1220		let claim1 = sign_ed(&pair1, HOP_CLAIM_CONTEXT, &hash);
1221		let ack1 = sign_ed(&pair1, HOP_ACK_CONTEXT, &hash);
1222		assert_eq!(data, pool.claim(&hash, &claim1).unwrap());
1223		pool.ack(&hash, &ack1).unwrap();
1224		assert!(pool.has(&hash));
1225
1226		let claim2 = sign_ed(&pair2, HOP_CLAIM_CONTEXT, &hash);
1227		let ack2 = sign_ed(&pair2, HOP_ACK_CONTEXT, &hash);
1228		assert_eq!(data, pool.claim(&hash, &claim2).unwrap());
1229		pool.ack(&hash, &ack2).unwrap();
1230		assert!(!pool.has(&hash));
1231		assert_eq!(pool.status().total_bytes, 0);
1232	}
1233
1234	#[test]
1235	fn test_claim_after_ack_returns_already_claimed() {
1236		let (pool, _dir) = create_test_pool();
1237		let (pair, signer) = test_recipient();
1238		let pair2 = ed25519::Pair::from_seed(&[2u8; 32]);
1239		let signer2 = MultiSigner::Ed25519(pair2.public());
1240
1241		let hash = pool
1242			.insert(
1243				vec![1, 2, 3, 4, 5],
1244				bv(vec![signer, signer2]),
1245				SENDER_A,
1246				dummy_auth().0,
1247				dummy_auth().1,
1248				0,
1249			)
1250			.unwrap();
1251
1252		let claim = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
1253		let ack = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
1254		pool.claim(&hash, &claim).unwrap();
1255		pool.ack(&hash, &ack).unwrap();
1256
1257		// Same recipient claims again — already acked.
1258		assert!(matches!(pool.claim(&hash, &claim), Err(HopError::AlreadyClaimed)));
1259	}
1260
1261	#[test]
1262	fn test_claim_not_found() {
1263		let (pool, _dir) = create_test_pool();
1264		let fake_hash = H256([0u8; 32]);
1265		let result = pool.claim(&fake_hash, &[0u8; 64]);
1266		assert!(matches!(result, Err(HopError::NotFound)));
1267	}
1268
1269	#[test]
1270	fn test_per_user_cap_is_hard_limit() {
1271		// Pool big enough for multiple users; user cap sized to one 60-byte entry (+ metadata).
1272		let (pool, _dir) = make_pool_with_user_cap(10_000, acct(60, 1), 100);
1273		let (_, signer) = test_recipient();
1274
1275		pool.insert(
1276			vec![0u8; 60],
1277			bv(vec![signer.clone()]),
1278			SENDER_A,
1279			dummy_auth().0,
1280			dummy_auth().1,
1281			0,
1282		)
1283		.unwrap();
1284
1285		// User A is at the cap; next insert is rejected regardless of pool headroom.
1286		let result = pool.insert(
1287			vec![1u8; 10],
1288			bv(vec![signer.clone()]),
1289			SENDER_A,
1290			dummy_auth().0,
1291			dummy_auth().1,
1292			0,
1293		);
1294		assert!(matches!(result, Err(HopError::UserQuotaExceeded { .. })));
1295
1296		// User B has their own independent cap.
1297		pool.insert(vec![2u8; 60], bv(vec![signer]), SENDER_B, dummy_auth().0, dummy_auth().1, 0)
1298			.unwrap();
1299	}
1300
1301	#[test]
1302	fn test_quota_released_after_ack() {
1303		let (pool, _dir) = make_pool_with_user_cap(10_000, acct(100, 1), 100);
1304		let (pair, signer) = test_recipient();
1305
1306		let hash = pool
1307			.insert(
1308				vec![0u8; 100],
1309				bv(vec![signer.clone()]),
1310				SENDER_A,
1311				dummy_auth().0,
1312				dummy_auth().1,
1313				0,
1314			)
1315			.unwrap();
1316
1317		// At cap; next insert rejected.
1318		let result = pool.insert(
1319			vec![1u8; 10],
1320			bv(vec![signer.clone()]),
1321			SENDER_A,
1322			dummy_auth().0,
1323			dummy_auth().1,
1324			0,
1325		);
1326		assert!(matches!(result, Err(HopError::UserQuotaExceeded { .. })));
1327
1328		let claim = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
1329		let ack = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
1330		pool.claim(&hash, &claim).unwrap();
1331		pool.ack(&hash, &ack).unwrap();
1332
1333		// Quota freed — user can insert again.
1334		pool.insert(vec![2u8; 100], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1335			.unwrap();
1336	}
1337
1338	#[test]
1339	fn test_cleanup_expired_releases_quota() {
1340		let (pool, _dir) = make_pool(10_000, 0);
1341		let (_, signer) = test_recipient();
1342
1343		pool.insert(vec![0u8; 100], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1344			.unwrap();
1345		let charged = acct(100, 1);
1346		assert_eq!(user_usage(&pool, &SENDER_A), charged);
1347
1348		let freed = pool.cleanup_expired();
1349		assert_eq!(freed, charged);
1350		assert_eq!(pool.status().total_bytes, 0);
1351		assert_eq!(user_usage(&pool, &SENDER_A), 0);
1352	}
1353
1354	#[test]
1355	fn test_cleanup_expired_honors_wall_clock_retention() {
1356		// Retention is measured in real seconds, not blocks: insert with a 1 s
1357		// retention, sleep past it, and assert cleanup reaps the entry.
1358		let (pool, _dir) = make_pool(10_000, 1);
1359		let (_, signer) = test_recipient();
1360
1361		let hash = pool
1362			.insert(vec![0u8; 100], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1363			.unwrap();
1364
1365		// Not yet expired — cleanup must be a no-op.
1366		assert_eq!(
1367			pool.cleanup_expired(),
1368			0,
1369			"entry should still be live before retention elapses"
1370		);
1371		assert!(pool.has(&hash));
1372
1373		std::thread::sleep(std::time::Duration::from_millis(1_200));
1374
1375		assert!(
1376			pool.cleanup_expired() > 0,
1377			"entry should be reaped once wall-clock retention elapses"
1378		);
1379		assert!(!pool.has(&hash));
1380	}
1381
1382	#[test]
1383	fn test_user_counter_preserved_until_cleanup() {
1384		// release_user_quota does not remove the map entry — only cleanup_expired
1385		// reclaims stale per-sender slots. Until then the slot remains at 0 so a
1386		// concurrent insert would not orphan its `Arc`.
1387		let (pool, _dir) = create_test_pool();
1388		let (pair, signer) = test_recipient();
1389
1390		let hash = pool
1391			.insert(vec![0u8; 50], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1392			.unwrap();
1393		assert!(pool.user_usage.read().contains_key(&SENDER_A));
1394
1395		let claim = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
1396		let ack = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
1397		pool.claim(&hash, &claim).unwrap();
1398		pool.ack(&hash, &ack).unwrap();
1399
1400		assert_eq!(user_usage(&pool, &SENDER_A), 0);
1401		assert!(pool.user_usage.read().contains_key(&SENDER_A));
1402	}
1403
1404	#[test]
1405	fn test_cleanup_expired_evicts_idle_user_counters() {
1406		// After cleanup_expired runs and a sender has no live entries with a
1407		// non-zero counter, their map slot must be removed so the map cannot
1408		// grow unbounded across the lifetime of a long-running node.
1409		let (pool, _dir) = make_pool(10_000, 10);
1410		let (pair, signer) = test_recipient();
1411
1412		let hash = pool
1413			.insert(vec![0u8; 50], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1414			.unwrap();
1415		let claim = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
1416		let ack = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
1417		pool.claim(&hash, &claim).unwrap();
1418		pool.ack(&hash, &ack).unwrap();
1419		assert!(pool.user_usage.read().contains_key(&SENDER_A));
1420
1421		pool.cleanup_expired();
1422		assert!(!pool.user_usage.read().contains_key(&SENDER_A));
1423	}
1424
1425	#[test]
1426	fn test_cleanup_expired_keeps_active_user_counters() {
1427		// A sender with live (non-expired) entries must keep their counter
1428		// even when the counter dropped to 0 between submissions — otherwise
1429		// concurrent in-flight inserts could orphan their `Arc`.
1430		let (pool, _dir) = make_pool(10_000, 100);
1431		let (_, signer) = test_recipient();
1432
1433		pool.insert(vec![0u8; 50], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1434			.unwrap();
1435		// Cleanup at a block where the entry is not yet expired must not
1436		// reclaim the sender's slot — a concurrent insert would otherwise
1437		// orphan its `Arc`.
1438		pool.cleanup_expired();
1439		assert!(pool.user_usage.read().contains_key(&SENDER_A));
1440	}
1441
1442	#[test]
1443	fn test_cleanup_expired_processes_more_than_one_batch() {
1444		// Cleanup batch size is 10_000 — feed it 25_000 entries that all expire,
1445		// confirm every entry is removed (proving the loop terminates rather
1446		// than leaving leftovers from the first batch).
1447		const BATCHES: u32 = 2;
1448		const PER_BATCH: u32 = 10_000 + 1; // > one batch each
1449		let total = BATCHES * PER_BATCH;
1450
1451		let dir = TempDir::new().unwrap();
1452		// Pool sized for ~25k tiny entries (4 bytes each + recipient overhead).
1453		let entry_bytes = std::mem::size_of::<u32>() as u64;
1454		let pool = HopDataPool::new(
1455			(acct(entry_bytes, 1) * total as u64) + 1024,
1456			u64::MAX,
1457			0,
1458			dir.path().to_path_buf(),
1459			RateLimitConfig::disabled(),
1460		)
1461		.unwrap();
1462		let (_, signer) = test_recipient();
1463
1464		for i in 0..total {
1465			let mut sender = SENDER_A;
1466			sender[0] = (i & 0xff) as u8;
1467			sender[1] = ((i >> 8) & 0xff) as u8;
1468			sender[2] = ((i >> 16) & 0xff) as u8;
1469			// Data must be unique per entry — content-addressing means equal
1470			// bytes hash to the same key and the second insert hits
1471			// DuplicateEntry. Embed `i` so each blob is distinct.
1472			let data = i.to_le_bytes().to_vec();
1473			pool.insert(data, bv(vec![signer.clone()]), sender, dummy_auth().0, dummy_auth().1, 0)
1474				.unwrap();
1475		}
1476		assert_eq!(pool.status().entry_count, total as usize);
1477
1478		pool.cleanup_expired();
1479		assert_eq!(pool.status().entry_count, 0);
1480		assert_eq!(pool.status().total_bytes, 0);
1481		assert!(pool.user_usage.read().is_empty());
1482	}
1483
1484	#[test]
1485	fn test_restart_recovery() {
1486		let dir = TempDir::new().unwrap();
1487		let (_, signer) = test_recipient();
1488		let expected_accounted = acct(100, 1);
1489
1490		let hash;
1491		{
1492			let pool = HopDataPool::new(
1493				1024 * 1024,
1494				1024 * 1024,
1495				100,
1496				dir.path().to_path_buf(),
1497				RateLimitConfig::disabled(),
1498			)
1499			.unwrap();
1500			hash = pool
1501				.insert(
1502					vec![42u8; 100],
1503					bv(vec![signer]),
1504					SENDER_A,
1505					dummy_auth().0,
1506					dummy_auth().1,
1507					0,
1508				)
1509				.unwrap();
1510			assert!(pool.has(&hash));
1511			assert_eq!(pool.status().entry_count, 1);
1512			assert_eq!(pool.status().total_bytes, expected_accounted);
1513		}
1514
1515		{
1516			let pool = HopDataPool::new(
1517				1024 * 1024,
1518				1024 * 1024,
1519				100,
1520				dir.path().to_path_buf(),
1521				RateLimitConfig::disabled(),
1522			)
1523			.unwrap();
1524			assert!(pool.has(&hash));
1525			assert_eq!(pool.status().entry_count, 1);
1526			assert_eq!(pool.status().total_bytes, expected_accounted);
1527
1528			let data = pool.get(&hash).unwrap();
1529			assert_eq!(data, vec![42u8; 100]);
1530			assert_eq!(user_usage(&pool, &SENDER_A), expected_accounted);
1531		}
1532	}
1533
1534	#[test]
1535	fn test_orphan_blob_cleanup() {
1536		let dir = TempDir::new().unwrap();
1537		{
1538			let _pool = HopDataPool::new(
1539				1024 * 1024,
1540				1024 * 1024,
1541				100,
1542				dir.path().to_path_buf(),
1543				RateLimitConfig::disabled(),
1544			)
1545			.unwrap();
1546		}
1547
1548		let orphan_hash = "aa".to_string() + &"bb".repeat(15);
1549		let blob_path = dir.path().join("blobs").join("aa").join(format!("{}.blob", orphan_hash));
1550		fs::write(&blob_path, b"orphan data").unwrap();
1551		assert!(blob_path.exists());
1552
1553		let _pool = HopDataPool::new(
1554			1024 * 1024,
1555			1024 * 1024,
1556			100,
1557			dir.path().to_path_buf(),
1558			RateLimitConfig::disabled(),
1559		)
1560		.unwrap();
1561		assert!(!blob_path.exists());
1562	}
1563
1564	#[test]
1565	fn test_corrupt_meta_cleanup() {
1566		let dir = TempDir::new().unwrap();
1567		{
1568			let _pool = HopDataPool::new(
1569				1024 * 1024,
1570				1024 * 1024,
1571				100,
1572				dir.path().to_path_buf(),
1573				RateLimitConfig::disabled(),
1574			)
1575			.unwrap();
1576		}
1577
1578		let fake_hash = "bb".to_string() + &"cc".repeat(15);
1579		let meta_path = dir.path().join("meta").join("bb").join(format!("{}.meta", fake_hash));
1580		fs::write(&meta_path, b"not valid SCALE data").unwrap();
1581		assert!(meta_path.exists());
1582
1583		let pool = HopDataPool::new(
1584			1024 * 1024,
1585			1024 * 1024,
1586			100,
1587			dir.path().to_path_buf(),
1588			RateLimitConfig::disabled(),
1589		)
1590		.unwrap();
1591		assert!(!meta_path.exists());
1592		assert_eq!(pool.status().entry_count, 0);
1593	}
1594
1595	#[test]
1596	fn test_claim_sr25519() {
1597		let (pool, _dir) = create_test_pool();
1598		let pair = sr25519::Pair::from_seed(&[3u8; 32]);
1599		let signer = MultiSigner::Sr25519(pair.public());
1600
1601		let data = vec![10, 20, 30];
1602		let hash = pool
1603			.insert(data.clone(), bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1604			.unwrap();
1605
1606		let claim = sign_sr(&pair, HOP_CLAIM_CONTEXT, &hash);
1607		let ack = sign_sr(&pair, HOP_ACK_CONTEXT, &hash);
1608		assert_eq!(data, pool.claim(&hash, &claim).unwrap());
1609		pool.ack(&hash, &ack).unwrap();
1610		assert!(!pool.has(&hash));
1611	}
1612
1613	#[test]
1614	fn test_claim_mixed_key_types() {
1615		let (pool, _dir) = create_test_pool();
1616		let ed_pair = ed25519::Pair::from_seed(&[4u8; 32]);
1617		let sr_pair = sr25519::Pair::from_seed(&[5u8; 32]);
1618		let ed_signer = MultiSigner::Ed25519(ed_pair.public());
1619		let sr_signer = MultiSigner::Sr25519(sr_pair.public());
1620
1621		let data = vec![42, 43, 44];
1622		let hash = pool
1623			.insert(
1624				data.clone(),
1625				bv(vec![ed_signer, sr_signer]),
1626				SENDER_A,
1627				dummy_auth().0,
1628				dummy_auth().1,
1629				0,
1630			)
1631			.unwrap();
1632
1633		let sr_claim = sign_sr(&sr_pair, HOP_CLAIM_CONTEXT, &hash);
1634		let sr_ack = sign_sr(&sr_pair, HOP_ACK_CONTEXT, &hash);
1635		assert_eq!(data, pool.claim(&hash, &sr_claim).unwrap());
1636		pool.ack(&hash, &sr_ack).unwrap();
1637		assert!(pool.has(&hash));
1638
1639		let ed_claim = sign_ed(&ed_pair, HOP_CLAIM_CONTEXT, &hash);
1640		let ed_ack = sign_ed(&ed_pair, HOP_ACK_CONTEXT, &hash);
1641		assert_eq!(data, pool.claim(&hash, &ed_claim).unwrap());
1642		pool.ack(&hash, &ed_ack).unwrap();
1643		assert!(!pool.has(&hash));
1644	}
1645
1646	#[test]
1647	fn test_claim_is_repeatable() {
1648		let (pool, _dir) = create_test_pool();
1649		let (pair, signer) = test_recipient();
1650		let data = vec![1, 2, 3, 4, 5];
1651		let hash = pool
1652			.insert(data.clone(), bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1653			.unwrap();
1654
1655		let claim = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
1656		assert_eq!(data, pool.claim(&hash, &claim).unwrap());
1657		assert_eq!(data, pool.claim(&hash, &claim).unwrap());
1658		assert!(pool.has(&hash));
1659	}
1660
1661	#[test]
1662	fn test_ack_idempotent() {
1663		let (pool, _dir) = create_test_pool();
1664		let (pair, signer) = test_recipient();
1665		let pair2 = ed25519::Pair::from_seed(&[2u8; 32]);
1666		let signer2 = MultiSigner::Ed25519(pair2.public());
1667
1668		let hash = pool
1669			.insert(
1670				vec![1, 2, 3, 4, 5],
1671				bv(vec![signer, signer2]),
1672				SENDER_A,
1673				dummy_auth().0,
1674				dummy_auth().1,
1675				0,
1676			)
1677			.unwrap();
1678		let ack = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
1679
1680		pool.ack(&hash, &ack).unwrap();
1681		pool.ack(&hash, &ack).unwrap();
1682		assert!(pool.has(&hash));
1683	}
1684
1685	#[test]
1686	fn test_multi_recipient_partial_ack() {
1687		let (pool, _dir) = create_test_pool();
1688		let pair1 = ed25519::Pair::from_seed(&[1u8; 32]);
1689		let pair2 = ed25519::Pair::from_seed(&[2u8; 32]);
1690		let signer1 = MultiSigner::Ed25519(pair1.public());
1691		let signer2 = MultiSigner::Ed25519(pair2.public());
1692
1693		let data = vec![1, 2, 3, 4, 5];
1694		let hash = pool
1695			.insert(
1696				data.clone(),
1697				bv(vec![signer1, signer2]),
1698				SENDER_A,
1699				dummy_auth().0,
1700				dummy_auth().1,
1701				0,
1702			)
1703			.unwrap();
1704
1705		let claim1 = sign_ed(&pair1, HOP_CLAIM_CONTEXT, &hash);
1706		let ack1 = sign_ed(&pair1, HOP_ACK_CONTEXT, &hash);
1707		let claim2 = sign_ed(&pair2, HOP_CLAIM_CONTEXT, &hash);
1708		let ack2 = sign_ed(&pair2, HOP_ACK_CONTEXT, &hash);
1709
1710		assert_eq!(data, pool.claim(&hash, &claim1).unwrap());
1711		pool.ack(&hash, &ack1).unwrap();
1712		assert!(pool.has(&hash));
1713
1714		assert_eq!(data, pool.claim(&hash, &claim2).unwrap());
1715		pool.ack(&hash, &ack2).unwrap();
1716		assert!(!pool.has(&hash));
1717		assert_eq!(pool.status().total_bytes, 0);
1718	}
1719
1720	#[test]
1721	fn test_concurrent_inserts_respect_capacity() {
1722		use std::{sync::Barrier, thread};
1723
1724		let (_, signer) = test_recipient();
1725		// Capacity for exactly 4 entries of 50 bytes (accounted = 90 each).
1726		let (pool, _dir) = make_pool(acct(50, 1) * 4, 100);
1727		let pool = Arc::new(pool);
1728		let barrier = Arc::new(Barrier::new(10));
1729
1730		let handles: Vec<_> = (0..10u8)
1731			.map(|i| {
1732				let pool = pool.clone();
1733				let signer = signer.clone();
1734				let barrier = barrier.clone();
1735				thread::spawn(move || {
1736					barrier.wait();
1737					pool.insert(
1738						vec![i; 50],
1739						bv(vec![signer]),
1740						SENDER_A,
1741						dummy_auth().0,
1742						dummy_auth().1,
1743						0,
1744					)
1745				})
1746			})
1747			.collect();
1748
1749		let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
1750		let successes = results.iter().filter(|r| r.is_ok()).count();
1751
1752		assert!(successes <= 4, "Got {} successes, max should be 4", successes);
1753		assert!(pool.status().total_bytes <= acct(50, 1) * 4);
1754	}
1755
1756	#[test]
1757	fn test_concurrent_inserts_respect_user_quota() {
1758		use std::{sync::Barrier, thread};
1759
1760		let (_, signer) = test_recipient();
1761		// Per-user cap holds 3 entries of 100 bytes. Pool has plenty of room so the
1762		// *user* cap is what actually constrains the test.
1763		let per_entry = acct(100, 1);
1764		let (pool, _dir) = make_pool_with_user_cap(per_entry * 20, per_entry * 3, 100);
1765		let pool = Arc::new(pool);
1766		let barrier = Arc::new(Barrier::new(10));
1767
1768		let handles: Vec<_> = (0..10u8)
1769			.map(|i| {
1770				let pool = pool.clone();
1771				let signer = signer.clone();
1772				let barrier = barrier.clone();
1773				thread::spawn(move || {
1774					barrier.wait();
1775					pool.insert(
1776						vec![i; 100],
1777						bv(vec![signer]),
1778						SENDER_A,
1779						dummy_auth().0,
1780						dummy_auth().1,
1781						0,
1782					)
1783				})
1784			})
1785			.collect();
1786
1787		let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
1788		let successes = results.iter().filter(|r| r.is_ok()).count();
1789
1790		// Hard per-user cap: at most 3 inserts may succeed regardless of concurrency.
1791		assert!(successes <= 3, "hard per-user cap violated: {} successes", successes);
1792		assert!(user_usage(&pool, &SENDER_A) <= per_entry * 3);
1793	}
1794
1795	#[test]
1796	fn test_concurrent_claim_and_ack() {
1797		use std::{sync::Barrier, thread};
1798
1799		let (pool, _dir) = create_test_pool();
1800		let pool = Arc::new(pool);
1801
1802		let pairs: Vec<_> = (1..=5u8)
1803			.map(|i| {
1804				let pair = ed25519::Pair::from_seed(&[i; 32]);
1805				let signer = MultiSigner::Ed25519(pair.public());
1806				(pair, signer)
1807			})
1808			.collect();
1809
1810		let signers: Vec<_> = pairs.iter().map(|(_, s)| s.clone()).collect();
1811		let data = vec![42u8; 100];
1812		let hash = pool
1813			.insert(data.clone(), bv(signers), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1814			.unwrap();
1815
1816		let barrier = Arc::new(Barrier::new(5));
1817
1818		let handles: Vec<_> = pairs
1819			.into_iter()
1820			.map(|(pair, _)| {
1821				let pool = pool.clone();
1822				let barrier = barrier.clone();
1823				let data = data.clone();
1824				thread::spawn(move || {
1825					barrier.wait();
1826					let claim = sign_ed(&pair, HOP_CLAIM_CONTEXT, &hash);
1827					let ack = sign_ed(&pair, HOP_ACK_CONTEXT, &hash);
1828
1829					let claimed = pool.claim(&hash, &claim).unwrap();
1830					assert_eq!(data, claimed);
1831					pool.ack(&hash, &ack).unwrap();
1832				})
1833			})
1834			.collect();
1835
1836		for h in handles {
1837			h.join().unwrap();
1838		}
1839
1840		assert!(!pool.has(&hash));
1841		assert_eq!(pool.status().total_bytes, 0);
1842	}
1843
1844	#[test]
1845	fn test_concurrent_duplicate_insert_preserves_files() {
1846		use std::{sync::Barrier, thread};
1847
1848		// Two threads insert identical content concurrently. The race-loser must
1849		// not delete the winner's blob/meta files; the winning hash must remain
1850		// readable via claim().
1851		let (kp, signer) = test_recipient();
1852		let (pool, _dir) = make_pool(1024 * 1024, 100);
1853		let pool = Arc::new(pool);
1854		let data = vec![0xABu8; 4096];
1855		let barrier = Arc::new(Barrier::new(2));
1856
1857		let handles: Vec<_> = (0..2)
1858			.map(|_| {
1859				let pool = pool.clone();
1860				let barrier = barrier.clone();
1861				let signer = signer.clone();
1862				let data = data.clone();
1863				thread::spawn(move || {
1864					barrier.wait();
1865					pool.insert(data, bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1866				})
1867			})
1868			.collect();
1869		let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
1870
1871		let oks: Vec<_> = results.iter().filter_map(|r| r.as_ref().ok()).collect();
1872		let dupes = results.iter().filter(|r| matches!(r, Err(HopError::DuplicateEntry))).count();
1873		assert_eq!(oks.len(), 1, "exactly one insert must win the race");
1874		assert_eq!(dupes, 1, "the other must report DuplicateEntry");
1875
1876		let hash = *oks[0];
1877		let sig = sign_ed(&kp, HOP_CLAIM_CONTEXT, &hash);
1878		let claimed = pool.claim(&hash, &sig).expect("claim must succeed");
1879		assert_eq!(claimed, data);
1880	}
1881
1882	#[test]
1883	fn test_concurrent_duplicate_insert_keeps_winner_meta_on_disk() {
1884		use std::{sync::Barrier, thread};
1885
1886		// Same content, different senders. The race-loser's meta must not end
1887		// up on disk; otherwise restart recovery would silently load it as
1888		// canonical for the entry.
1889		let dir = TempDir::new().unwrap();
1890		let pool = Arc::new(
1891			HopDataPool::new(
1892				1024 * 1024,
1893				1024 * 1024,
1894				100,
1895				dir.path().to_path_buf(),
1896				RateLimitConfig::disabled(),
1897			)
1898			.unwrap(),
1899		);
1900
1901		let signer_a = MultiSigner::Ed25519(ed25519::Pair::from_seed(&[11u8; 32]).public());
1902		let signer_b = MultiSigner::Ed25519(ed25519::Pair::from_seed(&[22u8; 32]).public());
1903		let data = vec![0xCDu8; 4096];
1904
1905		let barrier = Arc::new(Barrier::new(2));
1906		let (p1, d1, b1, s1) = (pool.clone(), data.clone(), barrier.clone(), signer_a.clone());
1907		let h1 = thread::spawn(move || {
1908			b1.wait();
1909			p1.insert(d1, bv(vec![s1]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1910		});
1911		let (p2, d2, b2, s2) = (pool.clone(), data.clone(), barrier.clone(), signer_b.clone());
1912		let h2 = thread::spawn(move || {
1913			b2.wait();
1914			p2.insert(d2, bv(vec![s2]), SENDER_B, dummy_auth().0, dummy_auth().1, 0)
1915		});
1916
1917		let r1 = h1.join().unwrap();
1918		let r2 = h2.join().unwrap();
1919
1920		let (winner_hash, winner_sender) = match (&r1, &r2) {
1921			(Ok(h), Err(HopError::DuplicateEntry)) => (*h, SENDER_A),
1922			(Err(HopError::DuplicateEntry), Ok(h)) => (*h, SENDER_B),
1923			other => panic!("expected exactly one winner and one DuplicateEntry, got {other:?}"),
1924		};
1925
1926		// Simulate restart: drop the pool, reopen from the same data dir so
1927		// recovery rebuilds the in-memory index from `.meta` files on disk.
1928		drop(pool);
1929		let pool2 = HopDataPool::new(
1930			1024 * 1024,
1931			1024 * 1024,
1932			100,
1933			dir.path().to_path_buf(),
1934			RateLimitConfig::disabled(),
1935		)
1936		.unwrap();
1937
1938		let recovered_sender = pool2
1939			.index
1940			.lock()
1941			.get(&winner_hash)
1942			.expect("winner's entry must survive restart")
1943			.sender_id;
1944		assert_eq!(
1945			recovered_sender, winner_sender,
1946			"on-disk meta diverged from the winning insert; loser's meta overwrote the winner's",
1947		);
1948	}
1949
1950	#[test]
1951	fn test_saturating_release_concurrent_no_underflow() {
1952		use std::{sync::Barrier, thread};
1953
1954		// Many threads each release a fixed amount that sums to exactly the
1955		// initial counter. With a non-atomic load-then-clamp-then-fetch_sub,
1956		// stale clamps would let the counter wrap to ~u64::MAX.
1957		// `saturating_release` must keep the result clamped at 0.
1958		const THREADS: u64 = 32;
1959		const RELEASE_PER_THREAD: u64 = 7;
1960		let counter = Arc::new(AtomicU64::new(THREADS * RELEASE_PER_THREAD));
1961		let barrier = Arc::new(Barrier::new(THREADS as usize));
1962
1963		let handles: Vec<_> = (0..THREADS)
1964			.map(|_| {
1965				let counter = counter.clone();
1966				let barrier = barrier.clone();
1967				thread::spawn(move || {
1968					barrier.wait();
1969					saturating_release(&counter, RELEASE_PER_THREAD);
1970				})
1971			})
1972			.collect();
1973		for h in handles {
1974			h.join().unwrap();
1975		}
1976
1977		assert_eq!(counter.load(Ordering::Relaxed), 0, "counter underflowed or did not reach zero");
1978
1979		// Releasing more than the remaining balance must clamp to 0, never wrap.
1980		saturating_release(&counter, u64::MAX);
1981		assert_eq!(counter.load(Ordering::Relaxed), 0);
1982	}
1983
1984	#[test]
1985	fn test_get_promotable_within_buffer() {
1986		// retention=3600s; a freshly-inserted entry is in the promotion window only
1987		// if the buffer is at least as large as the time-to-expiry.
1988		let (pool, _dir) = make_pool(1024 * 1024, 3600);
1989		let (_, signer) = test_recipient();
1990
1991		let hash = pool
1992			.insert(vec![1, 2, 3], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
1993			.unwrap();
1994
1995		// Small buffer (180s ≪ 3600s retention): not promotable yet.
1996		let promotable = pool.get_promotable(50, 180, usize::MAX);
1997		assert!(promotable.is_empty());
1998
1999		// Large buffer (6000s > 3600s retention): within the window.
2000		let promotable = pool.get_promotable(0, 6000, usize::MAX);
2001		assert_eq!(promotable.len(), 1);
2002		assert_eq!(promotable[0], hash);
2003	}
2004
2005	#[test]
2006	fn test_get_promotable_excludes_promoted() {
2007		let (pool, _dir) = make_pool(1024 * 1024, 100);
2008		let (_, signer) = test_recipient();
2009
2010		let hash = pool
2011			.insert(vec![1, 2, 3], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
2012			.unwrap();
2013
2014		let promotable = pool.get_promotable(80, 180, usize::MAX);
2015		assert_eq!(promotable.len(), 1);
2016
2017		pool.mark_promoted(&hash);
2018
2019		let promotable = pool.get_promotable(80, 180, usize::MAX);
2020		assert!(promotable.is_empty());
2021	}
2022
2023	#[test]
2024	fn test_mark_promoted_persists_across_restart() {
2025		let dir = TempDir::new().unwrap();
2026		let (_, signer) = test_recipient();
2027
2028		let hash;
2029		{
2030			let pool = HopDataPool::new(
2031				1024 * 1024,
2032				1024 * 1024,
2033				100,
2034				dir.path().to_path_buf(),
2035				RateLimitConfig::disabled(),
2036			)
2037			.unwrap();
2038			hash = pool
2039				.insert(
2040					vec![42u8; 10],
2041					bv(vec![signer]),
2042					SENDER_A,
2043					dummy_auth().0,
2044					dummy_auth().1,
2045					0,
2046				)
2047				.unwrap();
2048			pool.mark_promoted(&hash);
2049		}
2050
2051		{
2052			let pool = HopDataPool::new(
2053				1024 * 1024,
2054				1024 * 1024,
2055				100,
2056				dir.path().to_path_buf(),
2057				RateLimitConfig::disabled(),
2058			)
2059			.unwrap();
2060			let promotable = pool.get_promotable(80, 180, usize::MAX);
2061			assert!(promotable.is_empty(), "promoted entry should not be promotable after restart");
2062			assert!(pool.has(&hash), "entry should still exist");
2063		}
2064	}
2065
2066	#[test]
2067	fn test_cleanup_expired_removes_promoted() {
2068		let (pool, _dir) = make_pool(1024 * 1024, 0);
2069		let (_, signer) = test_recipient();
2070
2071		let hash = pool
2072			.insert(vec![1, 2, 3], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
2073			.unwrap();
2074		pool.mark_promoted(&hash);
2075		assert!(pool.has(&hash));
2076
2077		let freed = pool.cleanup_expired();
2078		assert!(freed > 0);
2079		assert!(!pool.has(&hash));
2080	}
2081
2082	#[test]
2083	fn test_rate_limit_rejects_burst_overflow() {
2084		let dir = TempDir::new().unwrap();
2085		// submit_burst=2 so the 3rd request is rate-limited by submit count.
2086		// Bandwidth is sized comfortably above the 3-byte test payloads so the
2087		// rejection comes from the request bucket, not the bandwidth bucket.
2088		let cfg = RateLimitConfig {
2089			enabled: true,
2090			submit_rate_per_min: 60,
2091			submit_burst: 2,
2092			bandwidth_per_min: 1024 * 1024 * 60,
2093			bandwidth_burst: 1024 * 1024,
2094		};
2095		let pool =
2096			HopDataPool::new(1024 * 1024, 1024 * 1024, 100, dir.path().to_path_buf(), cfg).unwrap();
2097		let (_, signer) = test_recipient();
2098
2099		pool.insert(
2100			vec![1, 2, 3],
2101			bv(vec![signer.clone()]),
2102			SENDER_A,
2103			dummy_auth().0,
2104			dummy_auth().1,
2105			0,
2106		)
2107		.unwrap();
2108		pool.insert(
2109			vec![4, 5, 6],
2110			bv(vec![signer.clone()]),
2111			SENDER_A,
2112			dummy_auth().0,
2113			dummy_auth().1,
2114			0,
2115		)
2116		.unwrap();
2117		assert!(matches!(
2118			pool.insert(
2119				vec![7, 8, 9],
2120				bv(vec![signer]),
2121				SENDER_A,
2122				dummy_auth().0,
2123				dummy_auth().1,
2124				0,
2125			),
2126			Err(HopError::RateLimited { .. })
2127		));
2128	}
2129
2130	#[test]
2131	fn test_meta_version_mismatch_rejected() {
2132		// Persist a HopEntryMeta with version 0 (an older / future schema), then
2133		// boot a fresh pool over the same dir and assert the .meta is wiped and
2134		// not surfaced in the in-memory index.
2135		let dir = TempDir::new().unwrap();
2136		let (_, signer) = test_recipient();
2137		let recipients = bv(vec![signer.clone()]);
2138		let mut meta =
2139			HopEntryMeta::new(100, 0, recipients, SENDER_A, dummy_auth().0, dummy_auth().1, 0);
2140		meta.version = 0;
2141
2142		let fake_hash = "ee".to_string() + &"ff".repeat(15);
2143		let meta_dir = dir.path().join("meta").join("ee");
2144		let blob_dir = dir.path().join("blobs").join("ee");
2145		fs::create_dir_all(&meta_dir).unwrap();
2146		fs::create_dir_all(&blob_dir).unwrap();
2147		let meta_path = meta_dir.join(format!("{}.meta", fake_hash));
2148		let blob_path = blob_dir.join(format!("{}.blob", fake_hash));
2149		fs::write(&meta_path, meta.encode()).unwrap();
2150		fs::write(&blob_path, b"x").unwrap();
2151
2152		let pool = HopDataPool::new(
2153			1024 * 1024,
2154			1024 * 1024,
2155			100,
2156			dir.path().to_path_buf(),
2157			RateLimitConfig::disabled(),
2158		)
2159		.unwrap();
2160		assert!(!meta_path.exists(), "stale-version .meta should be removed");
2161		assert!(!blob_path.exists(), "matching .blob should also be removed");
2162		assert_eq!(pool.status().entry_count, 0);
2163	}
2164
2165	#[test]
2166	fn test_promotion_backoff_skips_until_due_then_gives_up() {
2167		use crate::types::MAX_PROMOTION_ATTEMPTS;
2168
2169		let (pool, _dir) = make_pool(1024 * 1024, /* retention = */ 100);
2170		let (_, signer) = test_recipient();
2171		let hash = pool
2172			.insert(vec![1u8; 100], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
2173			.unwrap();
2174
2175		// Inside the buffer window (>= retention=100s) so the entry is promotable
2176		// in principle.
2177		let buffer = 300_u64;
2178		let current = 60;
2179		assert_eq!(pool.get_promotable(current, buffer, 10), vec![hash]);
2180
2181		// First failure schedules next attempt at current + 1× check_interval_blocks.
2182		let check_interval_blocks: u32 = 10;
2183		pool.record_promotion_attempt(&hash, current, check_interval_blocks);
2184		assert!(
2185			pool.get_promotable(current, buffer, 10).is_empty(),
2186			"entry should be skipped until back-off elapses"
2187		);
2188		assert_eq!(pool.get_promotable(current + 10, buffer, 10), vec![hash]);
2189
2190		// Burn through the remaining attempts; once at MAX, the entry stays out
2191		// of the promotable set forever (regardless of how far we advance time).
2192		// Schedule after first failure: 1×, 2×, 4×, 8×, 16× check_interval.
2193		let mut now = current + 10;
2194		for next_attempt in 2..=MAX_PROMOTION_ATTEMPTS {
2195			pool.record_promotion_attempt(&hash, now, check_interval_blocks);
2196			let shift = (next_attempt - 1).min(5);
2197			let backoff = check_interval_blocks << shift;
2198			now += backoff;
2199		}
2200		assert!(
2201			pool.get_promotable(now + 10_000, buffer, 10).is_empty(),
2202			"entry should give up after MAX_PROMOTION_ATTEMPTS"
2203		);
2204	}
2205}