1use crate::{pool::HopDataPool, runtime_api};
30use sp_api::{ApiExt, CallApiAt, ProvideRuntimeApi};
31use sp_blockchain::HeaderBackend;
32use sp_runtime::{
33 traits::Block as BlockT, AccountId32, MultiSignature, MultiSigner, SaturatedConversion,
34};
35use std::{marker::PhantomData, sync::Arc, time::Duration};
36
37pub trait HopPromoter: Send + Sync + 'static {
43 fn promote(
50 &self,
51 data: Vec<u8>,
52 signer: MultiSigner,
53 signature: MultiSignature,
54 submit_timestamp: u64,
55 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
56
57 fn is_promoted_on_chain(
62 &self,
63 hash: &[u8; 32],
64 ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>>;
65}
66
67pub struct RuntimeApiPromoter<Block: BlockT, C, P> {
71 client: Arc<C>,
72 tx_pool: Arc<P>,
73 _phantom: PhantomData<Block>,
74}
75
76impl<Block, C, P> RuntimeApiPromoter<Block, C, P>
77where
78 Block: BlockT,
79 C: HeaderBackend<Block> + CallApiAt<Block> + Send + Sync + 'static,
80 P: sc_transaction_pool_api::LocalTransactionPool<Block = Block> + 'static,
81{
82 pub fn new(client: Arc<C>, tx_pool: Arc<P>) -> Self {
84 Self { client, tx_pool, _phantom: PhantomData }
85 }
86}
87
88impl<Block, C, P> HopPromoter for RuntimeApiPromoter<Block, C, P>
89where
90 Block: BlockT,
91 C: HeaderBackend<Block> + CallApiAt<Block> + Send + Sync + 'static,
92 P: sc_transaction_pool_api::LocalTransactionPool<Block = Block> + 'static,
93{
94 fn promote(
95 &self,
96 data: Vec<u8>,
97 signer: MultiSigner,
98 signature: MultiSignature,
99 submit_timestamp: u64,
100 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
101 let best_hash = self.client.info().best_hash;
102 let ext = runtime_api::create_promotion_extrinsic::<Block, _>(
103 &*self.client,
104 best_hash,
105 data,
106 signer,
107 signature,
108 submit_timestamp,
109 )?;
110 self.tx_pool
111 .submit_local(best_hash, ext)
112 .map_err(|e| format!("submit_local failed: {:?}", e))?;
113 Ok(())
114 }
115
116 fn is_promoted_on_chain(
117 &self,
118 hash: &[u8; 32],
119 ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
120 let best_hash = self.client.info().best_hash;
121 Ok(runtime_api::is_promoted_on_chain::<Block, _>(&*self.client, best_hash, *hash)?)
122 }
123}
124
125pub fn try_build_promoter<Block, C, P>(
131 client: &Arc<C>,
132 tx_pool: &Arc<P>,
133) -> Option<Arc<dyn HopPromoter>>
134where
135 Block: BlockT,
136 C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
137 P: sc_transaction_pool_api::LocalTransactionPool<Block = Block> + 'static,
138{
139 let best_hash = client.info().best_hash;
140 match client
141 .runtime_api()
142 .has_api_with::<dyn sp_hop::HopRuntimeApi<Block, AccountId32>, _>(best_hash, |v| v >= 1)
143 {
144 Ok(true) => {
145 tracing::info!(target: "hop", "HopRuntimeApi detected — promotion enabled");
146 Some(Arc::new(RuntimeApiPromoter::new(client.clone(), tx_pool.clone())))
147 },
148 Ok(false) => {
149 tracing::warn!(
150 target: "hop",
151 "HOP enabled but runtime does not support HopRuntimeApi — running cleanup only"
152 );
153 None
154 },
155 Err(e) => {
156 tracing::warn!(
157 target: "hop",
158 error = %e,
159 "Failed to check HopRuntimeApi support — running cleanup only"
160 );
161 None
162 },
163 }
164}
165
166pub fn build_maintenance_task<Block, C, P>(
172 client: &Arc<C>,
173 tx_pool: &Arc<P>,
174 pool: Arc<HopDataPool>,
175 buffer_secs: u64,
176 check_interval_secs: u64,
177) -> HopMaintenanceTask
178where
179 Block: BlockT,
180 C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
181 P: sc_transaction_pool_api::LocalTransactionPool<Block = Block> + 'static,
182{
183 let promoter = try_build_promoter::<Block, _, _>(client, tx_pool);
184 let best_block_client = client.clone();
185 let best_block: Arc<dyn Fn() -> u32 + Send + Sync> =
186 Arc::new(move || best_block_client.info().best_number.saturated_into::<u32>());
187 HopMaintenanceTask::new(pool, promoter, best_block, buffer_secs, check_interval_secs)
188}
189
190pub struct HopMaintenanceTask {
193 hop_pool: Arc<HopDataPool>,
194 promoter: Option<Arc<dyn HopPromoter>>,
195 buffer_secs: u64,
196 check_interval_secs: u64,
197 check_interval_blocks: u32,
198 best_block: Arc<dyn Fn() -> u32 + Send + Sync>,
199}
200
201impl HopMaintenanceTask {
202 pub fn new(
209 hop_pool: Arc<HopDataPool>,
210 promoter: Option<Arc<dyn HopPromoter>>,
211 best_block: Arc<dyn Fn() -> u32 + Send + Sync>,
212 buffer_secs: u64,
213 check_interval_secs: u64,
214 ) -> Self {
215 let check_interval_blocks =
216 (check_interval_secs.max(1) / crate::types::HOP_BLOCK_TIME_SECS.max(1)).max(1) as u32;
217 Self {
218 hop_pool,
219 promoter,
220 buffer_secs,
221 check_interval_secs,
222 check_interval_blocks,
223 best_block,
224 }
225 }
226
227 pub async fn run(self) {
229 loop {
230 futures_timer::Delay::new(Duration::from_secs(self.check_interval_secs)).await;
231 self.tick();
232 }
233 }
234
235 pub fn tick(&self) {
237 let current_block = (self.best_block)();
238
239 if let Some(ref promoter) = self.promoter {
241 const PROMOTION_BATCH_SIZE: usize = 10;
242 let hashes =
243 self.hop_pool
244 .get_promotable(current_block, self.buffer_secs, PROMOTION_BATCH_SIZE);
245 for hash in hashes {
246 match promoter.is_promoted_on_chain(hash.as_fixed_bytes()) {
250 Ok(true) => {
251 self.hop_pool.mark_promoted(&hash);
252 tracing::info!(
253 target: "hop",
254 hash = ?hex::encode(hash),
255 "HOP entry already on-chain — flagged locally"
256 );
257 continue;
258 },
259 Ok(false) => {},
260 Err(e) => {
261 tracing::warn!(
265 target: "hop",
266 hash = ?hex::encode(hash),
267 error = %e,
268 "is_promoted_on_chain failed; assuming not on-chain"
269 );
270 },
271 }
272
273 let (data, signer, signature, submit_timestamp) =
274 match self.hop_pool.get_with_auth(&hash) {
275 Some(t) => t,
276 None => continue,
277 };
278 let size = data.len();
279 let result = promoter.promote(data, signer, signature, submit_timestamp);
280 self.hop_pool.record_promotion_attempt(
286 &hash,
287 current_block,
288 self.check_interval_blocks,
289 );
290 match result {
291 Ok(()) => tracing::info!(
292 target: "hop",
293 hash = ?hex::encode(hash),
294 size,
295 "Submitted HOP promotion extrinsic; awaiting on-chain confirmation"
296 ),
297 Err(e) => tracing::warn!(
298 target: "hop",
299 hash = ?hex::encode(hash),
300 error = %e,
301 "Failed to submit HOP promotion extrinsic; will back off"
302 ),
303 }
304 }
305 }
306
307 let freed = self.hop_pool.cleanup_expired();
309 if freed > 0 {
310 tracing::info!(
311 target: "hop",
312 freed_bytes = freed,
313 "Cleaned up expired HOP entries"
314 );
315 }
316 }
317}
318
319#[cfg(test)]
320mod tests {
321 use super::*;
322 use crate::{
323 pool::HopDataPool,
324 rate_limit::RateLimitConfig,
325 types::{Recipient, RecipientVec, SenderId},
326 };
327 use sp_core::{crypto::Pair, ed25519};
328 use sp_runtime::{MultiSignature, MultiSigner};
329 use std::sync::Mutex;
330 use tempfile::TempDir;
331
332 const SENDER_A: SenderId = [1u8; 32];
333
334 fn test_recipient() -> (ed25519::Pair, MultiSigner) {
335 let pair = ed25519::Pair::from_seed(&[1u8; 32]);
336 let signer = MultiSigner::Ed25519(pair.public());
337 (pair, signer)
338 }
339
340 fn dummy_auth() -> (MultiSigner, MultiSignature) {
341 let pair = ed25519::Pair::from_seed(&[7u8; 32]);
342 let signer = MultiSigner::Ed25519(pair.public());
343 let sig = MultiSignature::Ed25519(pair.sign(&[]));
344 (signer, sig)
345 }
346
347 fn bv(v: Vec<MultiSigner>) -> RecipientVec {
348 let recipients: Vec<Recipient> =
349 v.into_iter().map(|signer| Recipient { signer, claimed: false }).collect();
350 RecipientVec::try_from(recipients).expect("test recipient list exceeds MAX_RECIPIENTS")
351 }
352
353 fn test_pool(max_size: u64, retention_secs: u64, dir: &TempDir) -> Arc<HopDataPool> {
354 Arc::new(
355 HopDataPool::new(
356 max_size,
357 max_size,
358 retention_secs,
359 dir.path().to_path_buf(),
360 RateLimitConfig::disabled(),
361 )
362 .unwrap(),
363 )
364 }
365
366 struct MockPromoter {
367 calls: Mutex<Vec<Vec<u8>>>,
368 should_fail: bool,
369 on_chain: Mutex<std::collections::HashSet<[u8; 32]>>,
371 }
372
373 impl MockPromoter {
374 fn new(should_fail: bool) -> Self {
375 Self {
376 calls: Mutex::new(Vec::new()),
377 should_fail,
378 on_chain: Mutex::new(std::collections::HashSet::new()),
379 }
380 }
381
382 fn call_count(&self) -> usize {
383 self.calls.lock().unwrap().len()
384 }
385
386 fn calls(&self) -> Vec<Vec<u8>> {
387 self.calls.lock().unwrap().clone()
388 }
389
390 fn set_on_chain(&self, hash: [u8; 32]) {
392 self.on_chain.lock().unwrap().insert(hash);
393 }
394 }
395
396 impl HopPromoter for MockPromoter {
397 fn promote(
398 &self,
399 data: Vec<u8>,
400 _signer: MultiSigner,
401 _signature: MultiSignature,
402 _submit_timestamp: u64,
403 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
404 self.calls.lock().unwrap().push(data);
405 if self.should_fail {
406 Err("mock failure".into())
407 } else {
408 Ok(())
409 }
410 }
411
412 fn is_promoted_on_chain(
413 &self,
414 hash: &[u8; 32],
415 ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
416 Ok(self.on_chain.lock().unwrap().contains(hash))
417 }
418 }
419
420 #[test]
421 fn tick_promotes_near_expiry_entries() {
422 let dir = TempDir::new().unwrap();
423 let pool = test_pool(1024 * 1024, 100, &dir);
424 let (_, signer) = test_recipient();
425
426 let hash = pool
427 .insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
428 .unwrap();
429
430 let promoter = Arc::new(MockPromoter::new(false));
431 let task = HopMaintenanceTask::new(
432 pool.clone(),
433 Some(promoter.clone()),
434 Arc::new(|| 80), 180, 60,
437 );
438
439 task.tick();
440
441 assert_eq!(promoter.call_count(), 1);
442 assert_eq!(promoter.calls()[0], vec![42u8; 10]);
443
444 assert!(pool.has(&hash));
447 let promotable = pool.get_promotable(80, 180, usize::MAX);
448 assert!(promotable.is_empty(), "back-off should suppress immediate re-promotion");
449 }
450
451 #[test]
452 fn tick_skips_promotion_when_no_promoter() {
453 let dir = TempDir::new().unwrap();
454 let pool = test_pool(1024 * 1024, 100, &dir);
455 let (_, signer) = test_recipient();
456
457 pool.insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
458 .unwrap();
459
460 let task = HopMaintenanceTask::new(
461 pool.clone(),
462 None, Arc::new(|| 80),
464 180,
465 60,
466 );
467
468 task.tick();
469
470 let promotable = pool.get_promotable(80, 180, usize::MAX);
472 assert_eq!(promotable.len(), 1);
473 }
474
475 #[test]
476 fn tick_does_not_mark_promoted_on_failure() {
477 let dir = TempDir::new().unwrap();
478 let pool = test_pool(1024 * 1024, 100, &dir);
479 let (_, signer) = test_recipient();
480
481 pool.insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
482 .unwrap();
483
484 let promoter = Arc::new(MockPromoter::new(true)); let task =
486 HopMaintenanceTask::new(pool.clone(), Some(promoter.clone()), Arc::new(|| 80), 180, 60);
487
488 task.tick();
489
490 assert_eq!(promoter.call_count(), 1);
492
493 assert!(pool.get_promotable(80, 180, usize::MAX).is_empty());
498 assert_eq!(pool.get_promotable(95, 180, usize::MAX).len(), 1);
499 }
500
501 #[test]
502 fn tick_cleans_up_expired_entries() {
503 let dir = TempDir::new().unwrap();
504 let pool = test_pool(1024 * 1024, 0, &dir);
506 let (_, signer) = test_recipient();
507
508 pool.insert(vec![42u8; 50], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
509 .unwrap();
510 assert_eq!(pool.status().entry_count, 1);
511
512 let task = HopMaintenanceTask::new(pool.clone(), None, Arc::new(|| 0), 5, 60);
513
514 task.tick();
515
516 assert_eq!(pool.status().entry_count, 0);
517 assert_eq!(pool.status().total_bytes, 0);
518 }
519
520 #[test]
521 fn tick_promotes_then_cleans_up_independently() {
522 let dir = TempDir::new().unwrap();
523 let pool = test_pool(1024 * 1024, 100, &dir);
524 let (_, signer) = test_recipient();
525
526 let hash = pool
527 .insert(
528 vec![1u8; 10],
529 bv(vec![signer.clone()]),
530 SENDER_A,
531 dummy_auth().0,
532 dummy_auth().1,
533 0,
534 )
535 .unwrap();
536
537 let promoter = Arc::new(MockPromoter::new(false));
538
539 let block = Arc::new(Mutex::new(80u32));
542 let block_clone = block.clone();
543 let task = HopMaintenanceTask::new(
544 pool.clone(),
545 Some(promoter.clone()),
546 Arc::new(move || *block_clone.lock().unwrap()),
547 180,
548 60,
549 );
550
551 task.tick();
552 assert_eq!(promoter.call_count(), 1);
553 assert_eq!(pool.status().entry_count, 1);
554
555 promoter.set_on_chain(*hash.as_fixed_bytes());
558
559 *block.lock().unwrap() = 100;
562 task.tick();
563 assert_eq!(promoter.call_count(), 1, "promoter must not be called again once on-chain");
564 }
565
566 #[test]
567 fn tick_skips_promotion_when_already_on_chain() {
568 let dir = TempDir::new().unwrap();
569 let pool = test_pool(1024 * 1024, 100, &dir);
570 let (_, signer) = test_recipient();
571
572 let hash = pool
573 .insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
574 .unwrap();
575
576 let promoter = Arc::new(MockPromoter::new(false));
577 promoter.set_on_chain(*hash.as_fixed_bytes());
579
580 let task =
581 HopMaintenanceTask::new(pool.clone(), Some(promoter.clone()), Arc::new(|| 80), 180, 60);
582
583 task.tick();
584
585 assert_eq!(promoter.call_count(), 0, "promote must not be called when already on-chain");
586 assert!(pool.get_promotable(80, 180, usize::MAX).is_empty());
587 }
588
589 #[test]
590 fn tick_retries_unconfirmed_with_backoff() {
591 let dir = TempDir::new().unwrap();
592 let pool = test_pool(1024 * 1024, 100, &dir);
593 let (_, signer) = test_recipient();
594
595 pool.insert(vec![42u8; 10], bv(vec![signer]), SENDER_A, dummy_auth().0, dummy_auth().1, 0)
596 .unwrap();
597
598 let promoter = Arc::new(MockPromoter::new(false));
599 let block = Arc::new(Mutex::new(80u32));
600 let block_clone = block.clone();
601 let task = HopMaintenanceTask::new(
604 pool.clone(),
605 Some(promoter.clone()),
606 Arc::new(move || *block_clone.lock().unwrap()),
607 180,
608 60,
609 );
610
611 task.tick();
613 assert_eq!(promoter.call_count(), 1);
614
615 *block.lock().unwrap() = 85;
617 task.tick();
618 assert_eq!(promoter.call_count(), 1);
619
620 *block.lock().unwrap() = 90;
623 task.tick();
624 assert_eq!(promoter.call_count(), 2);
625
626 *block.lock().unwrap() = 109;
628 task.tick();
629 assert_eq!(promoter.call_count(), 2);
630 }
631}