sc_rpc_spec_v2/chain_head/subscription/
inner.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use futures::channel::oneshot;
20use parking_lot::Mutex;
21use sc_client_api::Backend;
22use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
23use sp_runtime::traits::Block as BlockT;
24use std::{
25	collections::{hash_map::Entry, HashMap, HashSet},
26	sync::{atomic::AtomicBool, Arc},
27	time::{Duration, Instant},
28};
29
30use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent};
31
/// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings.
///
/// Passed as the warning threshold when the per-subscription response channel is created.
const QUEUE_SIZE_WARNING: usize = 512;
34
/// Tracks the lifecycle of one block inside one subscription ID.
///
/// # Motivation
///
/// A block reaches a subscription through two events: the `BestBlock`
/// event and the `Finalized` event.
///
/// The block state has to be kept until both events have registered the
/// block and the user has called `unpin`.
///
/// Without that, the following race is possible:
///  T0. BestBlock event: hash is tracked and pinned in backend.
///  T1. User calls unpin: hash is untracked and unpinned in backend.
///  T2. Finalized event: hash is tracked (no previous history) and pinned again.
///
/// # State Machine Transition
///
/// ```ignore
///                   (register)
/// [ REGISTERED ]  ---------------> [ FULLY REGISTERED ]
///       |                              |
///       | (unpin)                      | (unpin)
///       |                              |
///       V           (register)         V
/// [ UNPINNED ]  -----------------> [ FULLY UNPINNED ]
/// ```
#[derive(Debug, Clone, PartialEq)]
enum BlockStateMachine {
	/// One event (either `Finalized` or `BestBlock`) registered the block.
	///
	/// Unpin was not called.
	Registered,
	/// Both events (`Finalized` and `BestBlock`) registered the block.
	///
	/// Unpin was not called.
	FullyRegistered,
	/// One event (either `Finalized` or `BestBlock`) registered the block.
	///
	/// Unpin __was__ called.
	Unpinned,
	/// Both events (`Finalized` and `BestBlock`) registered the block.
	///
	/// Unpin __was__ called.
	FullyUnpinned,
}

impl BlockStateMachine {
	/// Creates the state machine in its initial `Registered` state.
	fn new() -> Self {
		Self::Registered
	}

	/// Records a registration of the block.
	///
	/// Moves along the `(register)` edges of the diagram; states that are
	/// already "fully" registered stay as they are.
	fn advance_register(&mut self) {
		let next = match self {
			Self::Registered => Self::FullyRegistered,
			Self::Unpinned => Self::FullyUnpinned,
			Self::FullyRegistered => Self::FullyRegistered,
			Self::FullyUnpinned => Self::FullyUnpinned,
		};
		*self = next;
	}

	/// Records an `unpin` call for the block.
	///
	/// Moves along the `(unpin)` edges of the diagram; states that were
	/// already unpinned stay as they are.
	fn advance_unpin(&mut self) {
		let next = match self {
			Self::Registered => Self::Unpinned,
			Self::FullyRegistered => Self::FullyUnpinned,
			Self::Unpinned => Self::Unpinned,
			Self::FullyUnpinned => Self::FullyUnpinned,
		};
		*self = next;
	}

	/// Returns true if `unpin` was recorded for the block.
	fn was_unpinned(&self) -> bool {
		matches!(self, Self::Unpinned | Self::FullyUnpinned)
	}
}
110
111/// Limit the number of ongoing operations across methods.
112struct LimitOperations {
113	/// Limit the number of ongoing operations for this subscription.
114	semaphore: Arc<tokio::sync::Semaphore>,
115}
116
117impl LimitOperations {
118	/// Constructs a new [`LimitOperations`].
119	fn new(max_operations: usize) -> Self {
120		LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) }
121	}
122
123	/// Reserves capacity to execute at least one operation and at most the requested items.
124	///
125	/// Dropping [`PermitOperations`] without executing an operation will release
126	/// the reserved capacity.
127	///
128	/// Returns nothing if there's no space available, else returns a permit
129	/// that guarantees that at least one operation can be executed.
130	fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
131		let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve);
132
133		if num_ops == 0 {
134			return None
135		}
136
137		let permits = Arc::clone(&self.semaphore)
138			.try_acquire_many_owned(num_ops.try_into().ok()?)
139			.ok()?;
140
141		Some(PermitOperations { num_ops, _permit: permits })
142	}
143}
144
/// Permits a number of operations to be executed.
///
/// [`PermitOperations`] are returned by [`LimitOperations::reserve_at_most()`] and are used
/// to guarantee the RPC server can execute the number of operations.
///
/// The number of reserved items are given back to the [`LimitOperations`] on drop.
struct PermitOperations {
	/// The number of operations permitted (reserved).
	num_ops: usize,
	/// The permit for these operations.
	///
	/// Never read; it is stored so that the permits are held for as long as this object exists.
	_permit: tokio::sync::OwnedSemaphorePermit,
}
157
158/// The state of one operation.
159///
160/// This is directly exposed to users via `chain_head_unstable_continue` and
161/// `chain_head_unstable_stop_operation`.
162#[derive(Clone)]
163pub struct OperationState {
164	/// The shared operation state that holds information about the
165	/// `waitingForContinue` event and cancellation.
166	shared_state: Arc<SharedOperationState>,
167	/// Send notifications when the user calls `chainHead_continue` method.
168	send_continue: tokio::sync::mpsc::Sender<()>,
169}
170
171impl OperationState {
172	/// Returns true if `chainHead_continue` is called after the
173	/// `waitingForContinue` event was emitted for the associated
174	/// operation ID.
175	pub fn submit_continue(&self) -> bool {
176		// `waitingForContinue` not generated.
177		if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) {
178			return false
179		}
180
181		// Has enough capacity for 1 message.
182		// Can fail if the `stop_operation` propagated the stop first.
183		self.send_continue.try_send(()).is_ok()
184	}
185
186	/// Stops the operation if `waitingForContinue` event was emitted for the associated
187	/// operation ID.
188	///
189	/// Returns nothing in accordance with `chainHead_v1_stopOperation`.
190	pub fn stop_operation(&self) {
191		// `waitingForContinue` not generated.
192		if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) {
193			return
194		}
195
196		self.shared_state
197			.operation_stopped
198			.store(true, std::sync::atomic::Ordering::Release);
199
200		// Send might not have enough capacity if `submit_continue` was sent first.
201		// However, the `operation_stopped` boolean was set.
202		let _ = self.send_continue.try_send(());
203	}
204}
205
/// The operation state shared between the backend [`RegisteredOperation`] and the frontend
/// [`OperationState`].
struct SharedOperationState {
	/// Set while the `chainHead` has generated a `waitingForContinue` event.
	requested_continue: AtomicBool,
	/// Set once the user cancelled the operation.
	operation_stopped: AtomicBool,
}

impl SharedOperationState {
	/// Constructs a new [`SharedOperationState`] with both flags cleared.
	///
	/// The state is returned behind an `Arc`, so clones share one allocation.
	fn new() -> Arc<Self> {
		let state = Self {
			requested_continue: AtomicBool::new(false),
			operation_stopped: AtomicBool::new(false),
		};
		Arc::new(state)
	}
}
226
227/// The registered operation passed to the `chainHead` methods.
228///
229/// This is used internally by the `chainHead` methods.
230pub struct RegisteredOperation {
231	/// The shared operation state that holds information about the
232	/// `waitingForContinue` event and cancellation.
233	shared_state: Arc<SharedOperationState>,
234	/// Receive notifications when the user calls `chainHead_continue` method.
235	recv_continue: tokio::sync::mpsc::Receiver<()>,
236	/// The operation ID of the request.
237	operation_id: String,
238	/// Track the operations ID of this subscription.
239	operations: Arc<Mutex<HashMap<String, OperationState>>>,
240	/// Permit a number of items to be executed by this operation.
241	permit: PermitOperations,
242}
243
244impl RegisteredOperation {
245	/// Wait until the user calls `chainHead_continue` or the operation
246	/// is cancelled via `chainHead_stopOperation`.
247	pub async fn wait_for_continue(&mut self) {
248		self.shared_state
249			.requested_continue
250			.store(true, std::sync::atomic::Ordering::Release);
251
252		// The sender part of this channel is around for as long as this object exists,
253		// because it is stored in the `OperationState` of the `operations` field.
254		// The sender part is removed from tracking when this object is dropped.
255		let _ = self.recv_continue.recv().await;
256
257		self.shared_state
258			.requested_continue
259			.store(false, std::sync::atomic::Ordering::Release);
260	}
261
262	/// Returns true if the current operation was stopped.
263	pub fn was_stopped(&self) -> bool {
264		self.shared_state.operation_stopped.load(std::sync::atomic::Ordering::Acquire)
265	}
266
267	/// Get the operation ID.
268	pub fn operation_id(&self) -> String {
269		self.operation_id.clone()
270	}
271
272	/// Returns the number of reserved elements for this permit.
273	///
274	/// This can be smaller than the number of items requested via [`LimitOperations::reserve()`].
275	pub fn num_reserved(&self) -> usize {
276		self.permit.num_ops
277	}
278}
279
280impl Drop for RegisteredOperation {
281	fn drop(&mut self) {
282		let mut operations = self.operations.lock();
283		operations.remove(&self.operation_id);
284	}
285}
286
287/// The ongoing operations of a subscription.
288struct Operations {
289	/// The next operation ID to be generated.
290	next_operation_id: usize,
291	/// Limit the number of ongoing operations.
292	limits: LimitOperations,
293	/// Track the operations ID of this subscription.
294	operations: Arc<Mutex<HashMap<String, OperationState>>>,
295}
296
297impl Operations {
298	/// Constructs a new [`Operations`].
299	fn new(max_operations: usize) -> Self {
300		Operations {
301			next_operation_id: 0,
302			limits: LimitOperations::new(max_operations),
303			operations: Default::default(),
304		}
305	}
306
307	/// Register a new operation.
308	pub fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
309		let permit = self.limits.reserve_at_most(to_reserve)?;
310
311		let operation_id = self.next_operation_id();
312
313		// At most one message can be sent.
314		let (send_continue, recv_continue) = tokio::sync::mpsc::channel(1);
315		let shared_state = SharedOperationState::new();
316
317		let state = OperationState { send_continue, shared_state: shared_state.clone() };
318
319		// Cloned operations for removing the current ID on drop.
320		let operations = self.operations.clone();
321		operations.lock().insert(operation_id.clone(), state);
322
323		Some(RegisteredOperation { shared_state, operation_id, recv_continue, operations, permit })
324	}
325
326	/// Get the associated operation state with the ID.
327	pub fn get_operation(&self, id: &str) -> Option<OperationState> {
328		self.operations.lock().get(id).map(|state| state.clone())
329	}
330
331	/// Generate the next operation ID for this subscription.
332	fn next_operation_id(&mut self) -> String {
333		let op_id = self.next_operation_id;
334		self.next_operation_id += 1;
335		op_id.to_string()
336	}
337}
338
/// The per-subscription tracking data of a single block.
struct BlockState {
	/// The state machine of this block.
	state_machine: BlockStateMachine,
	/// The timestamp when the block was inserted.
	timestamp: Instant,
}
345
346/// The state of a single subscription ID.
347struct SubscriptionState<Block: BlockT> {
348	/// The `with_runtime` parameter flag of the subscription.
349	with_runtime: bool,
350	/// Signals the "Stop" event.
351	tx_stop: Option<oneshot::Sender<()>>,
352	/// The sender of message responses to the `chainHead_follow` events.
353	///
354	/// This object is cloned between methods.
355	response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
356	/// The ongoing operations of a subscription.
357	operations: Operations,
358	/// Track the block hashes available for this subscription.
359	///
360	/// This implementation assumes:
361	/// - most of the time subscriptions keep a few blocks of the chain's head pinned
362	/// - iteration through the blocks happens only when the hard limit is exceeded.
363	///
364	/// Considering the assumption, iterating (in the unlike case) the hashmap O(N) is
365	/// more time efficient and code friendly than paying for:
366	/// - extra space: an extra BTreeMap<Instant, Hash> to older hashes by oldest insertion
367	/// - extra time: O(log(N)) for insert/remove/find each `pin` block time per subscriptions
368	blocks: HashMap<Block::Hash, BlockState>,
369}
370
371impl<Block: BlockT> SubscriptionState<Block> {
372	/// Trigger the stop event for the current subscription.
373	///
374	/// This can happen on internal failure (ie, the pruning deleted the block from memory)
375	/// or if the subscription exceeded the available pinned blocks.
376	fn stop(&mut self) {
377		if let Some(tx_stop) = self.tx_stop.take() {
378			let _ = tx_stop.send(());
379		}
380	}
381
382	/// Keep track of the given block hash for this subscription.
383	///
384	/// This does not handle pinning in the backend.
385	///
386	/// Returns:
387	/// - true if this is the first time that the block is registered
388	/// - false if the block was already registered
389	fn register_block(&mut self, hash: Block::Hash) -> bool {
390		match self.blocks.entry(hash) {
391			Entry::Occupied(mut occupied) => {
392				let block_state = occupied.get_mut();
393
394				block_state.state_machine.advance_register();
395				// Block was registered twice and unpin was called.
396				if block_state.state_machine == BlockStateMachine::FullyUnpinned {
397					occupied.remove();
398				}
399
400				// Second time we register this block.
401				false
402			},
403			Entry::Vacant(vacant) => {
404				vacant.insert(BlockState {
405					state_machine: BlockStateMachine::new(),
406					timestamp: Instant::now(),
407				});
408
409				// First time we register this block.
410				true
411			},
412		}
413	}
414
415	/// A block is unregistered when the user calls `unpin`.
416	///
417	/// Returns:
418	/// - true if the block can be unpinned.
419	/// - false if the subscription does not contain the block or it was unpinned.
420	fn unregister_block(&mut self, hash: Block::Hash) -> bool {
421		match self.blocks.entry(hash) {
422			Entry::Occupied(mut occupied) => {
423				let block_state = occupied.get_mut();
424
425				// Cannot unpin a block twice.
426				if block_state.state_machine.was_unpinned() {
427					return false
428				}
429
430				block_state.state_machine.advance_unpin();
431				// Block was registered twice and unpin was called.
432				if block_state.state_machine == BlockStateMachine::FullyUnpinned {
433					occupied.remove();
434				}
435
436				true
437			},
438			// Block was not tracked.
439			Entry::Vacant(_) => false,
440		}
441	}
442
443	/// A subscription contains a block when the block was
444	/// registered (`pin` was called) and the block was not `unpinned` yet.
445	///
446	/// Returns `true` if the subscription contains the block.
447	fn contains_block(&self, hash: Block::Hash) -> bool {
448		let Some(state) = self.blocks.get(&hash) else {
449			// Block was not tracked.
450			return false
451		};
452
453		// Subscription no longer contains the block if `unpin` was called.
454		!state.state_machine.was_unpinned()
455	}
456
457	/// Get the timestamp of the oldest inserted block.
458	///
459	/// # Note
460	///
461	/// This iterates over all the blocks of the subscription.
462	fn find_oldest_block_timestamp(&self) -> Instant {
463		let mut timestamp = Instant::now();
464		for (_, state) in self.blocks.iter() {
465			timestamp = std::cmp::min(timestamp, state.timestamp);
466		}
467		timestamp
468	}
469
470	/// Register a new operation.
471	///
472	/// The registered operation can execute at least one item and at most the requested items.
473	fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
474		self.operations.register_operation(to_reserve)
475	}
476
477	/// Get the associated operation state with the ID.
478	pub fn get_operation(&self, id: &str) -> Option<OperationState> {
479		self.operations.get_operation(id)
480	}
481}
482
483/// Keeps a specific block pinned while the handle is alive.
484/// This object ensures that the block is not unpinned while
485/// executing an RPC method call.
486pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
487	hash: Block::Hash,
488	with_runtime: bool,
489	response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
490	operation: RegisteredOperation,
491	backend: Arc<BE>,
492}
493
494// Custom implementation of Debug to avoid bounds on `backend: Debug` for `unwrap_err()` needed for
495// testing.
496impl<Block: BlockT, BE: Backend<Block>> std::fmt::Debug for BlockGuard<Block, BE> {
497	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
498		write!(f, "BlockGuard hash {:?} with_runtime {:?}", self.hash, self.with_runtime)
499	}
500}
501
502impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
503	/// Construct a new [`BlockGuard`] .
504	fn new(
505		hash: Block::Hash,
506		with_runtime: bool,
507		response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
508		operation: RegisteredOperation,
509		backend: Arc<BE>,
510	) -> Result<Self, SubscriptionManagementError> {
511		backend
512			.pin_block(hash)
513			.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
514
515		Ok(Self { hash, with_runtime, response_sender, operation, backend })
516	}
517
518	/// The `with_runtime` flag of the subscription.
519	pub fn has_runtime(&self) -> bool {
520		self.with_runtime
521	}
522
523	/// Send message responses from the `chainHead` methods to `chainHead_follow`.
524	pub fn response_sender(&self) -> TracingUnboundedSender<FollowEvent<Block::Hash>> {
525		self.response_sender.clone()
526	}
527
528	/// Get the details of the registered operation.
529	pub fn operation(&mut self) -> &mut RegisteredOperation {
530		&mut self.operation
531	}
532}
533
534impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
535	fn drop(&mut self) {
536		self.backend.unpin_block(self.hash);
537	}
538}
539
/// The data propagated back to the `chainHead_follow` method after
/// the subscription is successfully inserted.
///
/// Both fields are the receiving halves of channels whose sending halves
/// are stored in the state of the inserted subscription.
pub struct InsertedSubscriptionData<Block: BlockT> {
	/// Signal that the subscription must stop.
	pub rx_stop: oneshot::Receiver<()>,
	/// Receive message responses from the `chainHead` methods.
	pub response_receiver: TracingUnboundedReceiver<FollowEvent<Block::Hash>>,
}
548
/// Tracks every subscription together with the blocks pinned on their behalf.
///
/// Blocks are reference counted across subscriptions and pinned / unpinned
/// in the backend accordingly.
pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
	/// Reference count the block hashes across all subscriptions.
	///
	/// The pinned blocks cannot exceed the `global_max_pinned_blocks` limit.
	/// When the limit is exceeded subscriptions are stopped via the `Stop` event.
	global_blocks: HashMap<Block::Hash, usize>,
	/// The maximum number of pinned blocks across all subscriptions.
	global_max_pinned_blocks: usize,
	/// The maximum duration that a block is allowed to be pinned per subscription.
	local_max_pin_duration: Duration,
	/// The maximum number of ongoing operations per subscription.
	max_ongoing_operations: usize,
	/// Map the subscription ID to internal details of the subscription.
	subs: HashMap<String, SubscriptionState<Block>>,

	/// Backend pinning / unpinning blocks.
	///
	/// The `Arc` is handled one level-above, but substrate exposes the backend as Arc<T>.
	backend: Arc<BE>,
}
569
570impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
571	/// Construct a new [`SubscriptionsInner`] from the specified limits.
572	pub fn new(
573		global_max_pinned_blocks: usize,
574		local_max_pin_duration: Duration,
575		max_ongoing_operations: usize,
576		backend: Arc<BE>,
577	) -> Self {
578		SubscriptionsInner {
579			global_blocks: Default::default(),
580			global_max_pinned_blocks,
581			local_max_pin_duration,
582			max_ongoing_operations,
583			subs: Default::default(),
584			backend,
585		}
586	}
587
588	/// Insert a new subscription ID.
589	pub fn insert_subscription(
590		&mut self,
591		sub_id: String,
592		with_runtime: bool,
593	) -> Option<InsertedSubscriptionData<Block>> {
594		if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
595			let (tx_stop, rx_stop) = oneshot::channel();
596			let (response_sender, response_receiver) =
597				tracing_unbounded("chain-head-method-responses", QUEUE_SIZE_WARNING);
598			let state = SubscriptionState::<Block> {
599				with_runtime,
600				tx_stop: Some(tx_stop),
601				response_sender,
602				blocks: Default::default(),
603				operations: Operations::new(self.max_ongoing_operations),
604			};
605			entry.insert(state);
606
607			Some(InsertedSubscriptionData { rx_stop, response_receiver })
608		} else {
609			None
610		}
611	}
612
613	/// Remove the subscription ID with associated pinned blocks.
614	pub fn remove_subscription(&mut self, sub_id: &str) {
615		let Some(mut sub) = self.subs.remove(sub_id) else { return };
616
617		// The `Stop` event can be generated only once.
618		sub.stop();
619
620		for (hash, state) in sub.blocks.iter() {
621			if !state.state_machine.was_unpinned() {
622				self.global_unregister_block(*hash);
623			}
624		}
625	}
626
627	/// All active subscriptions are removed.
628	pub fn stop_all_subscriptions(&mut self) {
629		let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
630
631		for sub_id in to_remove {
632			self.remove_subscription(&sub_id);
633		}
634	}
635
636	/// Ensure that a new block could be pinned.
637	///
638	/// If the global number of blocks has been reached this method
639	/// will remove all subscriptions that have blocks older than the
640	/// specified pin duration.
641	///
642	/// If after removing all subscriptions that exceed the pin duration
643	/// there is no space for pinning a new block, then all subscriptions
644	/// are terminated.
645	///
646	/// Returns true if the given subscription is also terminated.
647	fn ensure_block_space(&mut self, request_sub_id: &str) -> bool {
648		if self.global_blocks.len() < self.global_max_pinned_blocks {
649			return false
650		}
651
652		// Terminate all subscriptions that have blocks older than
653		// the specified pin duration.
654		let now = Instant::now();
655
656		let to_remove: Vec<_> = self
657			.subs
658			.iter_mut()
659			.filter_map(|(sub_id, sub)| {
660				let sub_time = sub.find_oldest_block_timestamp();
661				// Subscriptions older than the specified pin duration should be removed.
662				let should_remove = match now.checked_duration_since(sub_time) {
663					Some(duration) => duration > self.local_max_pin_duration,
664					None => true,
665				};
666				should_remove.then(|| sub_id.clone())
667			})
668			.collect();
669
670		let mut is_terminated = false;
671		for sub_id in to_remove {
672			if sub_id == request_sub_id {
673				is_terminated = true;
674			}
675			self.remove_subscription(&sub_id);
676		}
677
678		// Make sure we have enough space after first pass of terminating subscriptions.
679		if self.global_blocks.len() < self.global_max_pinned_blocks {
680			return is_terminated
681		}
682
683		// Sanity check: cannot uphold `chainHead` guarantees anymore. We have not
684		// found any subscriptions that have older pinned blocks to terminate.
685		let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
686		for sub_id in to_remove {
687			if sub_id == request_sub_id {
688				is_terminated = true;
689			}
690			self.remove_subscription(&sub_id);
691		}
692		return is_terminated
693	}
694
695	pub fn pin_block(
696		&mut self,
697		sub_id: &str,
698		hash: Block::Hash,
699	) -> Result<bool, SubscriptionManagementError> {
700		let Some(sub) = self.subs.get_mut(sub_id) else {
701			return Err(SubscriptionManagementError::SubscriptionAbsent)
702		};
703
704		// Block was already registered for this subscription and therefore
705		// globally tracked.
706		if !sub.register_block(hash) {
707			return Ok(false)
708		}
709
710		// Ensure we have enough space only if the hash is not globally registered.
711		if !self.global_blocks.contains_key(&hash) {
712			// Subscription ID was terminated while ensuring enough space.
713			if self.ensure_block_space(sub_id) {
714				return Err(SubscriptionManagementError::ExceededLimits)
715			}
716		}
717
718		self.global_register_block(hash)?;
719		Ok(true)
720	}
721
722	/// Register the block internally.
723	///
724	/// If the block is present the reference counter is increased.
725	/// If this is a new block, the block is pinned in the backend.
726	fn global_register_block(
727		&mut self,
728		hash: Block::Hash,
729	) -> Result<(), SubscriptionManagementError> {
730		match self.global_blocks.entry(hash) {
731			Entry::Occupied(mut occupied) => {
732				*occupied.get_mut() += 1;
733			},
734			Entry::Vacant(vacant) => {
735				self.backend
736					.pin_block(hash)
737					.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
738
739				vacant.insert(1);
740			},
741		};
742		Ok(())
743	}
744
745	/// Unregister the block internally.
746	///
747	/// If the block is present the reference counter is decreased.
748	/// If this is the last reference of the block, the block
749	/// is unpinned from the backend and removed from internal tracking.
750	fn global_unregister_block(&mut self, hash: Block::Hash) {
751		if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) {
752			let counter = occupied.get_mut();
753			if *counter == 1 {
754				// Unpin the block from the backend.
755				self.backend.unpin_block(hash);
756				occupied.remove();
757			} else {
758				*counter -= 1;
759			}
760		}
761	}
762
763	/// Ensure the provided hashes are unique.
764	fn ensure_hash_uniqueness(
765		hashes: impl IntoIterator<Item = Block::Hash> + Clone,
766	) -> Result<(), SubscriptionManagementError> {
767		let mut set = HashSet::new();
768		hashes.into_iter().try_for_each(|hash| {
769			if !set.insert(hash) {
770				Err(SubscriptionManagementError::DuplicateHashes)
771			} else {
772				Ok(())
773			}
774		})
775	}
776
777	pub fn unpin_blocks(
778		&mut self,
779		sub_id: &str,
780		hashes: impl IntoIterator<Item = Block::Hash> + Clone,
781	) -> Result<(), SubscriptionManagementError> {
782		Self::ensure_hash_uniqueness(hashes.clone())?;
783
784		let Some(sub) = self.subs.get_mut(sub_id) else {
785			return Err(SubscriptionManagementError::SubscriptionAbsent)
786		};
787
788		// Ensure that all blocks are part of the subscription before removing individual
789		// blocks.
790		for hash in hashes.clone() {
791			if !sub.contains_block(hash) {
792				return Err(SubscriptionManagementError::BlockHashAbsent)
793			}
794		}
795
796		// Note: this needs to be separate from the global mappings to avoid barrow checker
797		// thinking we borrow `&mut self` twice: once from `self.subs.get_mut` and once from
798		// `self.global_unregister_block`. Although the borrowing is correct, since different
799		// fields of the structure are borrowed, one at a time.
800		for hash in hashes.clone() {
801			sub.unregister_block(hash);
802		}
803
804		// Block have been removed from the subscription. Remove them from the global tracking.
805		for hash in hashes {
806			self.global_unregister_block(hash);
807		}
808
809		Ok(())
810	}
811
812	pub fn lock_block(
813		&mut self,
814		sub_id: &str,
815		hash: Block::Hash,
816		to_reserve: usize,
817	) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
818		let Some(sub) = self.subs.get_mut(sub_id) else {
819			return Err(SubscriptionManagementError::SubscriptionAbsent)
820		};
821
822		if !sub.contains_block(hash) {
823			return Err(SubscriptionManagementError::BlockHashAbsent)
824		}
825
826		let Some(operation) = sub.register_operation(to_reserve) else {
827			// Error when the server cannot execute at least one operation.
828			return Err(SubscriptionManagementError::ExceededLimits)
829		};
830
831		BlockGuard::new(
832			hash,
833			sub.with_runtime,
834			sub.response_sender.clone(),
835			operation,
836			self.backend.clone(),
837		)
838	}
839
840	pub fn get_operation(&mut self, sub_id: &str, id: &str) -> Option<OperationState> {
841		let state = self.subs.get(sub_id)?;
842		state.get_operation(id)
843	}
844}
845
846#[cfg(test)]
847mod tests {
848	use super::*;
849	use jsonrpsee::ConnectionId;
850	use sc_block_builder::BlockBuilderBuilder;
851	use sc_service::client::new_in_mem;
852	use sp_consensus::BlockOrigin;
853	use sp_core::{testing::TaskExecutor, H256};
854	use substrate_test_runtime_client::{
855		prelude::*,
856		runtime::{Block, RuntimeApi},
857		Client, ClientBlockImportExt, GenesisInit,
858	};
859
	/// Maximum number of ongoing operations per subscription ID.
	///
	/// Used by the tests when constructing the `Operations` of a subscription.
	const MAX_OPERATIONS_PER_SUB: usize = 16;
862
	/// Builds an in-memory backend and a test client that runs on top of it.
	///
	/// Returns the backend together with the client so that tests can drive
	/// both of them.
	fn init_backend() -> (
		Arc<sc_client_api::in_mem::Backend<Block>>,
		Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
	) {
		let backend = Arc::new(sc_client_api::in_mem::Backend::new());
		let executor = substrate_test_runtime_client::WasmExecutor::default();
		let client_config = sc_service::ClientConfig::default();
		// The genesis block is built from the default genesis storage of the test runtime.
		let genesis_block_builder = sc_service::GenesisBlockBuilder::new(
			&substrate_test_runtime_client::GenesisParameters::default().genesis_storage(),
			!client_config.no_genesis,
			backend.clone(),
			executor.clone(),
		)
		.unwrap();
		// The client shares the very same backend instance that is returned to the caller.
		let client = Arc::new(
			new_in_mem::<_, Block, _, RuntimeApi>(
				backend.clone(),
				executor,
				genesis_block_builder,
				None,
				None,
				Box::new(TaskExecutor::new()),
				client_config,
			)
			.unwrap(),
		);
		(backend, client)
	}
891
892	fn produce_blocks(
893		client: Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
894		num_blocks: usize,
895	) -> Vec<<Block as BlockT>::Hash> {
896		let mut blocks = Vec::with_capacity(num_blocks);
897		let mut parent_hash = client.chain_info().genesis_hash;
898
899		for i in 0..num_blocks {
900			let block = BlockBuilderBuilder::new(&*client)
901				.on_parent_block(parent_hash)
902				.with_parent_block_number(i as u64)
903				.build()
904				.unwrap()
905				.build()
906				.unwrap()
907				.block;
908			parent_hash = block.header.hash();
909			futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
910			blocks.push(block.header.hash());
911		}
912
913		blocks
914	}
915
	/// Walks the state machine along the register -> register -> unpin path
	/// and checks that repeated calls do not change a final state.
	#[test]
	fn block_state_machine_register_unpin() {
		let mut state = BlockStateMachine::new();
		// Starts in `Registered` state.
		assert_eq!(state, BlockStateMachine::Registered);

		state.advance_register();
		assert_eq!(state, BlockStateMachine::FullyRegistered);

		// Can call register multiple times.
		state.advance_register();
		assert_eq!(state, BlockStateMachine::FullyRegistered);

		// Unpinning a fully registered block moves straight to `FullyUnpinned`.
		assert!(!state.was_unpinned());
		state.advance_unpin();
		assert_eq!(state, BlockStateMachine::FullyUnpinned);
		assert!(state.was_unpinned());

		// Can call unpin multiple times.
		state.advance_unpin();
		assert_eq!(state, BlockStateMachine::FullyUnpinned);
		assert!(state.was_unpinned());

		// Nothing to advance.
		state.advance_register();
		assert_eq!(state, BlockStateMachine::FullyUnpinned);
	}
943
944	#[test]
945	fn block_state_machine_unpin_register() {
946		let mut state = BlockStateMachine::new();
947		// Starts in `Registered` state.
948		assert_eq!(state, BlockStateMachine::Registered);
949
950		assert!(!state.was_unpinned());
951		state.advance_unpin();
952		assert_eq!(state, BlockStateMachine::Unpinned);
953		assert!(state.was_unpinned());
954
955		// Can call unpin multiple times.
956		state.advance_unpin();
957		assert_eq!(state, BlockStateMachine::Unpinned);
958		assert!(state.was_unpinned());
959
960		state.advance_register();
961		assert_eq!(state, BlockStateMachine::FullyUnpinned);
962		assert!(state.was_unpinned());
963
964		// Nothing to advance.
965		state.advance_register();
966		assert_eq!(state, BlockStateMachine::FullyUnpinned);
967		// Nothing to unpin.
968		state.advance_unpin();
969		assert_eq!(state, BlockStateMachine::FullyUnpinned);
970		assert!(state.was_unpinned());
971	}
972
973	#[test]
974	fn sub_state_register_twice() {
975		let (response_sender, _response_receiver) =
976			tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
977		let mut sub_state = SubscriptionState::<Block> {
978			with_runtime: false,
979			tx_stop: None,
980			response_sender,
981			operations: Operations::new(MAX_OPERATIONS_PER_SUB),
982			blocks: Default::default(),
983		};
984
985		let hash = H256::random();
986		assert_eq!(sub_state.register_block(hash), true);
987		let block_state = sub_state.blocks.get(&hash).unwrap();
988		// Did not call `register_block` twice.
989		assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
990
991		assert_eq!(sub_state.register_block(hash), false);
992		let block_state = sub_state.blocks.get(&hash).unwrap();
993		assert_eq!(block_state.state_machine, BlockStateMachine::FullyRegistered);
994
995		// Block is no longer tracked when: `register_block` is called twice and
996		// `unregister_block` is called once.
997		assert_eq!(sub_state.unregister_block(hash), true);
998		let block_state = sub_state.blocks.get(&hash);
999		assert!(block_state.is_none());
1000	}
1001
1002	#[test]
1003	fn sub_state_register_unregister() {
1004		let (response_sender, _response_receiver) =
1005			tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
1006		let mut sub_state = SubscriptionState::<Block> {
1007			with_runtime: false,
1008			tx_stop: None,
1009			response_sender,
1010			blocks: Default::default(),
1011			operations: Operations::new(MAX_OPERATIONS_PER_SUB),
1012		};
1013
1014		let hash = H256::random();
1015		// Block was not registered before.
1016		assert_eq!(sub_state.unregister_block(hash), false);
1017
1018		assert_eq!(sub_state.register_block(hash), true);
1019		let block_state = sub_state.blocks.get(&hash).unwrap();
1020		// Did not call `register_block` twice.
1021		assert_eq!(block_state.state_machine, BlockStateMachine::Registered);
1022
1023		// Unregister block before the second `register_block`.
1024		assert_eq!(sub_state.unregister_block(hash), true);
1025		let block_state = sub_state.blocks.get(&hash).unwrap();
1026		assert_eq!(block_state.state_machine, BlockStateMachine::Unpinned);
1027
1028		assert_eq!(sub_state.register_block(hash), false);
1029		let block_state = sub_state.blocks.get(&hash);
1030		assert!(block_state.is_none());
1031
1032		// Block is no longer tracked when: `register_block` is called twice and
1033		// `unregister_block` is called once.
1034		assert_eq!(sub_state.unregister_block(hash), false);
1035		let block_state = sub_state.blocks.get(&hash);
1036		assert!(block_state.is_none());
1037	}
1038
1039	#[test]
1040	fn unpin_duplicate_hashes() {
1041		let (backend, client) = init_backend();
1042
1043		let hashes = produce_blocks(client, 3);
1044		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1045
1046		let mut subs =
1047			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1048		let id_1 = "abc".to_string();
1049		let id_2 = "abcd".to_string();
1050
1051		// Pin all blocks for the first subscription.
1052		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1053		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1054		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1055		assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
1056
1057		// Pin only block 2 for the second subscription.
1058		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1059		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1060
1061		// Check reference count.
1062		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1063		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1064		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1065
1066		// Unpin the same block twice.
1067		let err = subs.unpin_blocks(&id_1, vec![hash_1, hash_1, hash_2, hash_2]).unwrap_err();
1068		assert_eq!(err, SubscriptionManagementError::DuplicateHashes);
1069
1070		// Check reference count must be unaltered.
1071		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1072		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1073		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1074
1075		// Unpin the blocks correctly.
1076		subs.unpin_blocks(&id_1, vec![hash_1, hash_2]).unwrap();
1077		assert_eq!(subs.global_blocks.get(&hash_1), None);
1078		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1079		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1080	}
1081
1082	#[test]
1083	fn subscription_lock_block() {
1084		let builder = TestClientBuilder::new();
1085		let backend = builder.backend();
1086		let mut subs =
1087			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1088
1089		let id = "abc".to_string();
1090		let hash = H256::random();
1091
1092		// Subscription not inserted.
1093		let err = subs.lock_block(&id, hash, 1).unwrap_err();
1094		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1095
1096		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1097		// Cannot insert the same subscription ID twice.
1098		assert!(subs.insert_subscription(id.clone(), true).is_none());
1099
1100		// No block hash.
1101		let err = subs.lock_block(&id, hash, 1).unwrap_err();
1102		assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1103
1104		subs.remove_subscription(&id);
1105
1106		// No subscription.
1107		let err = subs.lock_block(&id, hash, 1).unwrap_err();
1108		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1109	}
1110
1111	#[test]
1112	fn subscription_check_block() {
1113		let (backend, client) = init_backend();
1114
1115		let hashes = produce_blocks(client, 1);
1116		let hash = hashes[0];
1117
1118		let mut subs =
1119			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1120		let id = "abc".to_string();
1121
1122		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1123
1124		// First time we are pinning the block.
1125		assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
1126
1127		let block = subs.lock_block(&id, hash, 1).unwrap();
1128		// Subscription started with runtime updates
1129		assert_eq!(block.has_runtime(), true);
1130
1131		let invalid_id = "abc-invalid".to_string();
1132		let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
1133		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1134
1135		// Unpin the block.
1136		subs.unpin_blocks(&id, vec![hash]).unwrap();
1137		let err = subs.lock_block(&id, hash, 1).unwrap_err();
1138		assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1139	}
1140
1141	#[test]
1142	fn subscription_ref_count() {
1143		let (backend, client) = init_backend();
1144
1145		let hashes = produce_blocks(client, 1);
1146		let hash = hashes[0];
1147
1148		let mut subs =
1149			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1150		let id = "abc".to_string();
1151
1152		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
1153		assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
1154		// Check the global ref count.
1155		assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1156		// Ensure the block propagated to the subscription.
1157		subs.subs.get(&id).unwrap().blocks.get(&hash).unwrap();
1158
1159		// Insert the block for the same subscription again (simulate NewBlock + Finalized pinning)
1160		assert_eq!(subs.pin_block(&id, hash).unwrap(), false);
1161		// Check the global ref count should not get incremented.
1162		assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1163
1164		// Ensure the hash propagates for the second subscription.
1165		let id_second = "abcd".to_string();
1166		let _stop = subs.insert_subscription(id_second.clone(), true).unwrap();
1167		assert_eq!(subs.pin_block(&id_second, hash).unwrap(), true);
1168		// Check the global ref count.
1169		assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 2);
1170		// Ensure the block propagated to the subscription.
1171		subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();
1172
1173		subs.unpin_blocks(&id, vec![hash]).unwrap();
1174		assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
1175		// Cannot unpin a block twice for the same subscription.
1176		let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
1177		assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
1178
1179		subs.unpin_blocks(&id_second, vec![hash]).unwrap();
1180		// Block unregistered from the memory.
1181		assert!(subs.global_blocks.get(&hash).is_none());
1182	}
1183
1184	#[test]
1185	fn subscription_remove_subscription() {
1186		let (backend, client) = init_backend();
1187
1188		let hashes = produce_blocks(client, 3);
1189		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1190
1191		let mut subs =
1192			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1193		let id_1 = "abc".to_string();
1194		let id_2 = "abcd".to_string();
1195
1196		// Pin all blocks for the first subscription.
1197		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1198		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1199		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1200		assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
1201
1202		// Pin only block 2 for the second subscription.
1203		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1204		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1205
1206		// Check reference count.
1207		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1208		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1209		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1210
1211		subs.remove_subscription(&id_1);
1212
1213		assert!(subs.global_blocks.get(&hash_1).is_none());
1214		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1215		assert!(subs.global_blocks.get(&hash_3).is_none());
1216
1217		subs.remove_subscription(&id_2);
1218
1219		assert!(subs.global_blocks.get(&hash_2).is_none());
1220		assert_eq!(subs.global_blocks.len(), 0);
1221	}
1222
1223	#[test]
1224	fn subscription_check_limits() {
1225		let (backend, client) = init_backend();
1226
1227		let hashes = produce_blocks(client, 3);
1228		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1229
1230		// Maximum number of pinned blocks is 2.
1231		let mut subs =
1232			SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1233		let id_1 = "abc".to_string();
1234		let id_2 = "abcd".to_string();
1235
1236		// Both subscriptions can pin the maximum limit.
1237		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1238		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1239		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1240
1241		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1242		assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
1243		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1244
1245		// Check reference count.
1246		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
1247		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1248
1249		// Block 3 pinning will exceed the limit and both subscriptions
1250		// are terminated because no subscription with older blocks than 10
1251		// seconds are present.
1252		let err = subs.pin_block(&id_1, hash_3).unwrap_err();
1253		assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1254
1255		// Ensure both subscriptions are removed.
1256		let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
1257		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1258
1259		let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err();
1260		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1261
1262		assert!(subs.global_blocks.get(&hash_1).is_none());
1263		assert!(subs.global_blocks.get(&hash_2).is_none());
1264		assert!(subs.global_blocks.get(&hash_3).is_none());
1265		assert_eq!(subs.global_blocks.len(), 0);
1266	}
1267
1268	#[test]
1269	fn subscription_check_limits_with_duration() {
1270		let (backend, client) = init_backend();
1271
1272		let hashes = produce_blocks(client, 3);
1273		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1274
1275		// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
1276		let mut subs =
1277			SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
1278		let id_1 = "abc".to_string();
1279		let id_2 = "abcd".to_string();
1280
1281		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1282		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1283		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1284
1285		// Maximum pin duration is 5 second, sleep 5 seconds to ensure we clean up
1286		// the first subscription.
1287		std::thread::sleep(std::time::Duration::from_secs(5));
1288
1289		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1290		assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true);
1291
1292		// Check reference count.
1293		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2);
1294		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
1295
1296		// Second subscription has only 1 block pinned. Only the first subscription is terminated.
1297		let err = subs.pin_block(&id_1, hash_3).unwrap_err();
1298		assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1299
1300		// Ensure both subscriptions are removed.
1301		let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
1302		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
1303
1304		let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap();
1305
1306		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1307		assert!(subs.global_blocks.get(&hash_2).is_none());
1308		assert!(subs.global_blocks.get(&hash_3).is_none());
1309		assert_eq!(subs.global_blocks.len(), 1);
1310
1311		// Force second subscription to get terminated.
1312		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1313		let err = subs.pin_block(&id_2, hash_3).unwrap_err();
1314		assert_eq!(err, SubscriptionManagementError::ExceededLimits);
1315
1316		assert!(subs.global_blocks.get(&hash_1).is_none());
1317		assert!(subs.global_blocks.get(&hash_2).is_none());
1318		assert!(subs.global_blocks.get(&hash_3).is_none());
1319		assert_eq!(subs.global_blocks.len(), 0);
1320	}
1321
1322	#[test]
1323	fn subscription_check_stop_event() {
1324		let builder = TestClientBuilder::new();
1325		let backend = builder.backend();
1326		let mut subs =
1327			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1328
1329		let id = "abc".to_string();
1330
1331		let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap();
1332
1333		// Check the stop signal was not received.
1334		let res = sub_data.rx_stop.try_recv().unwrap();
1335		assert!(res.is_none());
1336
1337		let sub = subs.subs.get_mut(&id).unwrap();
1338		sub.stop();
1339
1340		// Check the signal was received.
1341		let res = sub_data.rx_stop.try_recv().unwrap();
1342		assert!(res.is_some());
1343	}
1344
1345	#[test]
1346	fn ongoing_operations() {
1347		// The object can hold at most 2 operations.
1348		let ops = LimitOperations::new(2);
1349
1350		// One operation is reserved.
1351		let permit_one = ops.reserve_at_most(1).unwrap();
1352		assert_eq!(permit_one.num_ops, 1);
1353
1354		// Request 2 operations, however there is capacity only for one.
1355		let permit_two = ops.reserve_at_most(2).unwrap();
1356		// Number of reserved permits is smaller than provided.
1357		assert_eq!(permit_two.num_ops, 1);
1358
1359		// Try to reserve operations when there's no space.
1360		let permit = ops.reserve_at_most(1);
1361		assert!(permit.is_none());
1362
1363		// Release capacity.
1364		drop(permit_two);
1365
1366		// Can reserve again
1367		let permit_three = ops.reserve_at_most(1).unwrap();
1368		assert_eq!(permit_three.num_ops, 1);
1369	}
1370
1371	#[test]
1372	fn stop_all_subscriptions() {
1373		let (backend, client) = init_backend();
1374
1375		let hashes = produce_blocks(client, 3);
1376		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
1377
1378		let mut subs =
1379			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
1380		let id_1 = "abc".to_string();
1381		let id_2 = "abcd".to_string();
1382
1383		// Pin all blocks for the first subscription.
1384		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
1385		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
1386		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
1387		assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
1388
1389		// Pin only block 2 for the second subscription.
1390		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
1391		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
1392
1393		// Check reference count.
1394		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
1395		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
1396		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
1397		assert_eq!(subs.global_blocks.len(), 3);
1398
1399		// Stop all active subscriptions.
1400		subs.stop_all_subscriptions();
1401		assert!(subs.global_blocks.is_empty());
1402	}
1403
1404	#[test]
1405	fn reserved_subscription_cleans_resources() {
1406		let builder = TestClientBuilder::new();
1407		let backend = builder.backend();
1408		let subs = Arc::new(parking_lot::RwLock::new(SubscriptionsInner::new(
1409			10,
1410			Duration::from_secs(10),
1411			MAX_OPERATIONS_PER_SUB,
1412			backend,
1413		)));
1414
1415		// Maximum 2 subscriptions per connection.
1416		let rpc_connections = crate::common::connections::RpcConnections::new(2);
1417
1418		let subscription_management =
1419			crate::chain_head::subscription::SubscriptionManagement::_from_inner(
1420				subs.clone(),
1421				rpc_connections.clone(),
1422			);
1423
1424		let reserved_sub_first =
1425			subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1426		let mut reserved_sub_second =
1427			subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1428		// Subscriptions reserved but not yet populated.
1429		assert_eq!(subs.read().subs.len(), 0);
1430
1431		// Cannot reserve anymore.
1432		assert!(subscription_management.reserve_subscription(ConnectionId(1)).is_none());
1433		// Drop the first subscription.
1434		drop(reserved_sub_first);
1435		// Space is freed-up for the rpc connections.
1436		let mut reserved_sub_first =
1437			subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1438
1439		// Insert subscriptions.
1440		let _sub_data_first =
1441			reserved_sub_first.insert_subscription("sub1".to_string(), true).unwrap();
1442		let _sub_data_second =
1443			reserved_sub_second.insert_subscription("sub2".to_string(), true).unwrap();
1444		// Check we have 2 subscriptions under management.
1445		assert_eq!(subs.read().subs.len(), 2);
1446
1447		// Drop first reserved subscription.
1448		drop(reserved_sub_first);
1449		// Check that the subscription is removed.
1450		assert_eq!(subs.read().subs.len(), 1);
1451		// Space is freed-up for the rpc connections.
1452		let reserved_sub_first =
1453			subscription_management.reserve_subscription(ConnectionId(1)).unwrap();
1454
1455		// Drop all subscriptions.
1456		drop(reserved_sub_first);
1457		drop(reserved_sub_second);
1458		assert_eq!(subs.read().subs.len(), 0);
1459	}
1460}