referrerpolicy=no-referrer-when-downgrade

sc_rpc_spec_v2/chain_head/subscription/
inner.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use futures::channel::oneshot;
20use parking_lot::Mutex;
21use sc_client_api::Backend;
22use sp_runtime::traits::Block as BlockT;
23use std::{
24	collections::{hash_map::Entry, HashMap, HashSet},
25	sync::Arc,
26	time::{Duration, Instant},
27};
28
29use crate::chain_head::{
30	subscription::SubscriptionManagementError, FollowEventReceiver, FollowEventSender,
31};
32
33type NotifyOnDrop = tokio::sync::mpsc::Receiver<()>;
34type SharedOperations = Arc<Mutex<HashMap<String, (NotifyOnDrop, StopHandle)>>>;
35
36/// The buffer capacity for each subscription
37///
/// Beware that the JSON-RPC server already has a global buffer per
/// connection; this is an extra buffer on top of it.
40const BUF_CAP_PER_SUBSCRIPTION: usize = 16;
41
/// The state machine of a block of a single subscription ID.
///
/// # Motivation
///
/// Each block is registered twice: once from the `BestBlock` event
/// and once from the `Finalized` event.
///
/// The state of a block must be tracked until both events register the
/// block and the user calls `unpin`.
///
/// Otherwise, the following race might happen:
///  T0. BestBlock event: hash is tracked and pinned in backend.
///  T1. User calls unpin: hash is untracked and unpinned in backend.
///  T2. Finalized event: hash is tracked (no previous history) and pinned again.
///
/// # State Machine Transition
///
/// ```ignore
///                   (register)
/// [ REGISTERED ]  ---------------> [ FULLY REGISTERED ]
///       |                              |
///       | (unpin)                      | (unpin)
///       |                              |
///       V           (register)         V
/// [ UNPINNED ]  -----------------> [ FULLY UNPINNED ]
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BlockStateMachine {
	/// The block was registered by one event (either `Finalized` or `BestBlock` event).
	///
	/// Unpin was not called.
	Registered,
	/// The block was registered by both events (`Finalized` and `BestBlock` events).
	///
	/// Unpin was not called.
	FullyRegistered,
	/// The block was registered by one event (either `Finalized` or `BestBlock` event).
	///
	/// Unpin __was__ called.
	Unpinned,
	/// The block was registered by both events (`Finalized` and `BestBlock` events).
	///
	/// Unpin __was__ called.
	FullyUnpinned,
}

impl BlockStateMachine {
	/// Create the state machine in its initial [`BlockStateMachine::Registered`] state.
	fn new() -> Self {
		Self::Registered
	}

	/// Record that a further event registered the block.
	///
	/// Moves `Registered -> FullyRegistered` and `Unpinned -> FullyUnpinned`.
	/// The "fully" states are left untouched, so calling this repeatedly is harmless.
	fn advance_register(&mut self) {
		// The match is exhaustive on purpose: adding a variant must force a decision here.
		*self = match *self {
			Self::Registered | Self::FullyRegistered => Self::FullyRegistered,
			Self::Unpinned | Self::FullyUnpinned => Self::FullyUnpinned,
		};
	}

	/// Record that the user called `unpin` for the block.
	///
	/// Moves `Registered -> Unpinned` and `FullyRegistered -> FullyUnpinned`.
	/// Already unpinned states are left untouched.
	fn advance_unpin(&mut self) {
		*self = match *self {
			Self::Registered | Self::Unpinned => Self::Unpinned,
			Self::FullyRegistered | Self::FullyUnpinned => Self::FullyUnpinned,
		};
	}

	/// Returns `true` if `unpin` was already recorded for the block.
	fn was_unpinned(&self) -> bool {
		matches!(self, Self::Unpinned | Self::FullyUnpinned)
	}
}
117
118/// Limit the number of ongoing operations across methods.
119struct LimitOperations {
120	/// Limit the number of ongoing operations for this subscription.
121	semaphore: Arc<tokio::sync::Semaphore>,
122}
123
124impl LimitOperations {
125	/// Constructs a new [`LimitOperations`].
126	fn new(max_operations: usize) -> Self {
127		LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) }
128	}
129
130	/// Reserves capacity to execute at least one operation and at most the requested items.
131	///
132	/// Dropping [`PermitOperations`] without executing an operation will release
133	/// the reserved capacity.
134	///
135	/// Returns nothing if there's no space available, else returns a permit
136	/// that guarantees that at least one operation can be executed.
137	fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
138		let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve);
139
140		if num_ops == 0 {
141			return None
142		}
143
144		let permits = Arc::clone(&self.semaphore)
145			.try_acquire_many_owned(num_ops.try_into().ok()?)
146			.ok()?;
147
148		Some(permits)
149	}
150}
151
152/// Permits a number of operations to be executed.
153///
/// [`PermitOperations`] are returned by [`LimitOperations::reserve_at_most()`] and are used
/// to guarantee the RPC server can execute the reserved number of operations.
156///
157/// The number of reserved items are given back to the [`LimitOperations`] on drop.
158type PermitOperations = tokio::sync::OwnedSemaphorePermit;
159
160/// Stop handle for the operation.
161#[derive(Clone)]
162pub struct StopHandle(tokio::sync::mpsc::Sender<()>);
163
164impl StopHandle {
165	pub async fn stopped(&self) {
166		self.0.closed().await;
167	}
168
169	pub fn is_stopped(&self) -> bool {
170		self.0.is_closed()
171	}
172}
173
174/// The shared operation state between the backend [`RegisteredOperation`] and frontend
175/// [`RegisteredOperation`].
176#[derive(Clone)]
177pub struct OperationState {
178	stop: StopHandle,
179	operations: SharedOperations,
180	operation_id: String,
181}
182
183impl OperationState {
184	pub fn stop(&mut self) {
185		if !self.stop.is_stopped() {
186			self.operations.lock().remove(&self.operation_id);
187		}
188	}
189}
190
191/// The registered operation passed to the `chainHead` methods.
192///
193/// This is used internally by the `chainHead` methods.
194pub struct RegisteredOperation {
195	/// Stop handle for the operation.
196	stop_handle: StopHandle,
197	/// Track the operations ID of this subscription.
198	operations: SharedOperations,
199	/// The operation ID of the request.
200	operation_id: String,
201	/// Permit a number of items to be executed by this operation.
202	_permit: PermitOperations,
203}
204
205impl RegisteredOperation {
206	/// Stop handle for the operation.
207	pub fn stop_handle(&self) -> &StopHandle {
208		&self.stop_handle
209	}
210
211	/// Get the operation ID.
212	pub fn operation_id(&self) -> String {
213		self.operation_id.clone()
214	}
215}
216
217impl Drop for RegisteredOperation {
218	fn drop(&mut self) {
219		self.operations.lock().remove(&self.operation_id);
220	}
221}
222
223/// The ongoing operations of a subscription.
224struct Operations {
225	/// The next operation ID to be generated.
226	next_operation_id: usize,
227	/// Limit the number of ongoing operations.
228	limits: LimitOperations,
229	/// Track the operations ID of this subscription.
230	operations: SharedOperations,
231}
232
233impl Operations {
234	/// Constructs a new [`Operations`].
235	fn new(max_operations: usize) -> Self {
236		Operations {
237			next_operation_id: 0,
238			limits: LimitOperations::new(max_operations),
239			operations: Default::default(),
240		}
241	}
242
243	/// Register a new operation.
244	pub fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
245		let permit = self.limits.reserve_at_most(to_reserve)?;
246		let operation_id = self.next_operation_id();
247
248		let (tx, rx) = tokio::sync::mpsc::channel(1);
249		let stop_handle = StopHandle(tx);
250		let operations = self.operations.clone();
251		operations.lock().insert(operation_id.clone(), (rx, stop_handle.clone()));
252
253		Some(RegisteredOperation { stop_handle, operation_id, operations, _permit: permit })
254	}
255
256	/// Get the associated operation state with the ID.
257	pub fn get_operation(&self, id: &str) -> Option<OperationState> {
258		let stop = self.operations.lock().get(id).map(|(_, stop)| stop.clone())?;
259
260		Some(OperationState {
261			stop,
262			operations: self.operations.clone(),
263			operation_id: id.to_string(),
264		})
265	}
266
267	/// Generate the next operation ID for this subscription.
268	fn next_operation_id(&mut self) -> String {
269		let op_id = self.next_operation_id;
270		self.next_operation_id += 1;
271		op_id.to_string()
272	}
273}
274
275struct BlockState {
276	/// The state machine of this block.
277	state_machine: BlockStateMachine,
278	/// The timestamp when the block was inserted.
279	timestamp: Instant,
280}
281
282/// The state of a single subscription ID.
283struct SubscriptionState<Block: BlockT> {
284	/// The `with_runtime` parameter flag of the subscription.
285	with_runtime: bool,
286	/// Signals the "Stop" event.
287	tx_stop: Option<oneshot::Sender<()>>,
288	/// The sender of message responses to the `chainHead_follow` events.
289	///
290	/// This object is cloned between methods.
291	response_sender: FollowEventSender<Block::Hash>,
292	/// The ongoing operations of a subscription.
293	operations: Operations,
294	/// Track the block hashes available for this subscription.
295	///
296	/// This implementation assumes:
297	/// - most of the time subscriptions keep a few blocks of the chain's head pinned
298	/// - iteration through the blocks happens only when the hard limit is exceeded.
299	///
300	/// Considering the assumption, iterating (in the unlike case) the hashmap O(N) is
301	/// more time efficient and code friendly than paying for:
302	/// - extra space: an extra BTreeMap<Instant, Hash> to older hashes by oldest insertion
303	/// - extra time: O(log(N)) for insert/remove/find each `pin` block time per subscriptions
304	blocks: HashMap<Block::Hash, BlockState>,
305}
306
307impl<Block: BlockT> SubscriptionState<Block> {
308	/// Trigger the stop event for the current subscription.
309	///
310	/// This can happen on internal failure (ie, the pruning deleted the block from memory)
311	/// or if the subscription exceeded the available pinned blocks.
312	fn stop(&mut self) {
313		if let Some(tx_stop) = self.tx_stop.take() {
314			let _ = tx_stop.send(());
315		}
316	}
317
318	/// Keep track of the given block hash for this subscription.
319	///
320	/// This does not handle pinning in the backend.
321	///
322	/// Returns:
323	/// - true if this is the first time that the block is registered
324	/// - false if the block was already registered
325	fn register_block(&mut self, hash: Block::Hash) -> bool {
326		match self.blocks.entry(hash) {
327			Entry::Occupied(mut occupied) => {
328				let block_state = occupied.get_mut();
329
330				block_state.state_machine.advance_register();
331				// Block was registered twice and unpin was called.
332				if block_state.state_machine == BlockStateMachine::FullyUnpinned {
333					occupied.remove();
334				}
335
336				// Second time we register this block.
337				false
338			},
339			Entry::Vacant(vacant) => {
340				vacant.insert(BlockState {
341					state_machine: BlockStateMachine::new(),
342					timestamp: Instant::now(),
343				});
344
345				// First time we register this block.
346				true
347			},
348		}
349	}
350
351	/// A block is unregistered when the user calls `unpin`.
352	///
353	/// Returns:
354	/// - true if the block can be unpinned.
355	/// - false if the subscription does not contain the block or it was unpinned.
356	fn unregister_block(&mut self, hash: Block::Hash) -> bool {
357		match self.blocks.entry(hash) {
358			Entry::Occupied(mut occupied) => {
359				let block_state = occupied.get_mut();
360
361				// Cannot unpin a block twice.
362				if block_state.state_machine.was_unpinned() {
363					return false
364				}
365
366				block_state.state_machine.advance_unpin();
367				// Block was registered twice and unpin was called.
368				if block_state.state_machine == BlockStateMachine::FullyUnpinned {
369					occupied.remove();
370				}
371
372				true
373			},
374			// Block was not tracked.
375			Entry::Vacant(_) => false,
376		}
377	}
378
379	/// A subscription contains a block when the block was
380	/// registered (`pin` was called) and the block was not `unpinned` yet.
381	///
382	/// Returns `true` if the subscription contains the block.
383	fn contains_block(&self, hash: Block::Hash) -> bool {
384		let Some(state) = self.blocks.get(&hash) else {
385			// Block was not tracked.
386			return false
387		};
388
389		// Subscription no longer contains the block if `unpin` was called.
390		!state.state_machine.was_unpinned()
391	}
392
393	/// Get the timestamp of the oldest inserted block.
394	///
395	/// # Note
396	///
397	/// This iterates over all the blocks of the subscription.
398	fn find_oldest_block_timestamp(&self) -> Instant {
399		let mut timestamp = Instant::now();
400		for (_, state) in self.blocks.iter() {
401			timestamp = std::cmp::min(timestamp, state.timestamp);
402		}
403		timestamp
404	}
405
406	/// Register a new operation.
407	///
408	/// The registered operation can execute at least one item and at most the requested items.
409	fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
410		self.operations.register_operation(to_reserve)
411	}
412
413	/// Get the associated operation state with the ID.
414	pub fn get_operation(&self, id: &str) -> Option<OperationState> {
415		self.operations.get_operation(id)
416	}
417}
418
419/// Keeps a specific block pinned while the handle is alive.
420/// This object ensures that the block is not unpinned while
421/// executing an RPC method call.
422pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
423	hash: Block::Hash,
424	with_runtime: bool,
425	response_sender: FollowEventSender<Block::Hash>,
426	operation: RegisteredOperation,
427	backend: Arc<BE>,
428}
429
430// Custom implementation of Debug to avoid bounds on `backend: Debug` for `unwrap_err()` needed for
431// testing.
432impl<Block: BlockT, BE: Backend<Block>> std::fmt::Debug for BlockGuard<Block, BE> {
433	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434		write!(f, "BlockGuard hash {:?} with_runtime {:?}", self.hash, self.with_runtime)
435	}
436}
437
438impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
439	/// Construct a new [`BlockGuard`] .
440	fn new(
441		hash: Block::Hash,
442		with_runtime: bool,
443		response_sender: FollowEventSender<Block::Hash>,
444		operation: RegisteredOperation,
445		backend: Arc<BE>,
446	) -> Result<Self, SubscriptionManagementError> {
447		backend
448			.pin_block(hash)
449			.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
450
451		Ok(Self { hash, with_runtime, response_sender, operation, backend })
452	}
453
454	/// The `with_runtime` flag of the subscription.
455	pub fn has_runtime(&self) -> bool {
456		self.with_runtime
457	}
458
459	/// Send message responses from the `chainHead` methods to `chainHead_follow`.
460	pub fn response_sender(&self) -> FollowEventSender<Block::Hash> {
461		self.response_sender.clone()
462	}
463
464	/// Get the details of the registered operation.
465	pub fn operation(&mut self) -> &mut RegisteredOperation {
466		&mut self.operation
467	}
468}
469
470impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
471	fn drop(&mut self) {
472		self.backend.unpin_block(self.hash);
473	}
474}
475
476/// The data propagated back to the `chainHead_follow` method after
477/// the subscription is successfully inserted.
478pub struct InsertedSubscriptionData<Block: BlockT> {
479	/// Signal that the subscription must stop.
480	pub rx_stop: oneshot::Receiver<()>,
481	/// Receive message responses from the `chainHead` methods.
482	pub response_receiver: FollowEventReceiver<Block::Hash>,
483}
484
485pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
486	/// Reference count the block hashes across all subscriptions.
487	///
488	/// The pinned blocks cannot exceed the [`Self::global_limit`] limit.
489	/// When the limit is exceeded subscriptions are stopped via the `Stop` event.
490	global_blocks: HashMap<Block::Hash, usize>,
491	/// The maximum number of pinned blocks across all subscriptions.
492	global_max_pinned_blocks: usize,
493	/// The maximum duration that a block is allowed to be pinned per subscription.
494	local_max_pin_duration: Duration,
495	/// The maximum number of ongoing operations per subscription.
496	max_ongoing_operations: usize,
497	/// Map the subscription ID to internal details of the subscription.
498	subs: HashMap<String, SubscriptionState<Block>>,
499
500	/// Backend pinning / unpinning blocks.
501	///
502	/// The `Arc` is handled one level-above, but substrate exposes the backend as Arc<T>.
503	backend: Arc<BE>,
504}
505
506impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
507	/// Construct a new [`SubscriptionsInner`] from the specified limits.
508	pub fn new(
509		global_max_pinned_blocks: usize,
510		local_max_pin_duration: Duration,
511		max_ongoing_operations: usize,
512		backend: Arc<BE>,
513	) -> Self {
514		SubscriptionsInner {
515			global_blocks: Default::default(),
516			global_max_pinned_blocks,
517			local_max_pin_duration,
518			max_ongoing_operations,
519			subs: Default::default(),
520			backend,
521		}
522	}
523
524	/// Insert a new subscription ID.
525	pub fn insert_subscription(
526		&mut self,
527		sub_id: String,
528		with_runtime: bool,
529	) -> Option<InsertedSubscriptionData<Block>> {
530		if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
531			let (tx_stop, rx_stop) = oneshot::channel();
532			let (response_sender, response_receiver) =
533				futures::channel::mpsc::channel(BUF_CAP_PER_SUBSCRIPTION);
534			let state = SubscriptionState::<Block> {
535				with_runtime,
536				tx_stop: Some(tx_stop),
537				response_sender,
538				blocks: Default::default(),
539				operations: Operations::new(self.max_ongoing_operations),
540			};
541			entry.insert(state);
542
543			Some(InsertedSubscriptionData { rx_stop, response_receiver })
544		} else {
545			None
546		}
547	}
548
549	/// Remove the subscription ID with associated pinned blocks.
550	pub fn remove_subscription(&mut self, sub_id: &str) {
551		let Some(mut sub) = self.subs.remove(sub_id) else { return };
552
553		// The `Stop` event can be generated only once.
554		sub.stop();
555
556		for (hash, state) in sub.blocks.iter() {
557			if !state.state_machine.was_unpinned() {
558				self.global_unregister_block(*hash);
559			}
560		}
561	}
562
563	/// All active subscriptions are removed.
564	pub fn stop_all_subscriptions(&mut self) {
565		let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
566
567		for sub_id in to_remove {
568			self.remove_subscription(&sub_id);
569		}
570	}
571
572	/// Ensure that a new block could be pinned.
573	///
574	/// If the global number of blocks has been reached this method
575	/// will remove all subscriptions that have blocks older than the
576	/// specified pin duration.
577	///
578	/// If after removing all subscriptions that exceed the pin duration
579	/// there is no space for pinning a new block, then all subscriptions
580	/// are terminated.
581	///
582	/// Returns true if the given subscription is also terminated.
583	fn ensure_block_space(&mut self, request_sub_id: &str) -> bool {
584		if self.global_blocks.len() < self.global_max_pinned_blocks {
585			return false
586		}
587
588		// Terminate all subscriptions that have blocks older than
589		// the specified pin duration.
590		let now = Instant::now();
591
592		let to_remove: Vec<_> = self
593			.subs
594			.iter_mut()
595			.filter_map(|(sub_id, sub)| {
596				let sub_time = sub.find_oldest_block_timestamp();
597				// Subscriptions older than the specified pin duration should be removed.
598				let should_remove = match now.checked_duration_since(sub_time) {
599					Some(duration) => duration > self.local_max_pin_duration,
600					None => true,
601				};
602				should_remove.then(|| sub_id.clone())
603			})
604			.collect();
605
606		let mut is_terminated = false;
607		for sub_id in to_remove {
608			if sub_id == request_sub_id {
609				is_terminated = true;
610			}
611			self.remove_subscription(&sub_id);
612		}
613
614		// Make sure we have enough space after first pass of terminating subscriptions.
615		if self.global_blocks.len() < self.global_max_pinned_blocks {
616			return is_terminated
617		}
618
619		// Sanity check: cannot uphold `chainHead` guarantees anymore. We have not
620		// found any subscriptions that have older pinned blocks to terminate.
621		let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
622		for sub_id in to_remove {
623			if sub_id == request_sub_id {
624				is_terminated = true;
625			}
626			self.remove_subscription(&sub_id);
627		}
628		return is_terminated
629	}
630
631	pub fn pin_block(
632		&mut self,
633		sub_id: &str,
634		hash: Block::Hash,
635	) -> Result<bool, SubscriptionManagementError> {
636		let Some(sub) = self.subs.get_mut(sub_id) else {
637			return Err(SubscriptionManagementError::SubscriptionAbsent)
638		};
639
640		// Block was already registered for this subscription and therefore
641		// globally tracked.
642		if !sub.register_block(hash) {
643			return Ok(false)
644		}
645
646		// Ensure we have enough space only if the hash is not globally registered.
647		if !self.global_blocks.contains_key(&hash) {
648			// Subscription ID was terminated while ensuring enough space.
649			if self.ensure_block_space(sub_id) {
650				return Err(SubscriptionManagementError::ExceededLimits)
651			}
652		}
653
654		self.global_register_block(hash)?;
655		Ok(true)
656	}
657
658	/// Register the block internally.
659	///
660	/// If the block is present the reference counter is increased.
661	/// If this is a new block, the block is pinned in the backend.
662	fn global_register_block(
663		&mut self,
664		hash: Block::Hash,
665	) -> Result<(), SubscriptionManagementError> {
666		match self.global_blocks.entry(hash) {
667			Entry::Occupied(mut occupied) => {
668				*occupied.get_mut() += 1;
669			},
670			Entry::Vacant(vacant) => {
671				self.backend
672					.pin_block(hash)
673					.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
674
675				vacant.insert(1);
676			},
677		};
678		Ok(())
679	}
680
681	/// Unregister the block internally.
682	///
683	/// If the block is present the reference counter is decreased.
684	/// If this is the last reference of the block, the block
685	/// is unpinned from the backend and removed from internal tracking.
686	fn global_unregister_block(&mut self, hash: Block::Hash) {
687		if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) {
688			let counter = occupied.get_mut();
689			if *counter == 1 {
690				// Unpin the block from the backend.
691				self.backend.unpin_block(hash);
692				occupied.remove();
693			} else {
694				*counter -= 1;
695			}
696		}
697	}
698
699	/// Ensure the provided hashes are unique.
700	fn ensure_hash_uniqueness(
701		hashes: impl IntoIterator<Item = Block::Hash> + Clone,
702	) -> Result<(), SubscriptionManagementError> {
703		let mut set = HashSet::new();
704		hashes.into_iter().try_for_each(|hash| {
705			if !set.insert(hash) {
706				Err(SubscriptionManagementError::DuplicateHashes)
707			} else {
708				Ok(())
709			}
710		})
711	}
712
713	pub fn unpin_blocks(
714		&mut self,
715		sub_id: &str,
716		hashes: impl IntoIterator<Item = Block::Hash> + Clone,
717	) -> Result<(), SubscriptionManagementError> {
718		Self::ensure_hash_uniqueness(hashes.clone())?;
719
720		let Some(sub) = self.subs.get_mut(sub_id) else {
721			return Err(SubscriptionManagementError::SubscriptionAbsent)
722		};
723
724		// Ensure that all blocks are part of the subscription before removing individual
725		// blocks.
726		for hash in hashes.clone() {
727			if !sub.contains_block(hash) {
728				return Err(SubscriptionManagementError::BlockHashAbsent)
729			}
730		}
731
732		// Note: this needs to be separate from the global mappings to avoid barrow checker
733		// thinking we borrow `&mut self` twice: once from `self.subs.get_mut` and once from
734		// `self.global_unregister_block`. Although the borrowing is correct, since different
735		// fields of the structure are borrowed, one at a time.
736		for hash in hashes.clone() {
737			sub.unregister_block(hash);
738		}
739
740		// Block have been removed from the subscription. Remove them from the global tracking.
741		for hash in hashes {
742			self.global_unregister_block(hash);
743		}
744
745		Ok(())
746	}
747
748	pub fn lock_block(
749		&mut self,
750		sub_id: &str,
751		hash: Block::Hash,
752		to_reserve: usize,
753	) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
754		let Some(sub) = self.subs.get_mut(sub_id) else {
755			return Err(SubscriptionManagementError::SubscriptionAbsent)
756		};
757
758		if !sub.contains_block(hash) {
759			return Err(SubscriptionManagementError::BlockHashAbsent)
760		}
761
762		let Some(operation) = sub.register_operation(to_reserve) else {
763			// Error when the server cannot execute at least one operation.
764			return Err(SubscriptionManagementError::ExceededLimits)
765		};
766
767		BlockGuard::new(
768			hash,
769			sub.with_runtime,
770			sub.response_sender.clone(),
771			operation,
772			self.backend.clone(),
773		)
774	}
775
776	pub fn get_operation(&mut self, sub_id: &str, id: &str) -> Option<OperationState> {
777		let state = self.subs.get(sub_id)?;
778		state.get_operation(id)
779	}
780}
781
782#[cfg(test)]
783mod tests {
784	use super::*;
785	use jsonrpsee::ConnectionId;
786	use sc_block_builder::BlockBuilderBuilder;
787	use sc_service::client::new_with_backend;
788	use sp_consensus::BlockOrigin;
789	use sp_core::{testing::TaskExecutor, H256};
790	use substrate_test_runtime_client::{
791		prelude::*,
792		runtime::{Block, RuntimeApi},
793		Client, ClientBlockImportExt, GenesisInit,
794	};
795
796	/// Maximum number of ongoing operations per subscription ID.
797	const MAX_OPERATIONS_PER_SUB: usize = 16;
798
799	fn init_backend() -> (
800		Arc<sc_client_api::in_mem::Backend<Block>>,
801		Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
802	) {
803		let backend = Arc::new(sc_client_api::in_mem::Backend::new());
804		let executor = substrate_test_runtime_client::WasmExecutor::default();
805		let client_config = sc_service::ClientConfig::default();
806		let genesis_block_builder = sc_service::GenesisBlockBuilder::new(
807			&substrate_test_runtime_client::GenesisParameters::default().genesis_storage(),
808			!client_config.no_genesis,
809			backend.clone(),
810			executor.clone(),
811		)
812		.unwrap();
813		let client = Arc::new(
814			new_with_backend::<_, _, Block, _, RuntimeApi>(
815				backend.clone(),
816				executor,
817				genesis_block_builder,
818				Box::new(TaskExecutor::new()),
819				None,
820				None,
821				client_config,
822			)
823			.unwrap(),
824		);
825		(backend, client)
826	}
827
828	fn produce_blocks(
829		client: Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
830		num_blocks: usize,
831	) -> Vec<<Block as BlockT>::Hash> {
832		let mut blocks = Vec::with_capacity(num_blocks);
833		let mut parent_hash = client.chain_info().genesis_hash;
834
835		for i in 0..num_blocks {
836			let block = BlockBuilderBuilder::new(&*client)
837				.on_parent_block(parent_hash)
838				.with_parent_block_number(i as u64)
839				.build()
840				.unwrap()
841				.build()
842				.unwrap()
843				.block;
844			parent_hash = block.header.hash();
845			futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
846			blocks.push(block.header.hash());
847		}
848
849		blocks
850	}
851
852	#[test]
853	fn block_state_machine_register_unpin() {
854		let mut state = BlockStateMachine::new();
855		// Starts in `Registered` state.
856		assert_eq!(state, BlockStateMachine::Registered);
857
858		state.advance_register();
859		assert_eq!(state, BlockStateMachine::FullyRegistered);
860
861		// Can call register multiple times.
862		state.advance_register();
863		assert_eq!(state, BlockStateMachine::FullyRegistered);
864
865		assert!(!state.was_unpinned());
866		state.advance_unpin();
867		assert_eq!(state, BlockStateMachine::FullyUnpinned);
868		assert!(state.was_unpinned());
869
870		// Can call unpin multiple times.
871		state.advance_unpin();
872		assert_eq!(state, BlockStateMachine::FullyUnpinned);
873		assert!(state.was_unpinned());
874
875		// Nothing to advance.
876		state.advance_register();
877		assert_eq!(state, BlockStateMachine::FullyUnpinned);
878	}
879
880	#[test]
881	fn block_state_machine_unpin_register() {
882		let mut state = BlockStateMachine::new();
883		// Starts in `Registered` state.
884		assert_eq!(state, BlockStateMachine::Registered);
885
886		assert!(!state.was_unpinned());
887		state.advance_unpin();
888		assert_eq!(state, BlockStateMachine::Unpinned);
889		assert!(state.was_unpinned());
890
891		// Can call unpin multiple times.
892		state.advance_unpin();
893		assert_eq!(state, BlockStateMachine::Unpinned);
894		assert!(state.was_unpinned());
895
896		state.advance_register();
897		assert_eq!(state, BlockStateMachine::FullyUnpinned);
898		assert!(state.was_unpinned());
899
900		// Nothing to advance.
901		state.advance_register();
902		assert_eq!(state, BlockStateMachine::FullyUnpinned);
903		// Nothing to unpin.
904		state.advance_unpin();
905		assert_eq!(state, BlockStateMachine::FullyUnpinned);
906		assert!(state.was_unpinned());
907	}
908
909	#[test]
910	fn sub_state_register_twice() {
911		let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1);
912		let mut sub_state = SubscriptionState::<Block> {
913			with_runtime: false,
914			tx_stop: None,
915			response_sender,
916			operations: Operations::new(MAX_OPERATIONS_PER_SUB),
917			blocks: Default::default(),
918		};
919
920		let hash = H256::random();
921		assert_eq!(sub_state.register_block(hash), true);
922		let block_state = sub_state.blocks.get(&hash).unwrap();
923		// Did not call `register_block` twice.
924		assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
925
926		assert_eq!(sub_state.register_block(hash), false);
927		let block_state = sub_state.blocks.get(&hash).unwrap();
928		assert_eq!(block_state.state_machine, BlockStateMachine::FullyRegistered);
929
930		// Block is no longer tracked when: `register_block` is called twice and
931		// `unregister_block` is called once.
932		assert_eq!(sub_state.unregister_block(hash), true);
933		let block_state = sub_state.blocks.get(&hash);
934		assert!(block_state.is_none());
935	}
936
937	#[test]
938	fn sub_state_register_unregister() {
939		let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1);
940		let mut sub_state = SubscriptionState::<Block> {
941			with_runtime: false,
942			tx_stop: None,
943			response_sender,
944			blocks: Default::default(),
945			operations: Operations::new(MAX_OPERATIONS_PER_SUB),
946		};
947
948		let hash = H256::random();
949		// Block was not registered before.
950		assert_eq!(sub_state.unregister_block(hash), false);
951
952		assert_eq!(sub_state.register_block(hash), true);
953		let block_state = sub_state.blocks.get(&hash).unwrap();
954		// Did not call `register_block` twice.
955		assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
956
957		// Unregister block before the second `register_block`.
958		assert_eq!(sub_state.unregister_block(hash), true);
959		let block_state = sub_state.blocks.get(&hash).unwrap();
960		assert_eq!(block_state.state_machine, BlockStateMachine::Unpinned);
961
962		assert_eq!(sub_state.register_block(hash), false);
963		let block_state = sub_state.blocks.get(&hash);
964		assert!(block_state.is_none());
965
966		// Block is no longer tracked when: `register_block` is called twice and
967		// `unregister_block` is called once.
968		assert_eq!(sub_state.unregister_block(hash), false);
969		let block_state = sub_state.blocks.get(&hash);
970		assert!(block_state.is_none());
971	}
972
973	#[test]
974	fn unpin_duplicate_hashes() {
975		let (backend, client) = init_backend();
976
977		let hashes = produce_blocks(client, 3);
978		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
979
980		let mut subs =
981			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
982		let id_1 = "abc".to_string();
983		let id_2 = "abcd".to_string();
984
985		// Pin all blocks for the first subscription.
986		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
987		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
988		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
989		assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
990
991		// Pin only block 2 for the second subscription.
992		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
993		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
994
995		// Check reference count.
996		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
997		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
998		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
999
1000		// Unpin the same block twice.
1001		let err = subs.unpin_blocks(&id_1, vec![hash_1, hash_1, hash_2, hash_2]).unwrap_err();
1002		assert_eq!(err, SubscriptionManagementError::DuplicateHashes);
1003
1004		// Check reference count must be unaltered.
1005		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1006		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1007		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1008
1009		// Unpin the blocks correctly.
1010		subs.unpin_blocks(&id_1, vec![hash_1, hash_2]).unwrap();
1011		assert_eq!(subs.global_blocks.get(&hash_1), None);
1012		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1013		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1014	}
1015
1016	#[test]
1017	fn subscription_lock_block() {
1018		let builder = TestClientBuilder::new();
1019		let backend = builder.backend();
1020		let mut subs =
1021			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1022
1023		let id = "abc".to_string();
1024		let hash = H256::random();
1025
1026		// Subscription not inserted.
1027		let err = subs.lock_block(&id, hash, 1).unwrap_err();
1028		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1029
1030		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1031		// Cannot insert the same subscription ID twice.
1032		assert!(subs.insert_subscription(id.clone(), true).is_none());
1033
1034		// No block hash.
1035		let err = subs.lock_block(&id, hash, 1).unwrap_err();
1036		assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1037
1038		subs.remove_subscription(&id);
1039
1040		// No subscription.
1041		let err = subs.lock_block(&id, hash, 1).unwrap_err();
1042		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1043	}
1044
1045	#[test]
1046	fn subscription_check_block() {
1047		let (backend, client) = init_backend();
1048
1049		let hashes = produce_blocks(client, 1);
1050		let hash = hashes[0];
1051
1052		let mut subs =
1053			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1054		let id = "abc".to_string();
1055
1056		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1057
1058		// First time we are pinning the block.
1059		assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
1060
1061		let block = subs.lock_block(&id, hash, 1).unwrap();
1062		// Subscription started with runtime updates
1063		assert_eq!(block.has_runtime(), true);
1064
1065		let invalid_id = "abc-invalid".to_string();
1066		let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
1067		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1068
1069		// Unpin the block.
1070		subs.unpin_blocks(&id, vec![hash]).unwrap();
1071		let err = subs.lock_block(&id, hash, 1).unwrap_err();
1072		assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1073	}
1074
1075	#[test]
1076	fn subscription_ref_count() {
1077		let (backend, client) = init_backend();
1078
1079		let hashes = produce_blocks(client, 1);
1080		let hash = hashes[0];
1081
1082		let mut subs =
1083			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1084		let id = "abc".to_string();
1085
1086		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1087		assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
1088		// Check the global ref count.
1089		assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1090		// Ensure the block propagated to the subscription.
1091		subs.subs.get(&id).unwrap().blocks.get(&hash).unwrap();
1092
1093		// Insert the block for the same subscription again (simulate NewBlock + Finalized pinning)
1094		assert_eq!(subs.pin_block(&id, hash).unwrap(), false);
1095		// Check the global ref count should not get incremented.
1096		assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1097
1098		// Ensure the hash propagates for the second subscription.
1099		let id_second = "abcd".to_string();
1100		let _stop = subs.insert_subscription(id_second.clone(), true).unwrap();
1101		assert_eq!(subs.pin_block(&id_second, hash).unwrap(), true);
1102		// Check the global ref count.
1103		assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 2);
1104		// Ensure the block propagated to the subscription.
1105		subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();
1106
1107		subs.unpin_blocks(&id, vec![hash]).unwrap();
1108		assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1109		// Cannot unpin a block twice for the same subscription.
1110		let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
1111		assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1112
1113		subs.unpin_blocks(&id_second, vec![hash]).unwrap();
1114		// Block unregistered from the memory.
1115		assert!(subs.global_blocks.get(&hash).is_none());
1116	}
1117
1118	#[test]
1119	fn subscription_remove_subscription() {
1120		let (backend, client) = init_backend();
1121
1122		let hashes = produce_blocks(client, 3);
1123		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1124
1125		let mut subs =
1126			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1127		let id_1 = "abc".to_string();
1128		let id_2 = "abcd".to_string();
1129
1130		// Pin all blocks for the first subscription.
1131		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1132		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1133		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1134		assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
1135
1136		// Pin only block 2 for the second subscription.
1137		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1138		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1139
1140		// Check reference count.
1141		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1142		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1143		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1144
1145		subs.remove_subscription(&id_1);
1146
1147		assert!(subs.global_blocks.get(&hash_1).is_none());
1148		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1149		assert!(subs.global_blocks.get(&hash_3).is_none());
1150
1151		subs.remove_subscription(&id_2);
1152
1153		assert!(subs.global_blocks.get(&hash_2).is_none());
1154		assert_eq!(subs.global_blocks.len(), 0);
1155	}
1156
1157	#[test]
1158	fn subscription_check_limits() {
1159		let (backend, client) = init_backend();
1160
1161		let hashes = produce_blocks(client, 3);
1162		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1163
1164		// Maximum number of pinned blocks is 2.
1165		let mut subs =
1166			SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1167		let id_1 = "abc".to_string();
1168		let id_2 = "abcd".to_string();
1169
1170		// Both subscriptions can pin the maximum limit.
1171		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1172		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1173		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1174
1175		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1176		assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
1177		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1178
1179		// Check reference count.
1180		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
1181		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1182
1183		// Block 3 pinning will exceed the limit and both subscriptions
1184		// are terminated because no subscription with older blocks than 10
1185		// seconds are present.
1186		let err = subs.pin_block(&id_1, hash_3).unwrap_err();
1187		assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1188
1189		// Ensure both subscriptions are removed.
1190		let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
1191		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1192
1193		let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err();
1194		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1195
1196		assert!(subs.global_blocks.get(&hash_1).is_none());
1197		assert!(subs.global_blocks.get(&hash_2).is_none());
1198		assert!(subs.global_blocks.get(&hash_3).is_none());
1199		assert_eq!(subs.global_blocks.len(), 0);
1200	}
1201
1202	#[test]
1203	fn subscription_check_limits_with_duration() {
1204		let (backend, client) = init_backend();
1205
1206		let hashes = produce_blocks(client, 3);
1207		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1208
1209		// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
1210		let mut subs =
1211			SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
1212		let id_1 = "abc".to_string();
1213		let id_2 = "abcd".to_string();
1214
1215		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1216		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1217		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1218
1219		// Maximum pin duration is 5 second, sleep 5 seconds to ensure we clean up
1220		// the first subscription.
1221		std::thread::sleep(std::time::Duration::from_secs(5));
1222
1223		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1224		assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
1225
1226		// Check reference count.
1227		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
1228		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1229
1230		// Second subscription has only 1 block pinned. Only the first subscription is terminated.
1231		let err = subs.pin_block(&id_1, hash_3).unwrap_err();
1232		assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1233
1234		// Ensure both subscriptions are removed.
1235		let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
1236		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1237
1238		let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap();
1239
1240		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1241		assert!(subs.global_blocks.get(&hash_2).is_none());
1242		assert!(subs.global_blocks.get(&hash_3).is_none());
1243		assert_eq!(subs.global_blocks.len(), 1);
1244
1245		// Force second subscription to get terminated.
1246		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1247		let err = subs.pin_block(&id_2, hash_3).unwrap_err();
1248		assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1249
1250		assert!(subs.global_blocks.get(&hash_1).is_none());
1251		assert!(subs.global_blocks.get(&hash_2).is_none());
1252		assert!(subs.global_blocks.get(&hash_3).is_none());
1253		assert_eq!(subs.global_blocks.len(), 0);
1254	}
1255
1256	#[test]
1257	fn subscription_check_stop_event() {
1258		let builder = TestClientBuilder::new();
1259		let backend = builder.backend();
1260		let mut subs =
1261			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1262
1263		let id = "abc".to_string();
1264
1265		let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap();
1266
1267		// Check the stop signal was not received.
1268		let res = sub_data.rx_stop.try_recv().unwrap();
1269		assert!(res.is_none());
1270
1271		let sub = subs.subs.get_mut(&id).unwrap();
1272		sub.stop();
1273
1274		// Check the signal was received.
1275		let res = sub_data.rx_stop.try_recv().unwrap();
1276		assert!(res.is_some());
1277	}
1278
1279	#[test]
1280	fn ongoing_operations() {
1281		// The object can hold at most 2 operations.
1282		let ops = LimitOperations::new(2);
1283
1284		// One operation is reserved.
1285		let permit_one = ops.reserve_at_most(1).unwrap();
1286		assert_eq!(permit_one.num_permits(), 1);
1287
1288		// Request 2 operations, however there is capacity only for one.
1289		let permit_two = ops.reserve_at_most(2).unwrap();
1290		// Number of reserved permits is smaller than provided.
1291		assert_eq!(permit_two.num_permits(), 1);
1292
1293		// Try to reserve operations when there's no space.
1294		let permit = ops.reserve_at_most(1);
1295		assert!(permit.is_none());
1296
1297		// Release capacity.
1298		drop(permit_two);
1299
1300		// Can reserve again
1301		let permit_three = ops.reserve_at_most(1).unwrap();
1302		assert_eq!(permit_three.num_permits(), 1);
1303	}
1304
1305	#[test]
1306	fn stop_all_subscriptions() {
1307		let (backend, client) = init_backend();
1308
1309		let hashes = produce_blocks(client, 3);
1310		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1311
1312		let mut subs =
1313			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1314		let id_1 = "abc".to_string();
1315		let id_2 = "abcd".to_string();
1316
1317		// Pin all blocks for the first subscription.
1318		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1319		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1320		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1321		assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
1322
1323		// Pin only block 2 for the second subscription.
1324		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1325		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1326
1327		// Check reference count.
1328		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1329		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1330		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1331		assert_eq!(subs.global_blocks.len(), 3);
1332
1333		// Stop all active subscriptions.
1334		subs.stop_all_subscriptions();
1335		assert!(subs.global_blocks.is_empty());
1336	}
1337
1338	#[test]
1339	fn reserved_subscription_cleans_resources() {
1340		let builder = TestClientBuilder::new();
1341		let backend = builder.backend();
1342		let subs = Arc::new(parking_lot::RwLock::new(SubscriptionsInner::new(
1343			10,
1344			Duration::from_secs(10),
1345			MAX_OPERATIONS_PER_SUB,
1346			backend,
1347		)));
1348
1349		// Maximum 2 subscriptions per connection.
1350		let rpc_connections = crate::common::connections::RpcConnections::new(2);
1351
1352		let subscription_management =
1353			crate::chain_head::subscription::SubscriptionManagement::_from_inner(
1354				subs.clone(),
1355				rpc_connections.clone(),
1356			);
1357
1358		let reserved_sub_first =
1359			subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1360		let mut reserved_sub_second =
1361			subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1362		// Subscriptions reserved but not yet populated.
1363		assert_eq!(subs.read().subs.len(), 0);
1364
1365		// Cannot reserve anymore.
1366		assert!(subscription_management.reserve_subscription(ConnectionId(1)).is_none());
1367		// Drop the first subscription.
1368		drop(reserved_sub_first);
1369		// Space is freed-up for the rpc connections.
1370		let mut reserved_sub_first =
1371			subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1372
1373		// Insert subscriptions.
1374		let _sub_data_first =
1375			reserved_sub_first.insert_subscription("sub1".to_string(), true).unwrap();
1376		let _sub_data_second =
1377			reserved_sub_second.insert_subscription("sub2".to_string(), true).unwrap();
1378		// Check we have 2 subscriptions under management.
1379		assert_eq!(subs.read().subs.len(), 2);
1380
1381		// Drop first reserved subscription.
1382		drop(reserved_sub_first);
1383		// Check that the subscription is removed.
1384		assert_eq!(subs.read().subs.len(), 1);
1385		// Space is freed-up for the rpc connections.
1386		let reserved_sub_first =
1387			subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1388
1389		// Drop all subscriptions.
1390		drop(reserved_sub_first);
1391		drop(reserved_sub_second);
1392		assert_eq!(subs.read().subs.len(), 0);
1393	}
1394}