referrerpolicy=no-referrer-when-downgrade

sc_hop/
promotion.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! HOP maintenance: periodic promotion of near-expiry pool entries to permanent
18//! on-chain storage and cleanup of expired entries.
19//!
20//! ## Architecture
21//!
22//! - [`HopPromoter`] — trait for promoting data on-chain (trait-object friendly).
23//! - [`RuntimeApiPromoter`] — concrete implementation that calls the HOP runtime API via dynamic
24//!   dispatch (see [`crate::runtime_api`]) plus [`sc_transaction_pool_api::LocalTransactionPool`].
25//! - [`try_build_promoter`] — detects runtime API support at startup, returns `Some(promoter)` or
26//!   logs a warning and returns `None`.
27//! - [`HopMaintenanceTask`] — background task combining promotion + cleanup.
28
29use crate::{pool::HopDataPool, runtime_api};
30use sp_api::{ApiExt, CallApiAt, ProvideRuntimeApi};
31use sp_blockchain::HeaderBackend;
32use sp_runtime::{
33	traits::Block as BlockT, AccountId32, MultiSignature, MultiSigner, SaturatedConversion,
34};
35use std::{marker::PhantomData, sync::Arc, time::Duration};
36
/// Trait for promoting HOP data to permanent on-chain storage.
///
/// Implemented as a trait object so that `HopMaintenanceTask` is not generic
/// over runtime-specific types. The concrete implementation
/// ([`RuntimeApiPromoter`]) uses the `HopRuntimeApi` runtime API.
///
/// The `Send + Sync + 'static` supertraits allow an `Arc<dyn HopPromoter>` to
/// be stored inside a background task and shared across threads.
pub trait HopPromoter: Send + Sync + 'static {
	/// Promote a blob of HOP data to permanent on-chain storage.
	///
	/// `signer`, `signature`, and `submit_timestamp` are the user's `hop_submit`-time
	/// `MultiSigner`, signature, and wall-clock timestamp (ms since unix epoch),
	/// carried into the unsigned promotion extrinsic so the runtime pallet can
	/// verify consent on-chain and bound the signature's validity window.
	///
	/// `Ok(())` means the promotion was handed off, not that it has been
	/// included in a block; use [`HopPromoter::is_promoted_on_chain`] to
	/// confirm inclusion.
	///
	/// # Errors
	///
	/// Returns an error if the implementation fails to build or submit the
	/// promotion.
	fn promote(
		&self,
		data: Vec<u8>,
		signer: MultiSigner,
		signature: MultiSignature,
		submit_timestamp: u64,
	) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;

	/// Whether `hash` is already stored on-chain.
	///
	/// Used by the maintenance task to confirm that a previously submitted
	/// promotion extrinsic was actually included in a block.
	///
	/// # Errors
	///
	/// Returns an error if the on-chain state could not be queried. The
	/// maintenance task treats such an error as "unknown" and proceeds with
	/// submission.
	fn is_promoted_on_chain(
		&self,
		hash: &[u8; 32],
	) -> Result<bool, Box<dyn std::error::Error + Send + Sync>>;
}
66
/// Concrete [`HopPromoter`] that calls the HOP runtime API dynamically (see
/// [`crate::runtime_api`]) to build a promotion extrinsic and submits it to
/// the local transaction pool.
pub struct RuntimeApiPromoter<Block: BlockT, C, P> {
	/// Chain client: supplies the current best block hash and is the target
	/// of the dynamic runtime-API calls.
	client: Arc<C>,
	/// Local transaction pool that receives the promotion extrinsic via
	/// `submit_local`.
	tx_pool: Arc<P>,
	/// Ties the promoter to a concrete `Block` type without storing a value.
	_phantom: PhantomData<Block>,
}
75
76impl<Block, C, P> RuntimeApiPromoter<Block, C, P>
77where
78	Block: BlockT,
79	C: HeaderBackend<Block> + CallApiAt<Block> + Send + Sync + 'static,
80	P: sc_transaction_pool_api::LocalTransactionPool<Block = Block> + 'static,
81{
82	/// Create a new promoter.
83	pub fn new(client: Arc<C>, tx_pool: Arc<P>) -> Self {
84		Self { client, tx_pool, _phantom: PhantomData }
85	}
86}
87
88impl<Block, C, P> HopPromoter for RuntimeApiPromoter<Block, C, P>
89where
90	Block: BlockT,
91	C: HeaderBackend<Block> + CallApiAt<Block> + Send + Sync + 'static,
92	P: sc_transaction_pool_api::LocalTransactionPool<Block = Block> + 'static,
93{
94	fn promote(
95		&self,
96		data: Vec<u8>,
97		signer: MultiSigner,
98		signature: MultiSignature,
99		submit_timestamp: u64,
100	) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
101		let best_hash = self.client.info().best_hash;
102		let ext = runtime_api::create_promotion_extrinsic::<Block, _>(
103			&*self.client,
104			best_hash,
105			data,
106			signer,
107			signature,
108			submit_timestamp,
109		)?;
110		self.tx_pool
111			.submit_local(best_hash, ext)
112			.map_err(|e| format!("submit_local failed: {:?}", e))?;
113		Ok(())
114	}
115
116	fn is_promoted_on_chain(
117		&self,
118		hash: &[u8; 32],
119	) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
120		let best_hash = self.client.info().best_hash;
121		Ok(runtime_api::is_promoted_on_chain::<Block, _>(&*self.client, best_hash, *hash)?)
122	}
123}
124
125/// Try to build a [`HopPromoter`] by detecting the `HopRuntimeApi` runtime
126/// API at the current best block.
127///
128/// Returns `Some(promoter)` if the runtime supports the API, or `None` with a
129/// warning log if it doesn't.
130pub fn try_build_promoter<Block, C, P>(
131	client: &Arc<C>,
132	tx_pool: &Arc<P>,
133) -> Option<Arc<dyn HopPromoter>>
134where
135	Block: BlockT,
136	C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
137	P: sc_transaction_pool_api::LocalTransactionPool<Block = Block> + 'static,
138{
139	let best_hash = client.info().best_hash;
140	match client
141		.runtime_api()
142		.has_api_with::<dyn sp_hop::HopRuntimeApi<Block, AccountId32>, _>(best_hash, |v| v >= 1)
143	{
144		Ok(true) => {
145			tracing::info!(target: "hop", "HopRuntimeApi detected — promotion enabled");
146			Some(Arc::new(RuntimeApiPromoter::new(client.clone(), tx_pool.clone())))
147		},
148		Ok(false) => {
149			tracing::warn!(
150				target: "hop",
151				"HOP enabled but runtime does not support HopRuntimeApi — running cleanup only"
152			);
153			None
154		},
155		Err(e) => {
156			tracing::warn!(
157				target: "hop",
158				error = %e,
159				"Failed to check HopRuntimeApi support — running cleanup only"
160			);
161			None
162		},
163	}
164}
165
166/// Build a [`HopMaintenanceTask`] wired to the node's client and transaction pool.
167///
168/// Detects `HopRuntimeApi` support at startup (see [`try_build_promoter`]) and captures
169/// a best-block closure over `client` so callers only need to spawn the returned
170/// task on their task manager.
171pub fn build_maintenance_task<Block, C, P>(
172	client: &Arc<C>,
173	tx_pool: &Arc<P>,
174	pool: Arc<HopDataPool>,
175	buffer_secs: u64,
176	check_interval_secs: u64,
177) -> HopMaintenanceTask
178where
179	Block: BlockT,
180	C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
181	P: sc_transaction_pool_api::LocalTransactionPool<Block = Block> + 'static,
182{
183	let promoter = try_build_promoter::<Block, _, _>(client, tx_pool);
184	let best_block_client = client.clone();
185	let best_block: Arc<dyn Fn() -> u32 + Send + Sync> =
186		Arc::new(move || best_block_client.info().best_number.saturated_into::<u32>());
187	HopMaintenanceTask::new(pool, promoter, best_block, buffer_secs, check_interval_secs)
188}
189
/// Background task that periodically promotes near-expiry HOP pool entries to
/// permanent on-chain storage and cleans up expired entries.
pub struct HopMaintenanceTask {
	/// Pool that is scanned for promotable entries and purged of expired
	/// ones on every cycle.
	hop_pool: Arc<HopDataPool>,
	/// On-chain promoter; `None` means the task only performs cleanup.
	promoter: Option<Arc<dyn HopPromoter>>,
	/// How many seconds before expiry an entry becomes eligible for promotion.
	buffer_secs: u64,
	/// Delay, in seconds, between two maintenance cycles.
	check_interval_secs: u64,
	/// `check_interval_secs` expressed in blocks; passed to the pool as the
	/// back-off unit when a promotion attempt is recorded.
	check_interval_blocks: u32,
	/// Closure returning the current best block number.
	best_block: Arc<dyn Fn() -> u32 + Send + Sync>,
}
200
201impl HopMaintenanceTask {
202	/// Create a new maintenance task.
203	///
204	/// - `promoter`: `Some` to enable on-chain promotion, `None` for cleanup-only.
205	/// - `best_block`: closure returning the current best block number.
206	/// - `buffer_secs`: how many seconds before expiry to start promoting.
207	/// - `check_interval_secs`: how often to run the maintenance cycle.
208	pub fn new(
209		hop_pool: Arc<HopDataPool>,
210		promoter: Option<Arc<dyn HopPromoter>>,
211		best_block: Arc<dyn Fn() -> u32 + Send + Sync>,
212		buffer_secs: u64,
213		check_interval_secs: u64,
214	) -> Self {
215		let check_interval_blocks =
216			(check_interval_secs.max(1) / crate::types::HOP_BLOCK_TIME_SECS.max(1)).max(1) as u32;
217		Self {
218			hop_pool,
219			promoter,
220			buffer_secs,
221			check_interval_secs,
222			check_interval_blocks,
223			best_block,
224		}
225	}
226
227	/// Run the maintenance loop.
228	pub async fn run(self) {
229		loop {
230			futures_timer::Delay::new(Duration::from_secs(self.check_interval_secs)).await;
231			self.tick();
232		}
233	}
234
235	/// Execute a single maintenance cycle: promote near-expiry entries and clean up expired ones.
236	pub fn tick(&self) {
237		let current_block = (self.best_block)();
238
239		// Promote near-expiry entries one at a time to bound peak memory.
240		if let Some(ref promoter) = self.promoter {
241			const PROMOTION_BATCH_SIZE: usize = 10;
242			let hashes =
243				self.hop_pool
244					.get_promotable(current_block, self.buffer_secs, PROMOTION_BATCH_SIZE);
245			for hash in hashes {
246				// First, ask the runtime whether this hash is already on-chain.
247				// If so, the previous attempt (or a third party) already
248				// landed it — flag locally and stop touching the chain.
249				match promoter.is_promoted_on_chain(hash.as_fixed_bytes()) {
250					Ok(true) => {
251						self.hop_pool.mark_promoted(&hash);
252						tracing::info!(
253							target: "hop",
254							hash = ?hex::encode(hash),
255							"HOP entry already on-chain — flagged locally"
256						);
257						continue;
258					},
259					Ok(false) => {},
260					Err(e) => {
261						// Treat runtime-API failures as "unknown", which means
262						// proceed with submission. Worst case we resubmit a
263						// duplicate; the on-chain check will catch it next cycle.
264						tracing::warn!(
265							target: "hop",
266							hash = ?hex::encode(hash),
267							error = %e,
268							"is_promoted_on_chain failed; assuming not on-chain"
269						);
270					},
271				}
272
273				let (data, signer, signature, submit_timestamp) =
274					match self.hop_pool.get_with_auth(&hash) {
275						Some(t) => t,
276						None => continue,
277					};
278				let size = data.len();
279				let result = promoter.promote(data, signer, signature, submit_timestamp);
280				// Backoff on every attempt — both Ok (submitted to local pool, may
281				// or may not get included) and Err (pool rejected). Without this,
282				// every cycle would resubmit the same extrinsic for the entry's
283				// lifetime, wasting fees and authorization budget if the runtime
284				// pallet does not deduplicate.
285				self.hop_pool.record_promotion_attempt(
286					&hash,
287					current_block,
288					self.check_interval_blocks,
289				);
290				match result {
291					Ok(()) => tracing::info!(
292						target: "hop",
293						hash = ?hex::encode(hash),
294						size,
295						"Submitted HOP promotion extrinsic; awaiting on-chain confirmation"
296					),
297					Err(e) => tracing::warn!(
298						target: "hop",
299						hash = ?hex::encode(hash),
300						error = %e,
301						"Failed to submit HOP promotion extrinsic; will back off"
302					),
303				}
304			}
305		}
306
307		// Always clean up expired entries.
308		let freed = self.hop_pool.cleanup_expired();
309		if freed > 0 {
310			tracing::info!(
311				target: "hop",
312				freed_bytes = freed,
313				"Cleaned up expired HOP entries"
314			);
315		}
316	}
317}
318
// Unit tests for `HopMaintenanceTask::tick`, driven by an in-memory
// `MockPromoter` and a pool rooted in a temporary directory.
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		pool::HopDataPool,
		rate_limit::RateLimitConfig,
		types::{Recipient, RecipientVec, SenderId},
	};
	use sp_core::{crypto::Pair, ed25519};
	use sp_runtime::{MultiSignature, MultiSigner};
	use std::sync::Mutex;
	use tempfile::TempDir;

	/// Fixed sender identity used for every insert in these tests.
	const SENDER_A: SenderId = [1u8; 32];

	/// Deterministic ed25519 recipient (seed `[1; 32]`) and its `MultiSigner`.
	fn test_recipient() -> (ed25519::Pair, MultiSigner) {
		let pair = ed25519::Pair::from_seed(&[1u8; 32]);
		let signer = MultiSigner::Ed25519(pair.public());
		(pair, signer)
	}

	/// Deterministic signer (seed `[7; 32]`) plus its signature over an empty
	/// message, used as placeholder submit-time authorization.
	fn dummy_auth() -> (MultiSigner, MultiSignature) {
		let pair = ed25519::Pair::from_seed(&[7u8; 32]);
		let signer = MultiSigner::Ed25519(pair.public());
		let sig = MultiSignature::Ed25519(pair.sign(&[]));
		(signer, sig)
	}

	/// Wrap signers into a `RecipientVec` of unclaimed recipients; panics if
	/// the list does not fit.
	fn bv(v: Vec<MultiSigner>) -> RecipientVec {
		let recipients: Vec<Recipient> =
			v.into_iter().map(|signer| Recipient { signer, claimed: false }).collect();
		RecipientVec::try_from(recipients).expect("test recipient list exceeds MAX_RECIPIENTS")
	}

	/// Build a pool stored under `dir` with rate limiting disabled.
	///
	/// `max_size` is passed for both of the first two constructor arguments.
	// NOTE(review): presumably these are the total and per-entry size limits —
	// confirm against `HopDataPool::new`.
	fn test_pool(max_size: u64, retention_secs: u64, dir: &TempDir) -> Arc<HopDataPool> {
		Arc::new(
			HopDataPool::new(
				max_size,
				max_size,
				retention_secs,
				dir.path().to_path_buf(),
				RateLimitConfig::disabled(),
			)
			.unwrap(),
		)
	}

	/// Test double for [`HopPromoter`] that records every `promote` payload
	/// and answers on-chain queries from an in-memory set.
	struct MockPromoter {
		/// Payloads passed to `promote`, in call order.
		calls: Mutex<Vec<Vec<u8>>>,
		/// When `true`, `promote` records the call and then returns an error.
		should_fail: bool,
		/// Hashes that the mock claims are already stored on-chain.
		on_chain: Mutex<std::collections::HashSet<[u8; 32]>>,
	}

	impl MockPromoter {
		/// Create a mock with no recorded calls and nothing on-chain.
		fn new(should_fail: bool) -> Self {
			Self {
				calls: Mutex::new(Vec::new()),
				should_fail,
				on_chain: Mutex::new(std::collections::HashSet::new()),
			}
		}

		/// Number of `promote` calls seen so far (successful or not).
		fn call_count(&self) -> usize {
			self.calls.lock().unwrap().len()
		}

		/// Snapshot of the payloads passed to `promote`.
		fn calls(&self) -> Vec<Vec<u8>> {
			self.calls.lock().unwrap().clone()
		}

		/// Mark a hash as on-chain (subsequent `is_promoted_on_chain` returns `true`).
		fn set_on_chain(&self, hash: [u8; 32]) {
			self.on_chain.lock().unwrap().insert(hash);
		}
	}

	impl HopPromoter for MockPromoter {
		// Records the payload first, so failed attempts are counted too.
		fn promote(
			&self,
			data: Vec<u8>,
			_signer: MultiSigner,
			_signature: MultiSignature,
			_submit_timestamp: u64,
		) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
			self.calls.lock().unwrap().push(data);
			if self.should_fail {
				Err("mock failure".into())
			} else {
				Ok(())
			}
		}

		// Never fails; simply reports membership in the `on_chain` set.
		fn is_promoted_on_chain(
			&self,
			hash: &[u8; 32],
		) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
			Ok(self.on_chain.lock().unwrap().contains(hash))
		}
	}

	#[test]
	fn tick_promotes_near_expiry_entries() {
		let dir = TempDir::new().unwrap();
		let pool = test_pool(1024 * 1024, 100, &dir);
		let (_, signer) = test_recipient();

		let hash = pool
			.insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
			.unwrap();

		let promoter = Arc::new(MockPromoter::new(false));
		let task = HopMaintenanceTask::new(
			pool.clone(),
			Some(promoter.clone()),
			Arc::new(|| 80), // current block = 80
			180,             // buffer_secs = 180 > retention=100 → in window
			60,
		);

		task.tick();

		// Exactly one promotion, carrying the inserted payload.
		assert_eq!(promoter.call_count(), 1);
		assert_eq!(promoter.calls()[0], vec![42u8; 10]);

		// Entry stays in the pool (not yet confirmed on-chain) and is excluded
		// from the next promotable batch by the post-attempt backoff window.
		assert!(pool.has(&hash));
		let promotable = pool.get_promotable(80, 180, usize::MAX);
		assert!(promotable.is_empty(), "back-off should suppress immediate re-promotion");
	}

	#[test]
	fn tick_skips_promotion_when_no_promoter() {
		let dir = TempDir::new().unwrap();
		let pool = test_pool(1024 * 1024, 100, &dir);
		let (_, signer) = test_recipient();

		pool.insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
			.unwrap();

		let task = HopMaintenanceTask::new(
			pool.clone(),
			None, // no promoter
			Arc::new(|| 80),
			180,
			60,
		);

		task.tick();

		// Entry should still be promotable (no promoter to process it).
		let promotable = pool.get_promotable(80, 180, usize::MAX);
		assert_eq!(promotable.len(), 1);
	}

	#[test]
	fn tick_does_not_mark_promoted_on_failure() {
		let dir = TempDir::new().unwrap();
		let pool = test_pool(1024 * 1024, 100, &dir);
		let (_, signer) = test_recipient();

		pool.insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
			.unwrap();

		let promoter = Arc::new(MockPromoter::new(true)); // will fail
		let task =
			HopMaintenanceTask::new(pool.clone(), Some(promoter.clone()), Arc::new(|| 80), 180, 60);

		task.tick();

		// Promoter was called but failed.
		assert_eq!(promoter.call_count(), 1);

		// Failure schedules a back-off rather than re-marking immediately. The
		// entry isn't promotable at the failure block, but becomes promotable
		// again once the back-off (1× check_interval = 10 blocks at 6 s/block)
		// elapses — and crucially, never gets `mark_promoted`.
		assert!(pool.get_promotable(80, 180, usize::MAX).is_empty());
		assert_eq!(pool.get_promotable(95, 180, usize::MAX).len(), 1);
	}

	#[test]
	fn tick_cleans_up_expired_entries() {
		let dir = TempDir::new().unwrap();
		// retention=0 secs so entries expire immediately on the next cleanup pass.
		let pool = test_pool(1024 * 1024, 0, &dir);
		let (_, signer) = test_recipient();

		pool.insert(vec![42u8; 50], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
			.unwrap();
		assert_eq!(pool.status().entry_count, 1);

		// Cleanup-only task: no promoter is configured.
		let task = HopMaintenanceTask::new(pool.clone(), None, Arc::new(|| 0), 5, 60);

		task.tick();

		assert_eq!(pool.status().entry_count, 0);
		assert_eq!(pool.status().total_bytes, 0);
	}

	// NOTE(review): despite its name, this test makes no assertion about
	// cleanup; it checks that an on-chain confirmation stops further
	// `promote` calls.
	#[test]
	fn tick_promotes_then_cleans_up_independently() {
		let dir = TempDir::new().unwrap();
		let pool = test_pool(1024 * 1024, 100, &dir);
		let (_, signer) = test_recipient();

		let hash = pool
			.insert(
				vec![1u8; 10],
				bv(vec![signer.clone()]),
				SENDER_A,
				dummy_auth().0,
				dummy_auth().1,
				0,
			)
			.unwrap();

		let promoter = Arc::new(MockPromoter::new(false));

		// First tick at block 80: promote is attempted. Entry is NOT marked promoted
		// yet because we have not confirmed inclusion on-chain.
		let block = Arc::new(Mutex::new(80u32));
		let block_clone = block.clone();
		let task = HopMaintenanceTask::new(
			pool.clone(),
			Some(promoter.clone()),
			Arc::new(move || *block_clone.lock().unwrap()),
			180,
			60,
		);

		task.tick();
		assert_eq!(promoter.call_count(), 1);
		assert_eq!(pool.status().entry_count, 1);

		// Simulate the runtime confirming inclusion: from now on, the on-chain check
		// returns true.
		promoter.set_on_chain(*hash.as_fixed_bytes());

		// Second tick: the on-chain check short-circuits to mark_promoted.
		// Promoter must not be invoked a second time.
		*block.lock().unwrap() = 100;
		task.tick();
		assert_eq!(promoter.call_count(), 1, "promoter must not be called again once on-chain");
	}

	#[test]
	fn tick_skips_promotion_when_already_on_chain() {
		let dir = TempDir::new().unwrap();
		let pool = test_pool(1024 * 1024, 100, &dir);
		let (_, signer) = test_recipient();

		let hash = pool
			.insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
			.unwrap();

		let promoter = Arc::new(MockPromoter::new(false));
		// The entry is "already on-chain" before any tick runs.
		promoter.set_on_chain(*hash.as_fixed_bytes());

		let task =
			HopMaintenanceTask::new(pool.clone(), Some(promoter.clone()), Arc::new(|| 80), 180, 60);

		task.tick();

		assert_eq!(promoter.call_count(), 0, "promote must not be called when already on-chain");
		assert!(pool.get_promotable(80, 180, usize::MAX).is_empty());
	}

	#[test]
	fn tick_retries_unconfirmed_with_backoff() {
		let dir = TempDir::new().unwrap();
		let pool = test_pool(1024 * 1024, 100, &dir);
		let (_, signer) = test_recipient();

		pool.insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
			.unwrap();

		let promoter = Arc::new(MockPromoter::new(false));
		// Shared, mutable "best block" so the test can advance the chain between ticks.
		let block = Arc::new(Mutex::new(80u32));
		let block_clone = block.clone();
		// check_interval_secs=60 → check_interval_blocks = 60/HOP_BLOCK_TIME_SECS.
		// With the default HOP_BLOCK_TIME_SECS=6 → check_interval_blocks = 10.
		let task = HopMaintenanceTask::new(
			pool.clone(),
			Some(promoter.clone()),
			Arc::new(move || *block_clone.lock().unwrap()),
			180,
			60,
		);

		// Tick 1 at block 80: promote (submit_local Ok). Backoff: next attempt at 80 + 10 = 90.
		task.tick();
		assert_eq!(promoter.call_count(), 1);

		// Still inside the backoff window: nothing happens.
		*block.lock().unwrap() = 85;
		task.tick();
		assert_eq!(promoter.call_count(), 1);

		// Past the first backoff: tick fires again. Backoff after attempt 2 is 2× = 20 blocks,
		// so next attempt at 90 + 20 = 110.
		*block.lock().unwrap() = 90;
		task.tick();
		assert_eq!(promoter.call_count(), 2);

		// Inside the new backoff window.
		*block.lock().unwrap() = 109;
		task.tick();
		assert_eq!(promoter.call_count(), 2);
	}
}