referrerpolicy=no-referrer-when-downgrade

sc_rpc_spec_v2/chain_head/subscription/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use jsonrpsee::ConnectionId;
20use parking_lot::RwLock;
21use sc_client_api::Backend;
22use sp_runtime::traits::Block as BlockT;
23use std::{sync::Arc, time::Duration};
24
25mod error;
26mod inner;
27
28use crate::{
29	chain_head::chain_head::LOG_TARGET,
30	common::connections::{RegisteredConnection, ReservedConnection, RpcConnections},
31};
32
33use self::inner::SubscriptionsInner;
34
35pub use self::inner::OperationState;
36pub use error::SubscriptionManagementError;
37pub use inner::{BlockGuard, InsertedSubscriptionData, StopHandle};
38
39/// Manage block pinning / unpinning for subscription IDs.
40pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> {
41	/// Manage subscription by mapping the subscription ID
42	/// to a set of block hashes.
43	inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
44
45	/// Ensures that chainHead methods can be called from a single connection context.
46	///
47	/// For example, `chainHead_storage` cannot be called with a subscription ID that
48	/// was obtained from a different connection.
49	rpc_connections: RpcConnections,
50}
51
52impl<Block: BlockT, BE: Backend<Block>> Clone for SubscriptionManagement<Block, BE> {
53	fn clone(&self) -> Self {
54		SubscriptionManagement {
55			inner: self.inner.clone(),
56			rpc_connections: self.rpc_connections.clone(),
57		}
58	}
59}
60
61impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
62	/// Construct a new [`SubscriptionManagement`].
63	pub fn new(
64		global_max_pinned_blocks: usize,
65		local_max_pin_duration: Duration,
66		max_ongoing_operations: usize,
67		max_follow_subscriptions_per_connection: usize,
68		backend: Arc<BE>,
69	) -> Self {
70		SubscriptionManagement {
71			inner: Arc::new(RwLock::new(SubscriptionsInner::new(
72				global_max_pinned_blocks,
73				local_max_pin_duration,
74				max_ongoing_operations,
75				backend,
76			))),
77			rpc_connections: RpcConnections::new(max_follow_subscriptions_per_connection),
78		}
79	}
80
81	/// Create a new instance from the inner state.
82	///
83	/// # Note
84	///
85	/// Used for testing.
86	#[cfg(test)]
87	pub(crate) fn _from_inner(
88		inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
89		rpc_connections: RpcConnections,
90	) -> Self {
91		SubscriptionManagement { inner, rpc_connections }
92	}
93
94	/// Reserve space for a subscriptions.
95	///
96	/// Fails if the connection ID is has reached the maximum number of active subscriptions.
97	pub fn reserve_subscription(
98		&self,
99		connection_id: ConnectionId,
100	) -> Option<ReservedSubscription<Block, BE>> {
101		let reserved_token = self.rpc_connections.reserve_space(connection_id)?;
102
103		Some(ReservedSubscription {
104			state: ConnectionState::Reserved(reserved_token),
105			inner: self.inner.clone(),
106		})
107	}
108
109	/// Check if the given connection contains the given subscription.
110	pub fn contains_subscription(
111		&self,
112		connection_id: ConnectionId,
113		subscription_id: &str,
114	) -> bool {
115		self.rpc_connections.contains_identifier(connection_id, subscription_id)
116	}
117
118	/// Remove the subscription ID with associated pinned blocks.
119	pub fn remove_subscription(&self, sub_id: &str) {
120		let mut inner = self.inner.write();
121		inner.remove_subscription(sub_id)
122	}
123
124	/// The block is pinned in the backend only once when the block's hash is first encountered.
125	///
126	/// Each subscription is expected to call this method twice:
127	/// - once from the `NewBlock` import
128	/// - once from the `Finalized` import
129	///
130	/// Returns
131	/// - Ok(true) if the subscription did not previously contain this block
132	/// - Ok(false) if the subscription already contained this this
133	/// - Error if the backend failed to pin the block or the subscription ID is invalid
134	pub fn pin_block(
135		&self,
136		sub_id: &str,
137		hash: Block::Hash,
138	) -> Result<bool, SubscriptionManagementError> {
139		let mut inner = self.inner.write();
140		inner.pin_block(sub_id, hash)
141	}
142
143	/// Unpin the blocks from the subscription.
144	///
145	/// Blocks are reference counted and when the last subscription unpins a given block, the block
146	/// is also unpinned from the backend.
147	///
148	/// This method is called only once per subscription.
149	///
150	/// Returns an error if the subscription ID is invalid, or any of the blocks are not pinned
151	/// for the subscriptions. When an error is returned, it is guaranteed that no blocks have
152	/// been unpinned.
153	pub fn unpin_blocks(
154		&self,
155		sub_id: &str,
156		hashes: impl IntoIterator<Item = Block::Hash> + Clone,
157	) -> Result<(), SubscriptionManagementError> {
158		let mut inner = self.inner.write();
159		inner.unpin_blocks(sub_id, hashes)
160	}
161
162	/// Ensure the block remains pinned until the return object is dropped.
163	///
164	/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner
165	/// and reserves capacity for ogoing operations.
166	///
167	/// Returns an error if the block hash is not pinned for the subscription,
168	/// the subscription ID is invalid or the limit of ongoing operations was exceeded.
169	pub fn lock_block(
170		&self,
171		sub_id: &str,
172		hash: Block::Hash,
173		to_reserve: usize,
174	) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
175		let mut inner = self.inner.write();
176		inner.lock_block(sub_id, hash, to_reserve)
177	}
178
179	/// Get the operation state.
180	pub fn get_operation(&self, sub_id: &str, operation_id: &str) -> Option<OperationState> {
181		let mut inner = self.inner.write();
182		inner.get_operation(sub_id, operation_id)
183	}
184}
185
186/// The state of the connection.
187///
188/// The state starts in a [`ConnectionState::Reserved`] state and then transitions to
189/// [`ConnectionState::Registered`] when the subscription is inserted.
190enum ConnectionState {
191	Reserved(ReservedConnection),
192	Registered { _unregister_on_drop: RegisteredConnection, sub_id: String },
193	Empty,
194}
195
196/// RAII wrapper that removes the subscription from internal mappings and
197/// gives back the reserved space for the connection.
198pub struct ReservedSubscription<Block: BlockT, BE: Backend<Block>> {
199	state: ConnectionState,
200	inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
201}
202
203impl<Block: BlockT, BE: Backend<Block>> ReservedSubscription<Block, BE> {
204	/// Insert a new subscription ID.
205	///
206	/// If the subscription was not previously inserted, returns the receiver that is
207	/// triggered upon the "Stop" event. Otherwise, if the subscription ID was already
208	/// inserted returns none.
209	///
210	/// # Note
211	///
212	/// This method should be called only once.
213	pub fn insert_subscription(
214		&mut self,
215		sub_id: String,
216		runtime_updates: bool,
217	) -> Option<InsertedSubscriptionData<Block>> {
218		match std::mem::replace(&mut self.state, ConnectionState::Empty) {
219			ConnectionState::Reserved(reserved) => {
220				let registered_token = reserved.register(sub_id.clone())?;
221				self.state = ConnectionState::Registered {
222					_unregister_on_drop: registered_token,
223					sub_id: sub_id.clone(),
224				};
225
226				let mut inner = self.inner.write();
227				inner.insert_subscription(sub_id, runtime_updates)
228			},
229			// Cannot insert multiple subscriptions into one single reserved space.
230			ConnectionState::Registered { .. } | ConnectionState::Empty => {
231				log::error!(target: LOG_TARGET, "Called insert_subscription on a connection that is not reserved");
232				None
233			},
234		}
235	}
236
237	/// Stop all active subscriptions.
238	///
239	/// For all active subscriptions, the internal data is discarded, blocks are unpinned and the
240	/// `Stop` event will be generated.
241	pub fn stop_all_subscriptions(&self) {
242		let mut inner = self.inner.write();
243		inner.stop_all_subscriptions()
244	}
245}
246
247impl<Block: BlockT, BE: Backend<Block>> Drop for ReservedSubscription<Block, BE> {
248	fn drop(&mut self) {
249		if let ConnectionState::Registered { sub_id, .. } = &self.state {
250			self.inner.write().remove_subscription(sub_id);
251		}
252	}
253}