1use futures::channel::oneshot;
20use parking_lot::Mutex;
21use sc_client_api::Backend;
22use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
23use sp_runtime::traits::Block as BlockT;
24use std::{
25 collections::{hash_map::Entry, HashMap, HashSet},
26 sync::{atomic::AtomicBool, Arc},
27 time::{Duration, Instant},
28};
29
30use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent};
31
32const QUEUE_SIZE_WARNING: usize = 512;
34
35#[derive(Debug, Clone, PartialEq)]
62enum BlockStateMachine {
63 Registered,
67 FullyRegistered,
71 Unpinned,
75 FullyUnpinned,
79}
80
81impl BlockStateMachine {
82 fn new() -> Self {
83 BlockStateMachine::Registered
84 }
85
86 fn advance_register(&mut self) {
87 match self {
88 BlockStateMachine::Registered => *self = BlockStateMachine::FullyRegistered,
89 BlockStateMachine::Unpinned => *self = BlockStateMachine::FullyUnpinned,
90 _ => (),
91 }
92 }
93
94 fn advance_unpin(&mut self) {
95 match self {
96 BlockStateMachine::Registered => *self = BlockStateMachine::Unpinned,
97 BlockStateMachine::FullyRegistered => *self = BlockStateMachine::FullyUnpinned,
98 _ => (),
99 }
100 }
101
102 fn was_unpinned(&self) -> bool {
103 match self {
104 BlockStateMachine::Unpinned => true,
105 BlockStateMachine::FullyUnpinned => true,
106 _ => false,
107 }
108 }
109}
110
111struct LimitOperations {
113 semaphore: Arc<tokio::sync::Semaphore>,
115}
116
117impl LimitOperations {
118 fn new(max_operations: usize) -> Self {
120 LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) }
121 }
122
123 fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
131 let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve);
132
133 if num_ops == 0 {
134 return None
135 }
136
137 let permits = Arc::clone(&self.semaphore)
138 .try_acquire_many_owned(num_ops.try_into().ok()?)
139 .ok()?;
140
141 Some(PermitOperations { num_ops, _permit: permits })
142 }
143}
144
145struct PermitOperations {
152 num_ops: usize,
154 _permit: tokio::sync::OwnedSemaphorePermit,
156}
157
158#[derive(Clone)]
163pub struct OperationState {
164 shared_state: Arc<SharedOperationState>,
167 send_continue: tokio::sync::mpsc::Sender<()>,
169}
170
171impl OperationState {
172 pub fn submit_continue(&self) -> bool {
176 if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) {
178 return false
179 }
180
181 self.send_continue.try_send(()).is_ok()
184 }
185
186 pub fn stop_operation(&self) {
191 if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) {
193 return
194 }
195
196 self.shared_state
197 .operation_stopped
198 .store(true, std::sync::atomic::Ordering::Release);
199
200 let _ = self.send_continue.try_send(());
203 }
204}
205
206struct SharedOperationState {
209 requested_continue: AtomicBool,
211 operation_stopped: AtomicBool,
213}
214
215impl SharedOperationState {
216 fn new() -> Arc<Self> {
220 Arc::new(SharedOperationState {
221 requested_continue: AtomicBool::new(false),
222 operation_stopped: AtomicBool::new(false),
223 })
224 }
225}
226
227pub struct RegisteredOperation {
231 shared_state: Arc<SharedOperationState>,
234 recv_continue: tokio::sync::mpsc::Receiver<()>,
236 operation_id: String,
238 operations: Arc<Mutex<HashMap<String, OperationState>>>,
240 permit: PermitOperations,
242}
243
244impl RegisteredOperation {
245 pub async fn wait_for_continue(&mut self) {
248 self.shared_state
249 .requested_continue
250 .store(true, std::sync::atomic::Ordering::Release);
251
252 let _ = self.recv_continue.recv().await;
256
257 self.shared_state
258 .requested_continue
259 .store(false, std::sync::atomic::Ordering::Release);
260 }
261
262 pub fn was_stopped(&self) -> bool {
264 self.shared_state.operation_stopped.load(std::sync::atomic::Ordering::Acquire)
265 }
266
267 pub fn operation_id(&self) -> String {
269 self.operation_id.clone()
270 }
271
272 pub fn num_reserved(&self) -> usize {
276 self.permit.num_ops
277 }
278}
279
280impl Drop for RegisteredOperation {
281 fn drop(&mut self) {
282 let mut operations = self.operations.lock();
283 operations.remove(&self.operation_id);
284 }
285}
286
287struct Operations {
289 next_operation_id: usize,
291 limits: LimitOperations,
293 operations: Arc<Mutex<HashMap<String, OperationState>>>,
295}
296
297impl Operations {
298 fn new(max_operations: usize) -> Self {
300 Operations {
301 next_operation_id: 0,
302 limits: LimitOperations::new(max_operations),
303 operations: Default::default(),
304 }
305 }
306
307 pub fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
309 let permit = self.limits.reserve_at_most(to_reserve)?;
310
311 let operation_id = self.next_operation_id();
312
313 let (send_continue, recv_continue) = tokio::sync::mpsc::channel(1);
315 let shared_state = SharedOperationState::new();
316
317 let state = OperationState { send_continue, shared_state: shared_state.clone() };
318
319 let operations = self.operations.clone();
321 operations.lock().insert(operation_id.clone(), state);
322
323 Some(RegisteredOperation { shared_state, operation_id, recv_continue, operations, permit })
324 }
325
326 pub fn get_operation(&self, id: &str) -> Option<OperationState> {
328 self.operations.lock().get(id).map(|state| state.clone())
329 }
330
331 fn next_operation_id(&mut self) -> String {
333 let op_id = self.next_operation_id;
334 self.next_operation_id += 1;
335 op_id.to_string()
336 }
337}
338
339struct BlockState {
340 state_machine: BlockStateMachine,
342 timestamp: Instant,
344}
345
346struct SubscriptionState<Block: BlockT> {
348 with_runtime: bool,
350 tx_stop: Option<oneshot::Sender<()>>,
352 response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
356 operations: Operations,
358 blocks: HashMap<Block::Hash, BlockState>,
369}
370
371impl<Block: BlockT> SubscriptionState<Block> {
372 fn stop(&mut self) {
377 if let Some(tx_stop) = self.tx_stop.take() {
378 let _ = tx_stop.send(());
379 }
380 }
381
382 fn register_block(&mut self, hash: Block::Hash) -> bool {
390 match self.blocks.entry(hash) {
391 Entry::Occupied(mut occupied) => {
392 let block_state = occupied.get_mut();
393
394 block_state.state_machine.advance_register();
395 if block_state.state_machine == BlockStateMachine::FullyUnpinned {
397 occupied.remove();
398 }
399
400 false
402 },
403 Entry::Vacant(vacant) => {
404 vacant.insert(BlockState {
405 state_machine: BlockStateMachine::new(),
406 timestamp: Instant::now(),
407 });
408
409 true
411 },
412 }
413 }
414
415 fn unregister_block(&mut self, hash: Block::Hash) -> bool {
421 match self.blocks.entry(hash) {
422 Entry::Occupied(mut occupied) => {
423 let block_state = occupied.get_mut();
424
425 if block_state.state_machine.was_unpinned() {
427 return false
428 }
429
430 block_state.state_machine.advance_unpin();
431 if block_state.state_machine == BlockStateMachine::FullyUnpinned {
433 occupied.remove();
434 }
435
436 true
437 },
438 Entry::Vacant(_) => false,
440 }
441 }
442
443 fn contains_block(&self, hash: Block::Hash) -> bool {
448 let Some(state) = self.blocks.get(&hash) else {
449 return false
451 };
452
453 !state.state_machine.was_unpinned()
455 }
456
457 fn find_oldest_block_timestamp(&self) -> Instant {
463 let mut timestamp = Instant::now();
464 for (_, state) in self.blocks.iter() {
465 timestamp = std::cmp::min(timestamp, state.timestamp);
466 }
467 timestamp
468 }
469
470 fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
474 self.operations.register_operation(to_reserve)
475 }
476
477 pub fn get_operation(&self, id: &str) -> Option<OperationState> {
479 self.operations.get_operation(id)
480 }
481}
482
483pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
487 hash: Block::Hash,
488 with_runtime: bool,
489 response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
490 operation: RegisteredOperation,
491 backend: Arc<BE>,
492}
493
494impl<Block: BlockT, BE: Backend<Block>> std::fmt::Debug for BlockGuard<Block, BE> {
497 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
498 write!(f, "BlockGuard hash {:?} with_runtime {:?}", self.hash, self.with_runtime)
499 }
500}
501
502impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
503 fn new(
505 hash: Block::Hash,
506 with_runtime: bool,
507 response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
508 operation: RegisteredOperation,
509 backend: Arc<BE>,
510 ) -> Result<Self, SubscriptionManagementError> {
511 backend
512 .pin_block(hash)
513 .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
514
515 Ok(Self { hash, with_runtime, response_sender, operation, backend })
516 }
517
518 pub fn has_runtime(&self) -> bool {
520 self.with_runtime
521 }
522
523 pub fn response_sender(&self) -> TracingUnboundedSender<FollowEvent<Block::Hash>> {
525 self.response_sender.clone()
526 }
527
528 pub fn operation(&mut self) -> &mut RegisteredOperation {
530 &mut self.operation
531 }
532}
533
534impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
535 fn drop(&mut self) {
536 self.backend.unpin_block(self.hash);
537 }
538}
539
540pub struct InsertedSubscriptionData<Block: BlockT> {
543 pub rx_stop: oneshot::Receiver<()>,
545 pub response_receiver: TracingUnboundedReceiver<FollowEvent<Block::Hash>>,
547}
548
549pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
550 global_blocks: HashMap<Block::Hash, usize>,
555 global_max_pinned_blocks: usize,
557 local_max_pin_duration: Duration,
559 max_ongoing_operations: usize,
561 subs: HashMap<String, SubscriptionState<Block>>,
563
564 backend: Arc<BE>,
568}
569
570impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
571 pub fn new(
573 global_max_pinned_blocks: usize,
574 local_max_pin_duration: Duration,
575 max_ongoing_operations: usize,
576 backend: Arc<BE>,
577 ) -> Self {
578 SubscriptionsInner {
579 global_blocks: Default::default(),
580 global_max_pinned_blocks,
581 local_max_pin_duration,
582 max_ongoing_operations,
583 subs: Default::default(),
584 backend,
585 }
586 }
587
588 pub fn insert_subscription(
590 &mut self,
591 sub_id: String,
592 with_runtime: bool,
593 ) -> Option<InsertedSubscriptionData<Block>> {
594 if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
595 let (tx_stop, rx_stop) = oneshot::channel();
596 let (response_sender, response_receiver) =
597 tracing_unbounded("chain-head-method-responses", QUEUE_SIZE_WARNING);
598 let state = SubscriptionState::<Block> {
599 with_runtime,
600 tx_stop: Some(tx_stop),
601 response_sender,
602 blocks: Default::default(),
603 operations: Operations::new(self.max_ongoing_operations),
604 };
605 entry.insert(state);
606
607 Some(InsertedSubscriptionData { rx_stop, response_receiver })
608 } else {
609 None
610 }
611 }
612
613 pub fn remove_subscription(&mut self, sub_id: &str) {
615 let Some(mut sub) = self.subs.remove(sub_id) else { return };
616
617 sub.stop();
619
620 for (hash, state) in sub.blocks.iter() {
621 if !state.state_machine.was_unpinned() {
622 self.global_unregister_block(*hash);
623 }
624 }
625 }
626
627 pub fn stop_all_subscriptions(&mut self) {
629 let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
630
631 for sub_id in to_remove {
632 self.remove_subscription(&sub_id);
633 }
634 }
635
636 fn ensure_block_space(&mut self, request_sub_id: &str) -> bool {
648 if self.global_blocks.len() < self.global_max_pinned_blocks {
649 return false
650 }
651
652 let now = Instant::now();
655
656 let to_remove: Vec<_> = self
657 .subs
658 .iter_mut()
659 .filter_map(|(sub_id, sub)| {
660 let sub_time = sub.find_oldest_block_timestamp();
661 let should_remove = match now.checked_duration_since(sub_time) {
663 Some(duration) => duration > self.local_max_pin_duration,
664 None => true,
665 };
666 should_remove.then(|| sub_id.clone())
667 })
668 .collect();
669
670 let mut is_terminated = false;
671 for sub_id in to_remove {
672 if sub_id == request_sub_id {
673 is_terminated = true;
674 }
675 self.remove_subscription(&sub_id);
676 }
677
678 if self.global_blocks.len() < self.global_max_pinned_blocks {
680 return is_terminated
681 }
682
683 let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
686 for sub_id in to_remove {
687 if sub_id == request_sub_id {
688 is_terminated = true;
689 }
690 self.remove_subscription(&sub_id);
691 }
692 return is_terminated
693 }
694
695 pub fn pin_block(
696 &mut self,
697 sub_id: &str,
698 hash: Block::Hash,
699 ) -> Result<bool, SubscriptionManagementError> {
700 let Some(sub) = self.subs.get_mut(sub_id) else {
701 return Err(SubscriptionManagementError::SubscriptionAbsent)
702 };
703
704 if !sub.register_block(hash) {
707 return Ok(false)
708 }
709
710 if !self.global_blocks.contains_key(&hash) {
712 if self.ensure_block_space(sub_id) {
714 return Err(SubscriptionManagementError::ExceededLimits)
715 }
716 }
717
718 self.global_register_block(hash)?;
719 Ok(true)
720 }
721
722 fn global_register_block(
727 &mut self,
728 hash: Block::Hash,
729 ) -> Result<(), SubscriptionManagementError> {
730 match self.global_blocks.entry(hash) {
731 Entry::Occupied(mut occupied) => {
732 *occupied.get_mut() += 1;
733 },
734 Entry::Vacant(vacant) => {
735 self.backend
736 .pin_block(hash)
737 .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
738
739 vacant.insert(1);
740 },
741 };
742 Ok(())
743 }
744
745 fn global_unregister_block(&mut self, hash: Block::Hash) {
751 if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) {
752 let counter = occupied.get_mut();
753 if *counter == 1 {
754 self.backend.unpin_block(hash);
756 occupied.remove();
757 } else {
758 *counter -= 1;
759 }
760 }
761 }
762
763 fn ensure_hash_uniqueness(
765 hashes: impl IntoIterator<Item = Block::Hash> + Clone,
766 ) -> Result<(), SubscriptionManagementError> {
767 let mut set = HashSet::new();
768 hashes.into_iter().try_for_each(|hash| {
769 if !set.insert(hash) {
770 Err(SubscriptionManagementError::DuplicateHashes)
771 } else {
772 Ok(())
773 }
774 })
775 }
776
777 pub fn unpin_blocks(
778 &mut self,
779 sub_id: &str,
780 hashes: impl IntoIterator<Item = Block::Hash> + Clone,
781 ) -> Result<(), SubscriptionManagementError> {
782 Self::ensure_hash_uniqueness(hashes.clone())?;
783
784 let Some(sub) = self.subs.get_mut(sub_id) else {
785 return Err(SubscriptionManagementError::SubscriptionAbsent)
786 };
787
788 for hash in hashes.clone() {
791 if !sub.contains_block(hash) {
792 return Err(SubscriptionManagementError::BlockHashAbsent)
793 }
794 }
795
796 for hash in hashes.clone() {
801 sub.unregister_block(hash);
802 }
803
804 for hash in hashes {
806 self.global_unregister_block(hash);
807 }
808
809 Ok(())
810 }
811
812 pub fn lock_block(
813 &mut self,
814 sub_id: &str,
815 hash: Block::Hash,
816 to_reserve: usize,
817 ) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
818 let Some(sub) = self.subs.get_mut(sub_id) else {
819 return Err(SubscriptionManagementError::SubscriptionAbsent)
820 };
821
822 if !sub.contains_block(hash) {
823 return Err(SubscriptionManagementError::BlockHashAbsent)
824 }
825
826 let Some(operation) = sub.register_operation(to_reserve) else {
827 return Err(SubscriptionManagementError::ExceededLimits)
829 };
830
831 BlockGuard::new(
832 hash,
833 sub.with_runtime,
834 sub.response_sender.clone(),
835 operation,
836 self.backend.clone(),
837 )
838 }
839
840 pub fn get_operation(&mut self, sub_id: &str, id: &str) -> Option<OperationState> {
841 let state = self.subs.get(sub_id)?;
842 state.get_operation(id)
843 }
844}
845
846#[cfg(test)]
847mod tests {
848 use super::*;
849 use jsonrpsee::ConnectionId;
850 use sc_block_builder::BlockBuilderBuilder;
851 use sc_service::client::new_in_mem;
852 use sp_consensus::BlockOrigin;
853 use sp_core::{testing::TaskExecutor, H256};
854 use substrate_test_runtime_client::{
855 prelude::*,
856 runtime::{Block, RuntimeApi},
857 Client, ClientBlockImportExt, GenesisInit,
858 };
859
860 const MAX_OPERATIONS_PER_SUB: usize = 16;
862
863 fn init_backend() -> (
864 Arc<sc_client_api::in_mem::Backend<Block>>,
865 Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
866 ) {
867 let backend = Arc::new(sc_client_api::in_mem::Backend::new());
868 let executor = substrate_test_runtime_client::WasmExecutor::default();
869 let client_config = sc_service::ClientConfig::default();
870 let genesis_block_builder = sc_service::GenesisBlockBuilder::new(
871 &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(),
872 !client_config.no_genesis,
873 backend.clone(),
874 executor.clone(),
875 )
876 .unwrap();
877 let client = Arc::new(
878 new_in_mem::<_, Block, _, RuntimeApi>(
879 backend.clone(),
880 executor,
881 genesis_block_builder,
882 None,
883 None,
884 Box::new(TaskExecutor::new()),
885 client_config,
886 )
887 .unwrap(),
888 );
889 (backend, client)
890 }
891
892 fn produce_blocks(
893 client: Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
894 num_blocks: usize,
895 ) -> Vec<<Block as BlockT>::Hash> {
896 let mut blocks = Vec::with_capacity(num_blocks);
897 let mut parent_hash = client.chain_info().genesis_hash;
898
899 for i in 0..num_blocks {
900 let block = BlockBuilderBuilder::new(&*client)
901 .on_parent_block(parent_hash)
902 .with_parent_block_number(i as u64)
903 .build()
904 .unwrap()
905 .build()
906 .unwrap()
907 .block;
908 parent_hash = block.header.hash();
909 futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
910 blocks.push(block.header.hash());
911 }
912
913 blocks
914 }
915
916 #[test]
917 fn block_state_machine_register_unpin() {
918 let mut state = BlockStateMachine::new();
919 assert_eq!(state, BlockStateMachine::Registered);
921
922 state.advance_register();
923 assert_eq!(state, BlockStateMachine::FullyRegistered);
924
925 state.advance_register();
927 assert_eq!(state, BlockStateMachine::FullyRegistered);
928
929 assert!(!state.was_unpinned());
930 state.advance_unpin();
931 assert_eq!(state, BlockStateMachine::FullyUnpinned);
932 assert!(state.was_unpinned());
933
934 state.advance_unpin();
936 assert_eq!(state, BlockStateMachine::FullyUnpinned);
937 assert!(state.was_unpinned());
938
939 state.advance_register();
941 assert_eq!(state, BlockStateMachine::FullyUnpinned);
942 }
943
944 #[test]
945 fn block_state_machine_unpin_register() {
946 let mut state = BlockStateMachine::new();
947 assert_eq!(state, BlockStateMachine::Registered);
949
950 assert!(!state.was_unpinned());
951 state.advance_unpin();
952 assert_eq!(state, BlockStateMachine::Unpinned);
953 assert!(state.was_unpinned());
954
955 state.advance_unpin();
957 assert_eq!(state, BlockStateMachine::Unpinned);
958 assert!(state.was_unpinned());
959
960 state.advance_register();
961 assert_eq!(state, BlockStateMachine::FullyUnpinned);
962 assert!(state.was_unpinned());
963
964 state.advance_register();
966 assert_eq!(state, BlockStateMachine::FullyUnpinned);
967 state.advance_unpin();
969 assert_eq!(state, BlockStateMachine::FullyUnpinned);
970 assert!(state.was_unpinned());
971 }
972
973 #[test]
974 fn sub_state_register_twice() {
975 let (response_sender, _response_receiver) =
976 tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
977 let mut sub_state = SubscriptionState::<Block> {
978 with_runtime: false,
979 tx_stop: None,
980 response_sender,
981 operations: Operations::new(MAX_OPERATIONS_PER_SUB),
982 blocks: Default::default(),
983 };
984
985 let hash = H256::random();
986 assert_eq!(sub_state.register_block(hash), true);
987 let block_state = sub_state.blocks.get(&hash).unwrap();
988 assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
990
991 assert_eq!(sub_state.register_block(hash), false);
992 let block_state = sub_state.blocks.get(&hash).unwrap();
993 assert_eq!(block_state.state_machine, BlockStateMachine::FullyRegistered);
994
995 assert_eq!(sub_state.unregister_block(hash), true);
998 let block_state = sub_state.blocks.get(&hash);
999 assert!(block_state.is_none());
1000 }
1001
1002 #[test]
1003 fn sub_state_register_unregister() {
1004 let (response_sender, _response_receiver) =
1005 tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
1006 let mut sub_state = SubscriptionState::<Block> {
1007 with_runtime: false,
1008 tx_stop: None,
1009 response_sender,
1010 blocks: Default::default(),
1011 operations: Operations::new(MAX_OPERATIONS_PER_SUB),
1012 };
1013
1014 let hash = H256::random();
1015 assert_eq!(sub_state.unregister_block(hash), false);
1017
1018 assert_eq!(sub_state.register_block(hash), true);
1019 let block_state = sub_state.blocks.get(&hash).unwrap();
1020 assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
1022
1023 assert_eq!(sub_state.unregister_block(hash), true);
1025 let block_state = sub_state.blocks.get(&hash).unwrap();
1026 assert_eq!(block_state.state_machine, BlockStateMachine::Unpinned);
1027
1028 assert_eq!(sub_state.register_block(hash), false);
1029 let block_state = sub_state.blocks.get(&hash);
1030 assert!(block_state.is_none());
1031
1032 assert_eq!(sub_state.unregister_block(hash), false);
1035 let block_state = sub_state.blocks.get(&hash);
1036 assert!(block_state.is_none());
1037 }
1038
1039 #[test]
1040 fn unpin_duplicate_hashes() {
1041 let (backend, client) = init_backend();
1042
1043 let hashes = produce_blocks(client, 3);
1044 let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1045
1046 let mut subs =
1047 SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1048 let id_1 = "abc".to_string();
1049 let id_2 = "abcd".to_string();
1050
1051 let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1053 assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1054 assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1055 assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
1056
1057 let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1059 assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1060
1061 assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1063 assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1064 assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1065
1066 let err = subs.unpin_blocks(&id_1, vec![hash_1, hash_1, hash_2, hash_2]).unwrap_err();
1068 assert_eq!(err, SubscriptionManagementError::DuplicateHashes);
1069
1070 assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1072 assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1073 assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1074
1075 subs.unpin_blocks(&id_1, vec![hash_1, hash_2]).unwrap();
1077 assert_eq!(subs.global_blocks.get(&hash_1), None);
1078 assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1079 assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1080 }
1081
1082 #[test]
1083 fn subscription_lock_block() {
1084 let builder = TestClientBuilder::new();
1085 let backend = builder.backend();
1086 let mut subs =
1087 SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1088
1089 let id = "abc".to_string();
1090 let hash = H256::random();
1091
1092 let err = subs.lock_block(&id, hash, 1).unwrap_err();
1094 assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1095
1096 let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1097 assert!(subs.insert_subscription(id.clone(), true).is_none());
1099
1100 let err = subs.lock_block(&id, hash, 1).unwrap_err();
1102 assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1103
1104 subs.remove_subscription(&id);
1105
1106 let err = subs.lock_block(&id, hash, 1).unwrap_err();
1108 assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1109 }
1110
1111 #[test]
1112 fn subscription_check_block() {
1113 let (backend, client) = init_backend();
1114
1115 let hashes = produce_blocks(client, 1);
1116 let hash = hashes[0];
1117
1118 let mut subs =
1119 SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1120 let id = "abc".to_string();
1121
1122 let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1123
1124 assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
1126
1127 let block = subs.lock_block(&id, hash, 1).unwrap();
1128 assert_eq!(block.has_runtime(), true);
1130
1131 let invalid_id = "abc-invalid".to_string();
1132 let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
1133 assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1134
1135 subs.unpin_blocks(&id, vec![hash]).unwrap();
1137 let err = subs.lock_block(&id, hash, 1).unwrap_err();
1138 assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1139 }
1140
1141 #[test]
1142 fn subscription_ref_count() {
1143 let (backend, client) = init_backend();
1144
1145 let hashes = produce_blocks(client, 1);
1146 let hash = hashes[0];
1147
1148 let mut subs =
1149 SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1150 let id = "abc".to_string();
1151
1152 let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1153 assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
1154 assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1156 subs.subs.get(&id).unwrap().blocks.get(&hash).unwrap();
1158
1159 assert_eq!(subs.pin_block(&id, hash).unwrap(), false);
1161 assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1163
1164 let id_second = "abcd".to_string();
1166 let _stop = subs.insert_subscription(id_second.clone(), true).unwrap();
1167 assert_eq!(subs.pin_block(&id_second, hash).unwrap(), true);
1168 assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 2);
1170 subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();
1172
1173 subs.unpin_blocks(&id, vec![hash]).unwrap();
1174 assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1175 let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
1177 assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1178
1179 subs.unpin_blocks(&id_second, vec![hash]).unwrap();
1180 assert!(subs.global_blocks.get(&hash).is_none());
1182 }
1183
1184 #[test]
1185 fn subscription_remove_subscription() {
1186 let (backend, client) = init_backend();
1187
1188 let hashes = produce_blocks(client, 3);
1189 let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1190
1191 let mut subs =
1192 SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1193 let id_1 = "abc".to_string();
1194 let id_2 = "abcd".to_string();
1195
1196 let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1198 assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1199 assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1200 assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
1201
1202 let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1204 assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1205
1206 assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1208 assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1209 assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1210
1211 subs.remove_subscription(&id_1);
1212
1213 assert!(subs.global_blocks.get(&hash_1).is_none());
1214 assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1215 assert!(subs.global_blocks.get(&hash_3).is_none());
1216
1217 subs.remove_subscription(&id_2);
1218
1219 assert!(subs.global_blocks.get(&hash_2).is_none());
1220 assert_eq!(subs.global_blocks.len(), 0);
1221 }
1222
1223 #[test]
1224 fn subscription_check_limits() {
1225 let (backend, client) = init_backend();
1226
1227 let hashes = produce_blocks(client, 3);
1228 let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1229
1230 let mut subs =
1232 SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1233 let id_1 = "abc".to_string();
1234 let id_2 = "abcd".to_string();
1235
1236 let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1238 assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1239 assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1240
1241 let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1242 assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
1243 assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1244
1245 assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
1247 assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1248
1249 let err = subs.pin_block(&id_1, hash_3).unwrap_err();
1253 assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1254
1255 let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
1257 assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1258
1259 let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err();
1260 assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1261
1262 assert!(subs.global_blocks.get(&hash_1).is_none());
1263 assert!(subs.global_blocks.get(&hash_2).is_none());
1264 assert!(subs.global_blocks.get(&hash_3).is_none());
1265 assert_eq!(subs.global_blocks.len(), 0);
1266 }
1267
1268 #[test]
1269 fn subscription_check_limits_with_duration() {
1270 let (backend, client) = init_backend();
1271
1272 let hashes = produce_blocks(client, 3);
1273 let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1274
1275 let mut subs =
1277 SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
1278 let id_1 = "abc".to_string();
1279 let id_2 = "abcd".to_string();
1280
1281 let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1282 assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1283 assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1284
1285 std::thread::sleep(std::time::Duration::from_secs(5));
1288
1289 let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1290 assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
1291
1292 assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
1294 assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1295
1296 let err = subs.pin_block(&id_1, hash_3).unwrap_err();
1298 assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1299
1300 let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
1302 assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1303
1304 let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap();
1305
1306 assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1307 assert!(subs.global_blocks.get(&hash_2).is_none());
1308 assert!(subs.global_blocks.get(&hash_3).is_none());
1309 assert_eq!(subs.global_blocks.len(), 1);
1310
1311 assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1313 let err = subs.pin_block(&id_2, hash_3).unwrap_err();
1314 assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1315
1316 assert!(subs.global_blocks.get(&hash_1).is_none());
1317 assert!(subs.global_blocks.get(&hash_2).is_none());
1318 assert!(subs.global_blocks.get(&hash_3).is_none());
1319 assert_eq!(subs.global_blocks.len(), 0);
1320 }
1321
1322 #[test]
1323 fn subscription_check_stop_event() {
1324 let builder = TestClientBuilder::new();
1325 let backend = builder.backend();
1326 let mut subs =
1327 SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1328
1329 let id = "abc".to_string();
1330
1331 let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap();
1332
1333 let res = sub_data.rx_stop.try_recv().unwrap();
1335 assert!(res.is_none());
1336
1337 let sub = subs.subs.get_mut(&id).unwrap();
1338 sub.stop();
1339
1340 let res = sub_data.rx_stop.try_recv().unwrap();
1342 assert!(res.is_some());
1343 }
1344
1345 #[test]
1346 fn ongoing_operations() {
1347 let ops = LimitOperations::new(2);
1349
1350 let permit_one = ops.reserve_at_most(1).unwrap();
1352 assert_eq!(permit_one.num_ops, 1);
1353
1354 let permit_two = ops.reserve_at_most(2).unwrap();
1356 assert_eq!(permit_two.num_ops, 1);
1358
1359 let permit = ops.reserve_at_most(1);
1361 assert!(permit.is_none());
1362
1363 drop(permit_two);
1365
1366 let permit_three = ops.reserve_at_most(1).unwrap();
1368 assert_eq!(permit_three.num_ops, 1);
1369 }
1370
1371 #[test]
1372 fn stop_all_subscriptions() {
1373 let (backend, client) = init_backend();
1374
1375 let hashes = produce_blocks(client, 3);
1376 let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1377
1378 let mut subs =
1379 SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1380 let id_1 = "abc".to_string();
1381 let id_2 = "abcd".to_string();
1382
1383 let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1385 assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1386 assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1387 assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
1388
1389 let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1391 assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1392
1393 assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1395 assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1396 assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1397 assert_eq!(subs.global_blocks.len(), 3);
1398
1399 subs.stop_all_subscriptions();
1401 assert!(subs.global_blocks.is_empty());
1402 }
1403
1404 #[test]
1405 fn reserved_subscription_cleans_resources() {
1406 let builder = TestClientBuilder::new();
1407 let backend = builder.backend();
1408 let subs = Arc::new(parking_lot::RwLock::new(SubscriptionsInner::new(
1409 10,
1410 Duration::from_secs(10),
1411 MAX_OPERATIONS_PER_SUB,
1412 backend,
1413 )));
1414
1415 let rpc_connections = crate::common::connections::RpcConnections::new(2);
1417
1418 let subscription_management =
1419 crate::chain_head::subscription::SubscriptionManagement::_from_inner(
1420 subs.clone(),
1421 rpc_connections.clone(),
1422 );
1423
1424 let reserved_sub_first =
1425 subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1426 let mut reserved_sub_second =
1427 subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1428 assert_eq!(subs.read().subs.len(), 0);
1430
1431 assert!(subscription_management.reserve_subscription(ConnectionId(1)).is_none());
1433 drop(reserved_sub_first);
1435 let mut reserved_sub_first =
1437 subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1438
1439 let _sub_data_first =
1441 reserved_sub_first.insert_subscription("sub1".to_string(), true).unwrap();
1442 let _sub_data_second =
1443 reserved_sub_second.insert_subscription("sub2".to_string(), true).unwrap();
1444 assert_eq!(subs.read().subs.len(), 2);
1446
1447 drop(reserved_sub_first);
1449 assert_eq!(subs.read().subs.len(), 1);
1451 let reserved_sub_first =
1453 subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1454
1455 drop(reserved_sub_first);
1457 drop(reserved_sub_second);
1458 assert_eq!(subs.read().subs.len(), 0);
1459 }
1460}