1use futures::channel::oneshot;
20use parking_lot::Mutex;
21use sc_client_api::Backend;
22use sp_runtime::traits::Block as BlockT;
23use std::{
24 collections::{hash_map::Entry, HashMap, HashSet},
25 sync::Arc,
26 time::{Duration, Instant},
27};
28
29use crate::chain_head::{
30 subscription::SubscriptionManagementError, FollowEventReceiver, FollowEventSender,
31};
32
/// Receiving half of an operation's stop channel.
///
/// It is only ever stored, never polled in this file: dropping it closes the
/// channel, which is what the matching [`StopHandle`] observes.
type NotifyOnDrop = tokio::sync::mpsc::Receiver<()>;
/// Ongoing operations keyed by operation ID, shared behind a mutex.
///
/// Each entry keeps the receiver alive together with a clone of the stop handle.
type SharedOperations = Arc<Mutex<HashMap<String, (NotifyOnDrop, StopHandle)>>>;

/// Buffer size passed to the bounded response channel created for every subscription.
const BUF_CAP_PER_SUBSCRIPTION: usize = 16;
41
/// Tracks how many register / unpin notifications a subscription has seen for
/// a single block.
///
/// A block may be registered up to two times and unpinned once; the order in
/// which these arrive is not fixed, hence the four states.
#[derive(Debug, Clone, PartialEq)]
enum BlockStateMachine {
    /// The block was registered once.
    Registered,
    /// The block was registered twice and not unpinned.
    FullyRegistered,
    /// The block was registered once and then unpinned.
    Unpinned,
    /// Both the second registration and the unpin were observed (in any order).
    FullyUnpinned,
}

impl BlockStateMachine {
    /// Initial state of a freshly registered block.
    fn new() -> Self {
        Self::Registered
    }

    /// Records one more registration.
    ///
    /// States that already account for the second registration are left untouched.
    fn advance_register(&mut self) {
        let next = match self {
            Self::Registered => Self::FullyRegistered,
            Self::Unpinned => Self::FullyUnpinned,
            Self::FullyRegistered | Self::FullyUnpinned => return,
        };
        *self = next;
    }

    /// Records an unpin.
    ///
    /// States that already account for an unpin are left untouched.
    fn advance_unpin(&mut self) {
        let next = match self {
            Self::Registered => Self::Unpinned,
            Self::FullyRegistered => Self::FullyUnpinned,
            Self::Unpinned | Self::FullyUnpinned => return,
        };
        *self = next;
    }

    /// Returns `true` once an unpin has been recorded for the block.
    fn was_unpinned(&self) -> bool {
        matches!(self, Self::Unpinned | Self::FullyUnpinned)
    }
}
117
118struct LimitOperations {
120 semaphore: Arc<tokio::sync::Semaphore>,
122}
123
124impl LimitOperations {
125 fn new(max_operations: usize) -> Self {
127 LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) }
128 }
129
130 fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
138 let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve);
139
140 if num_ops == 0 {
141 return None
142 }
143
144 let permits = Arc::clone(&self.semaphore)
145 .try_acquire_many_owned(num_ops.try_into().ok()?)
146 .ok()?;
147
148 Some(permits)
149 }
150}
151
152type PermitOperations = tokio::sync::OwnedSemaphorePermit;
159
/// Handle that reports whether an operation has been stopped.
///
/// It wraps the sending half of the operation's channel. Nothing is ever sent
/// through it in this file; only the closed state of the channel is observed,
/// which happens once the receiver stored in the shared operations map is dropped.
#[derive(Clone)]
pub struct StopHandle(tokio::sync::mpsc::Sender<()>);

impl StopHandle {
    /// Completes once the receiving half of the channel has been dropped or closed.
    pub async fn stopped(&self) {
        self.0.closed().await;
    }

    /// Returns `true` if the receiving half of the channel has been dropped or closed.
    pub fn is_stopped(&self) -> bool {
        self.0.is_closed()
    }
}
173
174#[derive(Clone)]
177pub struct OperationState {
178 stop: StopHandle,
179 operations: SharedOperations,
180 operation_id: String,
181}
182
183impl OperationState {
184 pub fn stop(&mut self) {
185 if !self.stop.is_stopped() {
186 self.operations.lock().remove(&self.operation_id);
187 }
188 }
189}
190
/// An operation that has been registered and holds reserved permits.
///
/// Dropping the value removes the operation from the shared map of ongoing
/// operations and releases the permits.
pub struct RegisteredOperation {
    /// Reports whether the operation has been stopped.
    stop_handle: StopHandle,
    /// Map of ongoing operations this operation is registered in.
    operations: SharedOperations,
    /// Key of this operation inside `operations`.
    operation_id: String,
    /// Permits reserved for this operation; held only so that they are
    /// released when the operation is dropped.
    _permit: PermitOperations,
}

impl RegisteredOperation {
    /// Returns the handle that reports whether the operation was stopped.
    pub fn stop_handle(&self) -> &StopHandle {
        &self.stop_handle
    }

    /// Returns an owned copy of the operation ID.
    pub fn operation_id(&self) -> String {
        self.operation_id.clone()
    }
}

impl Drop for RegisteredOperation {
    /// Unregisters the operation from the shared map of ongoing operations.
    fn drop(&mut self) {
        self.operations.lock().remove(&self.operation_id);
    }
}
222
223struct Operations {
225 next_operation_id: usize,
227 limits: LimitOperations,
229 operations: SharedOperations,
231}
232
233impl Operations {
234 fn new(max_operations: usize) -> Self {
236 Operations {
237 next_operation_id: 0,
238 limits: LimitOperations::new(max_operations),
239 operations: Default::default(),
240 }
241 }
242
243 pub fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
245 let permit = self.limits.reserve_at_most(to_reserve)?;
246 let operation_id = self.next_operation_id();
247
248 let (tx, rx) = tokio::sync::mpsc::channel(1);
249 let stop_handle = StopHandle(tx);
250 let operations = self.operations.clone();
251 operations.lock().insert(operation_id.clone(), (rx, stop_handle.clone()));
252
253 Some(RegisteredOperation { stop_handle, operation_id, operations, _permit: permit })
254 }
255
256 pub fn get_operation(&self, id: &str) -> Option<OperationState> {
258 let stop = self.operations.lock().get(id).map(|(_, stop)| stop.clone())?;
259
260 Some(OperationState {
261 stop,
262 operations: self.operations.clone(),
263 operation_id: id.to_string(),
264 })
265 }
266
267 fn next_operation_id(&mut self) -> String {
269 let op_id = self.next_operation_id;
270 self.next_operation_id += 1;
271 op_id.to_string()
272 }
273}
274
/// State kept by a subscription for every block it tracks.
struct BlockState {
    /// Register / unpin progress of the block.
    state_machine: BlockStateMachine,
    /// Moment the block was first registered with the subscription.
    timestamp: Instant,
}
281
/// Everything tracked for a single subscription.
struct SubscriptionState<Block: BlockT> {
    /// Flag the subscription was created with; forwarded to every `BlockGuard`.
    with_runtime: bool,
    /// Sender used to signal that the subscription was stopped.
    ///
    /// It is taken on the first `stop` call, so the signal is sent at most once.
    tx_stop: Option<oneshot::Sender<()>>,
    /// Sending half of the subscription's response channel; cloned into every `BlockGuard`.
    response_sender: FollowEventSender<Block::Hash>,
    /// Operations registered for this subscription.
    operations: Operations,
    /// Blocks tracked by this subscription together with their state.
    blocks: HashMap<Block::Hash, BlockState>,
}
306
307impl<Block: BlockT> SubscriptionState<Block> {
308 fn stop(&mut self) {
313 if let Some(tx_stop) = self.tx_stop.take() {
314 let _ = tx_stop.send(());
315 }
316 }
317
318 fn register_block(&mut self, hash: Block::Hash) -> bool {
326 match self.blocks.entry(hash) {
327 Entry::Occupied(mut occupied) => {
328 let block_state = occupied.get_mut();
329
330 block_state.state_machine.advance_register();
331 if block_state.state_machine == BlockStateMachine::FullyUnpinned {
333 occupied.remove();
334 }
335
336 false
338 },
339 Entry::Vacant(vacant) => {
340 vacant.insert(BlockState {
341 state_machine: BlockStateMachine::new(),
342 timestamp: Instant::now(),
343 });
344
345 true
347 },
348 }
349 }
350
351 fn unregister_block(&mut self, hash: Block::Hash) -> bool {
357 match self.blocks.entry(hash) {
358 Entry::Occupied(mut occupied) => {
359 let block_state = occupied.get_mut();
360
361 if block_state.state_machine.was_unpinned() {
363 return false
364 }
365
366 block_state.state_machine.advance_unpin();
367 if block_state.state_machine == BlockStateMachine::FullyUnpinned {
369 occupied.remove();
370 }
371
372 true
373 },
374 Entry::Vacant(_) => false,
376 }
377 }
378
379 fn contains_block(&self, hash: Block::Hash) -> bool {
384 let Some(state) = self.blocks.get(&hash) else {
385 return false
387 };
388
389 !state.state_machine.was_unpinned()
391 }
392
393 fn find_oldest_block_timestamp(&self) -> Instant {
399 let mut timestamp = Instant::now();
400 for (_, state) in self.blocks.iter() {
401 timestamp = std::cmp::min(timestamp, state.timestamp);
402 }
403 timestamp
404 }
405
406 fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
410 self.operations.register_operation(to_reserve)
411 }
412
413 pub fn get_operation(&self, id: &str) -> Option<OperationState> {
415 self.operations.get_operation(id)
416 }
417}
418
419pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
423 hash: Block::Hash,
424 with_runtime: bool,
425 response_sender: FollowEventSender<Block::Hash>,
426 operation: RegisteredOperation,
427 backend: Arc<BE>,
428}
429
430impl<Block: BlockT, BE: Backend<Block>> std::fmt::Debug for BlockGuard<Block, BE> {
433 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 write!(f, "BlockGuard hash {:?} with_runtime {:?}", self.hash, self.with_runtime)
435 }
436}
437
438impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
439 fn new(
441 hash: Block::Hash,
442 with_runtime: bool,
443 response_sender: FollowEventSender<Block::Hash>,
444 operation: RegisteredOperation,
445 backend: Arc<BE>,
446 ) -> Result<Self, SubscriptionManagementError> {
447 backend
448 .pin_block(hash)
449 .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
450
451 Ok(Self { hash, with_runtime, response_sender, operation, backend })
452 }
453
454 pub fn has_runtime(&self) -> bool {
456 self.with_runtime
457 }
458
459 pub fn response_sender(&self) -> FollowEventSender<Block::Hash> {
461 self.response_sender.clone()
462 }
463
464 pub fn operation(&mut self) -> &mut RegisteredOperation {
466 &mut self.operation
467 }
468}
469
470impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
471 fn drop(&mut self) {
472 self.backend.unpin_block(self.hash);
473 }
474}
475
/// Receiving ends handed back to the caller when a subscription is inserted.
pub struct InsertedSubscriptionData<Block: BlockT> {
    /// Resolves when the subscription is stopped.
    pub rx_stop: oneshot::Receiver<()>,
    /// Receiving half of the subscription's response channel.
    pub response_receiver: FollowEventReceiver<Block::Hash>,
}
484
/// Registry of all subscriptions and of the blocks they keep pinned.
pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
    /// Reference count per block hash, counting the subscriptions that pin it.
    ///
    /// The backend pin is taken when a hash is first inserted and released
    /// when its count drops to zero.
    global_blocks: HashMap<Block::Hash, usize>,
    /// Maximum number of distinct block hashes in `global_blocks`.
    global_max_pinned_blocks: usize,
    /// Age of a subscription's oldest block beyond which the subscription may
    /// be removed when the global limit is reached.
    local_max_pin_duration: Duration,
    /// Maximum number of operation permits per subscription.
    max_ongoing_operations: usize,
    /// Subscriptions keyed by subscription ID.
    subs: HashMap<String, SubscriptionState<Block>>,

    /// Backend used to pin and unpin blocks.
    backend: Arc<BE>,
}
505
506impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
507 pub fn new(
509 global_max_pinned_blocks: usize,
510 local_max_pin_duration: Duration,
511 max_ongoing_operations: usize,
512 backend: Arc<BE>,
513 ) -> Self {
514 SubscriptionsInner {
515 global_blocks: Default::default(),
516 global_max_pinned_blocks,
517 local_max_pin_duration,
518 max_ongoing_operations,
519 subs: Default::default(),
520 backend,
521 }
522 }
523
524 pub fn insert_subscription(
526 &mut self,
527 sub_id: String,
528 with_runtime: bool,
529 ) -> Option<InsertedSubscriptionData<Block>> {
530 if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
531 let (tx_stop, rx_stop) = oneshot::channel();
532 let (response_sender, response_receiver) =
533 futures::channel::mpsc::channel(BUF_CAP_PER_SUBSCRIPTION);
534 let state = SubscriptionState::<Block> {
535 with_runtime,
536 tx_stop: Some(tx_stop),
537 response_sender,
538 blocks: Default::default(),
539 operations: Operations::new(self.max_ongoing_operations),
540 };
541 entry.insert(state);
542
543 Some(InsertedSubscriptionData { rx_stop, response_receiver })
544 } else {
545 None
546 }
547 }
548
549 pub fn remove_subscription(&mut self, sub_id: &str) {
551 let Some(mut sub) = self.subs.remove(sub_id) else { return };
552
553 sub.stop();
555
556 for (hash, state) in sub.blocks.iter() {
557 if !state.state_machine.was_unpinned() {
558 self.global_unregister_block(*hash);
559 }
560 }
561 }
562
563 pub fn stop_all_subscriptions(&mut self) {
565 let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
566
567 for sub_id in to_remove {
568 self.remove_subscription(&sub_id);
569 }
570 }
571
572 fn ensure_block_space(&mut self, request_sub_id: &str) -> bool {
584 if self.global_blocks.len() < self.global_max_pinned_blocks {
585 return false
586 }
587
588 let now = Instant::now();
591
592 let to_remove: Vec<_> = self
593 .subs
594 .iter_mut()
595 .filter_map(|(sub_id, sub)| {
596 let sub_time = sub.find_oldest_block_timestamp();
597 let should_remove = match now.checked_duration_since(sub_time) {
599 Some(duration) => duration > self.local_max_pin_duration,
600 None => true,
601 };
602 should_remove.then(|| sub_id.clone())
603 })
604 .collect();
605
606 let mut is_terminated = false;
607 for sub_id in to_remove {
608 if sub_id == request_sub_id {
609 is_terminated = true;
610 }
611 self.remove_subscription(&sub_id);
612 }
613
614 if self.global_blocks.len() < self.global_max_pinned_blocks {
616 return is_terminated
617 }
618
619 let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
622 for sub_id in to_remove {
623 if sub_id == request_sub_id {
624 is_terminated = true;
625 }
626 self.remove_subscription(&sub_id);
627 }
628 return is_terminated
629 }
630
631 pub fn pin_block(
632 &mut self,
633 sub_id: &str,
634 hash: Block::Hash,
635 ) -> Result<bool, SubscriptionManagementError> {
636 let Some(sub) = self.subs.get_mut(sub_id) else {
637 return Err(SubscriptionManagementError::SubscriptionAbsent)
638 };
639
640 if !sub.register_block(hash) {
643 return Ok(false)
644 }
645
646 if !self.global_blocks.contains_key(&hash) {
648 if self.ensure_block_space(sub_id) {
650 return Err(SubscriptionManagementError::ExceededLimits)
651 }
652 }
653
654 self.global_register_block(hash)?;
655 Ok(true)
656 }
657
658 fn global_register_block(
663 &mut self,
664 hash: Block::Hash,
665 ) -> Result<(), SubscriptionManagementError> {
666 match self.global_blocks.entry(hash) {
667 Entry::Occupied(mut occupied) => {
668 *occupied.get_mut() += 1;
669 },
670 Entry::Vacant(vacant) => {
671 self.backend
672 .pin_block(hash)
673 .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
674
675 vacant.insert(1);
676 },
677 };
678 Ok(())
679 }
680
681 fn global_unregister_block(&mut self, hash: Block::Hash) {
687 if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) {
688 let counter = occupied.get_mut();
689 if *counter == 1 {
690 self.backend.unpin_block(hash);
692 occupied.remove();
693 } else {
694 *counter -= 1;
695 }
696 }
697 }
698
699 fn ensure_hash_uniqueness(
701 hashes: impl IntoIterator<Item = Block::Hash> + Clone,
702 ) -> Result<(), SubscriptionManagementError> {
703 let mut set = HashSet::new();
704 hashes.into_iter().try_for_each(|hash| {
705 if !set.insert(hash) {
706 Err(SubscriptionManagementError::DuplicateHashes)
707 } else {
708 Ok(())
709 }
710 })
711 }
712
713 pub fn unpin_blocks(
714 &mut self,
715 sub_id: &str,
716 hashes: impl IntoIterator<Item = Block::Hash> + Clone,
717 ) -> Result<(), SubscriptionManagementError> {
718 Self::ensure_hash_uniqueness(hashes.clone())?;
719
720 let Some(sub) = self.subs.get_mut(sub_id) else {
721 return Err(SubscriptionManagementError::SubscriptionAbsent)
722 };
723
724 for hash in hashes.clone() {
727 if !sub.contains_block(hash) {
728 return Err(SubscriptionManagementError::BlockHashAbsent)
729 }
730 }
731
732 for hash in hashes.clone() {
737 sub.unregister_block(hash);
738 }
739
740 for hash in hashes {
742 self.global_unregister_block(hash);
743 }
744
745 Ok(())
746 }
747
748 pub fn lock_block(
749 &mut self,
750 sub_id: &str,
751 hash: Block::Hash,
752 to_reserve: usize,
753 ) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
754 let Some(sub) = self.subs.get_mut(sub_id) else {
755 return Err(SubscriptionManagementError::SubscriptionAbsent)
756 };
757
758 if !sub.contains_block(hash) {
759 return Err(SubscriptionManagementError::BlockHashAbsent)
760 }
761
762 let Some(operation) = sub.register_operation(to_reserve) else {
763 return Err(SubscriptionManagementError::ExceededLimits)
765 };
766
767 BlockGuard::new(
768 hash,
769 sub.with_runtime,
770 sub.response_sender.clone(),
771 operation,
772 self.backend.clone(),
773 )
774 }
775
776 pub fn get_operation(&mut self, sub_id: &str, id: &str) -> Option<OperationState> {
777 let state = self.subs.get(sub_id)?;
778 state.get_operation(id)
779 }
780}
781
#[cfg(test)]
mod tests {
    use super::*;
    use jsonrpsee::ConnectionId;
    use sc_block_builder::BlockBuilderBuilder;
    use sc_service::client::new_with_backend;
    use sp_consensus::BlockOrigin;
    use sp_core::{testing::TaskExecutor, H256};
    use substrate_test_runtime_client::{
        prelude::*,
        runtime::{Block, RuntimeApi},
        Client, ClientBlockImportExt, GenesisInit,
    };

    /// Number of operation permits given to every subscription in these tests.
    const MAX_OPERATIONS_PER_SUB: usize = 16;

    /// Builds an in-memory backend together with a test client that uses it.
    fn init_backend() -> (
        Arc<sc_client_api::in_mem::Backend<Block>>,
        Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
    ) {
        let backend = Arc::new(sc_client_api::in_mem::Backend::new());
        let executor = substrate_test_runtime_client::WasmExecutor::default();
        let client_config = sc_service::ClientConfig::default();
        let genesis_block_builder = sc_service::GenesisBlockBuilder::new(
            &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(),
            !client_config.no_genesis,
            backend.clone(),
            executor.clone(),
        )
        .unwrap();
        let client = Arc::new(
            new_with_backend::<_, _, Block, _, RuntimeApi>(
                backend.clone(),
                executor,
                genesis_block_builder,
                Box::new(TaskExecutor::new()),
                None,
                None,
                client_config,
            )
            .unwrap(),
        );
        (backend, client)
    }

    /// Builds and imports a chain of `num_blocks` blocks on top of genesis and
    /// returns their hashes in import order.
    fn produce_blocks(
        client: Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
        num_blocks: usize,
    ) -> Vec<<Block as BlockT>::Hash> {
        let mut blocks = Vec::with_capacity(num_blocks);
        let mut parent_hash = client.chain_info().genesis_hash;

        for i in 0..num_blocks {
            let block = BlockBuilderBuilder::new(&*client)
                .on_parent_block(parent_hash)
                .with_parent_block_number(i as u64)
                .build()
                .unwrap()
                .build()
                .unwrap()
                .block;
            parent_hash = block.header.hash();
            futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
            blocks.push(block.header.hash());
        }

        blocks
    }

    /// Walks the state machine through register, register, unpin and checks
    /// that repeated calls do not change a settled state.
    #[test]
    fn block_state_machine_register_unpin() {
        let mut state = BlockStateMachine::new();
        assert_eq!(state, BlockStateMachine::Registered);

        state.advance_register();
        assert_eq!(state, BlockStateMachine::FullyRegistered);

        // A third registration is a no-op.
        state.advance_register();
        assert_eq!(state, BlockStateMachine::FullyRegistered);

        assert!(!state.was_unpinned());
        state.advance_unpin();
        assert_eq!(state, BlockStateMachine::FullyUnpinned);
        assert!(state.was_unpinned());

        // A second unpin is a no-op.
        state.advance_unpin();
        assert_eq!(state, BlockStateMachine::FullyUnpinned);
        assert!(state.was_unpinned());

        // Registering after the final state is a no-op.
        state.advance_register();
        assert_eq!(state, BlockStateMachine::FullyUnpinned);
    }

    /// Walks the state machine through register, unpin, register and checks
    /// that repeated calls do not change a settled state.
    #[test]
    fn block_state_machine_unpin_register() {
        let mut state = BlockStateMachine::new();
        assert_eq!(state, BlockStateMachine::Registered);

        assert!(!state.was_unpinned());
        state.advance_unpin();
        assert_eq!(state, BlockStateMachine::Unpinned);
        assert!(state.was_unpinned());

        // A second unpin is a no-op.
        state.advance_unpin();
        assert_eq!(state, BlockStateMachine::Unpinned);
        assert!(state.was_unpinned());

        state.advance_register();
        assert_eq!(state, BlockStateMachine::FullyUnpinned);
        assert!(state.was_unpinned());

        // Further calls leave the final state untouched.
        state.advance_register();
        assert_eq!(state, BlockStateMachine::FullyUnpinned);
        state.advance_unpin();
        assert_eq!(state, BlockStateMachine::FullyUnpinned);
        assert!(state.was_unpinned());
    }

    /// Registering a block twice and unpinning it once removes the block from
    /// the subscription state.
    #[test]
    fn sub_state_register_twice() {
        let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1);
        let mut sub_state = SubscriptionState::<Block> {
            with_runtime: false,
            tx_stop: None,
            response_sender,
            operations: Operations::new(MAX_OPERATIONS_PER_SUB),
            blocks: Default::default(),
        };

        let hash = H256::random();
        assert_eq!(sub_state.register_block(hash), true);
        let block_state = sub_state.blocks.get(&hash).unwrap();
        assert_eq!(block_state.state_machine, BlockStateMachine::Registered);

        // The second registration reports that the block was already known.
        assert_eq!(sub_state.register_block(hash), false);
        let block_state = sub_state.blocks.get(&hash).unwrap();
        assert_eq!(block_state.state_machine, BlockStateMachine::FullyRegistered);

        // The unpin reaches the final state, so the entry is dropped.
        assert_eq!(sub_state.unregister_block(hash), true);
        let block_state = sub_state.blocks.get(&hash);
        assert!(block_state.is_none());
    }

    /// Registering, unpinning and registering again also removes the block
    /// from the subscription state.
    #[test]
    fn sub_state_register_unregister() {
        let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1);
        let mut sub_state = SubscriptionState::<Block> {
            with_runtime: false,
            tx_stop: None,
            response_sender,
            blocks: Default::default(),
            operations: Operations::new(MAX_OPERATIONS_PER_SUB),
        };

        let hash = H256::random();
        // Unknown blocks cannot be unregistered.
        assert_eq!(sub_state.unregister_block(hash), false);

        assert_eq!(sub_state.register_block(hash), true);
        let block_state = sub_state.blocks.get(&hash).unwrap();
        assert_eq!(block_state.state_machine, BlockStateMachine::Registered);

        // The entry is kept after the unpin because one registration is still expected.
        assert_eq!(sub_state.unregister_block(hash), true);
        let block_state = sub_state.blocks.get(&hash).unwrap();
        assert_eq!(block_state.state_machine, BlockStateMachine::Unpinned);

        // The late registration reaches the final state, so the entry is dropped.
        assert_eq!(sub_state.register_block(hash), false);
        let block_state = sub_state.blocks.get(&hash);
        assert!(block_state.is_none());

        // The block is unknown again.
        assert_eq!(sub_state.unregister_block(hash), false);
        let block_state = sub_state.blocks.get(&hash);
        assert!(block_state.is_none());
    }

    /// Unpinning with duplicated hashes is rejected without touching any
    /// reference count.
    #[test]
    fn unpin_duplicate_hashes() {
        let (backend, client) = init_backend();

        let hashes = produce_blocks(client, 3);
        let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

        let mut subs =
            SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
        let id_1 = "abc".to_string();
        let id_2 = "abcd".to_string();

        // The first subscription pins all three blocks.
        let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
        assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
        assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);

        // The second subscription pins only the second block.
        let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);

        assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
        assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
        assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);

        let err = subs.unpin_blocks(&id_1, vec![hash_1, hash_1, hash_2, hash_2]).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::DuplicateHashes);

        // The failed call left the reference counts unchanged.
        assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
        assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
        assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);

        // Unpinning unique hashes releases one reference per hash.
        subs.unpin_blocks(&id_1, vec![hash_1, hash_2]).unwrap();
        assert_eq!(subs.global_blocks.get(&hash_1), None);
        assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
        assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
    }

    /// `lock_block` fails for unknown subscriptions and for blocks that the
    /// subscription does not track.
    #[test]
    fn subscription_lock_block() {
        let builder = TestClientBuilder::new();
        let backend = builder.backend();
        let mut subs =
            SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);

        let id = "abc".to_string();
        let hash = H256::random();

        // The subscription was not inserted yet.
        let err = subs.lock_block(&id, hash, 1).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

        let _stop = subs.insert_subscription(id.clone(), true).unwrap();
        // Inserting the same ID twice is rejected.
        assert!(subs.insert_subscription(id.clone(), true).is_none());

        // The subscription exists but does not track the block.
        let err = subs.lock_block(&id, hash, 1).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);

        subs.remove_subscription(&id);

        // The subscription is gone again.
        let err = subs.lock_block(&id, hash, 1).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
    }

    /// A pinned block can be locked until it is unpinned.
    #[test]
    fn subscription_check_block() {
        let (backend, client) = init_backend();

        let hashes = produce_blocks(client, 1);
        let hash = hashes[0];

        let mut subs =
            SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
        let id = "abc".to_string();

        let _stop = subs.insert_subscription(id.clone(), true).unwrap();

        assert_eq!(subs.pin_block(&id, hash).unwrap(), true);

        // The guard carries the runtime flag of the subscription.
        let block = subs.lock_block(&id, hash, 1).unwrap();
        assert_eq!(block.has_runtime(), true);

        let invalid_id = "abc-invalid".to_string();
        let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

        // After the unpin the block can no longer be locked.
        subs.unpin_blocks(&id, vec![hash]).unwrap();
        let err = subs.lock_block(&id, hash, 1).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
    }

    /// The global reference count follows the pins and unpins of several
    /// subscriptions.
    #[test]
    fn subscription_ref_count() {
        let (backend, client) = init_backend();

        let hashes = produce_blocks(client, 1);
        let hash = hashes[0];

        let mut subs =
            SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
        let id = "abc".to_string();

        let _stop = subs.insert_subscription(id.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
        assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
        // The block is tracked by the subscription.
        subs.subs.get(&id).unwrap().blocks.get(&hash).unwrap();

        // Pinning again from the same subscription does not add a reference.
        assert_eq!(subs.pin_block(&id, hash).unwrap(), false);
        assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);

        // A second subscription adds its own reference.
        let id_second = "abcd".to_string();
        let _stop = subs.insert_subscription(id_second.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_second, hash).unwrap(), true);
        assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 2);
        subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();

        subs.unpin_blocks(&id, vec![hash]).unwrap();
        assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
        // The first subscription cannot unpin the same block twice.
        let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);

        // The last reference is released by the second subscription.
        subs.unpin_blocks(&id_second, vec![hash]).unwrap();
        assert!(subs.global_blocks.get(&hash).is_none());
    }

    /// Removing a subscription releases exactly the references it held.
    #[test]
    fn subscription_remove_subscription() {
        let (backend, client) = init_backend();

        let hashes = produce_blocks(client, 3);
        let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

        let mut subs =
            SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
        let id_1 = "abc".to_string();
        let id_2 = "abcd".to_string();

        // The first subscription pins all three blocks.
        let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
        assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
        assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);

        // The second subscription pins only the second block.
        let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);

        assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
        assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
        assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);

        subs.remove_subscription(&id_1);

        // Only the reference of the second subscription is left.
        assert!(subs.global_blocks.get(&hash_1).is_none());
        assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
        assert!(subs.global_blocks.get(&hash_3).is_none());

        subs.remove_subscription(&id_2);

        assert!(subs.global_blocks.get(&hash_2).is_none());
        assert_eq!(subs.global_blocks.len(), 0);
    }

    /// Exceeding the global limit while no subscription is old enough removes
    /// every subscription.
    #[test]
    fn subscription_check_limits() {
        let (backend, client) = init_backend();

        let hashes = produce_blocks(client, 3);
        let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

        // At most two distinct blocks may be pinned globally.
        let mut subs =
            SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
        let id_1 = "abc".to_string();
        let id_2 = "abcd".to_string();

        // Both subscriptions pin the same two blocks, which fills the limit.
        let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
        assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);

        let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
        assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);

        assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
        assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);

        // A third distinct block does not fit. Neither subscription is older
        // than the pin duration, so both are removed.
        let err = subs.pin_block(&id_1, hash_3).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::ExceededLimits);

        let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

        let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

        assert!(subs.global_blocks.get(&hash_1).is_none());
        assert!(subs.global_blocks.get(&hash_2).is_none());
        assert!(subs.global_blocks.get(&hash_3).is_none());
        assert_eq!(subs.global_blocks.len(), 0);
    }

    /// Exceeding the global limit removes only the subscriptions whose oldest
    /// block exceeds the pin duration, as long as that frees enough space.
    #[test]
    fn subscription_check_limits_with_duration() {
        let (backend, client) = init_backend();

        let hashes = produce_blocks(client, 3);
        let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

        // At most two distinct blocks may be pinned, each for five seconds.
        let mut subs =
            SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
        let id_1 = "abc".to_string();
        let id_2 = "abcd".to_string();

        let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
        assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);

        // Let the blocks of the first subscription exceed the pin duration.
        std::thread::sleep(std::time::Duration::from_secs(5));

        let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);

        assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
        assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);

        // The third block does not fit; the first subscription is removed
        // because its blocks are too old.
        let err = subs.pin_block(&id_1, hash_3).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::ExceededLimits);

        let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

        // The second subscription survived.
        let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap();

        assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
        assert!(subs.global_blocks.get(&hash_2).is_none());
        assert!(subs.global_blocks.get(&hash_3).is_none());
        assert_eq!(subs.global_blocks.len(), 1);

        // Filling the limit again and exceeding it removes the remaining
        // subscription, since its blocks are not old enough to be singled out.
        assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
        let err = subs.pin_block(&id_2, hash_3).unwrap_err();
        assert_eq!(err, SubscriptionManagementError::ExceededLimits);

        assert!(subs.global_blocks.get(&hash_1).is_none());
        assert!(subs.global_blocks.get(&hash_2).is_none());
        assert!(subs.global_blocks.get(&hash_3).is_none());
        assert_eq!(subs.global_blocks.len(), 0);
    }

    /// Stopping a subscription delivers the stop signal to the receiver.
    #[test]
    fn subscription_check_stop_event() {
        let builder = TestClientBuilder::new();
        let backend = builder.backend();
        let mut subs =
            SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);

        let id = "abc".to_string();

        let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap();

        // Nothing was sent yet.
        let res = sub_data.rx_stop.try_recv().unwrap();
        assert!(res.is_none());

        let sub = subs.subs.get_mut(&id).unwrap();
        sub.stop();

        // The stop signal arrived.
        let res = sub_data.rx_stop.try_recv().unwrap();
        assert!(res.is_some());
    }

    /// Permits are capped by what is available and are returned on drop.
    #[test]
    fn ongoing_operations() {
        let ops = LimitOperations::new(2);

        // One of two permits is reserved.
        let permit_one = ops.reserve_at_most(1).unwrap();
        assert_eq!(permit_one.num_permits(), 1);

        // Two are requested but only one is left.
        let permit_two = ops.reserve_at_most(2).unwrap();
        assert_eq!(permit_two.num_permits(), 1);

        // No permits are left.
        let permit = ops.reserve_at_most(1);
        assert!(permit.is_none());

        // Dropping a permit makes it available again.
        drop(permit_two);

        let permit_three = ops.reserve_at_most(1).unwrap();
        assert_eq!(permit_three.num_permits(), 1);
    }

    /// Stopping all subscriptions releases every global block reference.
    #[test]
    fn stop_all_subscriptions() {
        let (backend, client) = init_backend();

        let hashes = produce_blocks(client, 3);
        let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

        let mut subs =
            SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
        let id_1 = "abc".to_string();
        let id_2 = "abcd".to_string();

        // The first subscription pins all three blocks.
        let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
        assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
        assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);

        // The second subscription pins only the second block.
        let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
        assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);

        assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
        assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
        assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
        assert_eq!(subs.global_blocks.len(), 3);

        subs.stop_all_subscriptions();
        assert!(subs.global_blocks.is_empty());
    }

    /// Dropping a reserved subscription removes the subscription it inserted
    /// and frees its reservation slot.
    #[test]
    fn reserved_subscription_cleans_resources() {
        let builder = TestClientBuilder::new();
        let backend = builder.backend();
        let subs = Arc::new(parking_lot::RwLock::new(SubscriptionsInner::new(
            10,
            Duration::from_secs(10),
            MAX_OPERATIONS_PER_SUB,
            backend,
        )));

        // Constructed with 2; the assertions below show that a third
        // reservation on the same connection is refused.
        let rpc_connections = crate::common::connections::RpcConnections::new(2);

        let subscription_management =
            crate::chain_head::subscription::SubscriptionManagement::_from_inner(
                subs.clone(),
                rpc_connections.clone(),
            );

        let reserved_sub_first =
            subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
        let mut reserved_sub_second =
            subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
        // Reserving alone does not insert a subscription.
        assert_eq!(subs.read().subs.len(), 0);

        // The third reservation is refused until an earlier one is dropped.
        assert!(subscription_management.reserve_subscription(ConnectionId(1)).is_none());
        drop(reserved_sub_first);
        let mut reserved_sub_first =
            subscription_management.reserve_subscription(ConnectionId(1)).unwrap();

        let _sub_data_first =
            reserved_sub_first.insert_subscription("sub1".to_string(), true).unwrap();
        let _sub_data_second =
            reserved_sub_second.insert_subscription("sub2".to_string(), true).unwrap();
        assert_eq!(subs.read().subs.len(), 2);

        // Dropping the reservation removes its subscription and frees the slot.
        drop(reserved_sub_first);
        assert_eq!(subs.read().subs.len(), 1);
        let reserved_sub_first =
            subscription_management.reserve_subscription(ConnectionId(1)).unwrap();

        drop(reserved_sub_first);
        drop(reserved_sub_second);
        assert_eq!(subs.read().subs.len(), 0);
    }
}