referrerpolicy=no-referrer-when-downgrade

sc_rpc_spec_v2/common/
connections.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use jsonrpsee::ConnectionId;
20use parking_lot::Mutex;
21use std::{
22	collections::{HashMap, HashSet},
23	sync::Arc,
24};
25
26/// Connection state which keeps track whether a connection exist and
27/// the number of concurrent operations.
28#[derive(Default, Clone)]
29pub struct RpcConnections {
30	/// The number of identifiers that can be registered for each connection.
31	///
32	/// # Example
33	///
34	/// This is used to limit how many `chainHead_follow` subscriptions are active at one time.
35	capacity: usize,
36	/// Map the connecton ID to a set of identifiers.
37	data: Arc<Mutex<HashMap<ConnectionId, ConnectionData>>>,
38}
39
40#[derive(Default)]
41struct ConnectionData {
42	/// The total number of identifiers for the given connection.
43	///
44	/// An identifier for a connection might be:
45	/// - the subscription ID for chainHead_follow
46	/// - the operation ID for the transactionBroadcast API
47	/// - or simply how many times the transaction API has been called.
48	///
49	/// # Note
50	///
51	/// Because a pending subscription sink does not expose the future subscription ID,
52	/// we cannot register a subscription ID before the pending subscription is accepted.
53	/// This variable ensures that we have enough capacity to register an identifier, after
54	/// the subscription is accepted. Otherwise, a jsonrpc error object should be returned.
55	num_identifiers: usize,
56	/// Active registered identifiers for the given connection.
57	///
58	/// # Note
59	///
60	/// For chainHead, this represents the subscription ID.
61	/// For transactionBroadcast, this represents the operation ID.
62	/// For transaction, this is empty and the number of active calls is tracked by
63	/// [`Self::num_identifiers`].
64	identifiers: HashSet<String>,
65}
66
67impl RpcConnections {
68	/// Constructs a new instance of [`RpcConnections`].
69	pub fn new(capacity: usize) -> Self {
70		RpcConnections { capacity, data: Default::default() }
71	}
72
73	/// Reserve space for a new connection identifier.
74	///
75	/// If the number of active identifiers for the given connection exceeds the capacity,
76	/// returns None.
77	pub fn reserve_space(&self, connection_id: ConnectionId) -> Option<ReservedConnection> {
78		let mut data = self.data.lock();
79
80		let entry = data.entry(connection_id).or_insert_with(ConnectionData::default);
81		if entry.num_identifiers >= self.capacity {
82			return None;
83		}
84		entry.num_identifiers = entry.num_identifiers.saturating_add(1);
85
86		Some(ReservedConnection { connection_id, rpc_connections: Some(self.clone()) })
87	}
88
89	/// Gives back the reserved space before the connection identifier is registered.
90	///
91	/// # Note
92	///
93	/// This may happen if the pending subscription cannot be accepted (unlikely).
94	fn unreserve_space(&self, connection_id: ConnectionId) {
95		let mut data = self.data.lock();
96
97		let entry = data.entry(connection_id).or_insert_with(ConnectionData::default);
98		entry.num_identifiers = entry.num_identifiers.saturating_sub(1);
99
100		if entry.num_identifiers == 0 {
101			data.remove(&connection_id);
102		}
103	}
104
105	/// Register an identifier for the given connection.
106	///
107	/// Users must call [`Self::reserve_space`] before calling this method to ensure enough
108	/// space is available.
109	///
110	/// Returns true if the identifier was inserted successfully, false if the identifier was
111	/// already inserted or reached capacity.
112	fn register_identifier(&self, connection_id: ConnectionId, identifier: String) -> bool {
113		let mut data = self.data.lock();
114
115		let entry = data.entry(connection_id).or_insert_with(ConnectionData::default);
116		// Should be already checked `Self::reserve_space`.
117		if entry.identifiers.len() >= self.capacity {
118			return false;
119		}
120
121		entry.identifiers.insert(identifier)
122	}
123
124	/// Unregister an identifier for the given connection.
125	fn unregister_identifier(&self, connection_id: ConnectionId, identifier: &str) {
126		let mut data = self.data.lock();
127		if let Some(connection_data) = data.get_mut(&connection_id) {
128			connection_data.identifiers.remove(identifier);
129			connection_data.num_identifiers = connection_data.num_identifiers.saturating_sub(1);
130
131			if connection_data.num_identifiers == 0 {
132				data.remove(&connection_id);
133			}
134		}
135	}
136
137	/// Check if the given connection contains the given identifier.
138	pub fn contains_identifier(&self, connection_id: ConnectionId, identifier: &str) -> bool {
139		let data = self.data.lock();
140		data.get(&connection_id)
141			.map(|connection_data| connection_data.identifiers.contains(identifier))
142			.unwrap_or(false)
143	}
144}
145
146/// RAII wrapper that ensures the reserved space is given back if the object is
147/// dropped before the identifier is registered.
148pub struct ReservedConnection {
149	connection_id: ConnectionId,
150	rpc_connections: Option<RpcConnections>,
151}
152
153impl ReservedConnection {
154	/// Register the identifier for the given connection.
155	pub fn register(mut self, identifier: String) -> Option<RegisteredConnection> {
156		let rpc_connections = self.rpc_connections.take()?;
157
158		if rpc_connections.register_identifier(self.connection_id, identifier.clone()) {
159			Some(RegisteredConnection {
160				connection_id: self.connection_id,
161				identifier,
162				rpc_connections,
163			})
164		} else {
165			None
166		}
167	}
168}
169
170impl Drop for ReservedConnection {
171	fn drop(&mut self) {
172		if let Some(rpc_connections) = self.rpc_connections.take() {
173			rpc_connections.unreserve_space(self.connection_id);
174		}
175	}
176}
177
178/// RAII wrapper that ensures the identifier is unregistered if the object is dropped.
179pub struct RegisteredConnection {
180	connection_id: ConnectionId,
181	identifier: String,
182	rpc_connections: RpcConnections,
183}
184
185impl Drop for RegisteredConnection {
186	fn drop(&mut self) {
187		self.rpc_connections.unregister_identifier(self.connection_id, &self.identifier);
188	}
189}
190
191#[cfg(test)]
192mod tests {
193	use super::*;
194
195	#[test]
196	fn reserve_space() {
197		let rpc_connections = RpcConnections::new(2);
198		let conn_id = ConnectionId(1);
199		let reserved = rpc_connections.reserve_space(conn_id);
200
201		assert!(reserved.is_some());
202		assert_eq!(1, rpc_connections.data.lock().get(&conn_id).unwrap().num_identifiers);
203		assert_eq!(rpc_connections.data.lock().len(), 1);
204
205		let reserved = reserved.unwrap();
206		let registered = reserved.register("identifier1".to_string()).unwrap();
207		assert!(rpc_connections.contains_identifier(conn_id, "identifier1"));
208		assert_eq!(1, rpc_connections.data.lock().get(&conn_id).unwrap().num_identifiers);
209		drop(registered);
210
211		// Data is dropped.
212		assert!(rpc_connections.data.lock().get(&conn_id).is_none());
213		assert!(rpc_connections.data.lock().is_empty());
214		// Checks can still happen.
215		assert!(!rpc_connections.contains_identifier(conn_id, "identifier1"));
216	}
217
218	#[test]
219	fn reserve_space_capacity_reached() {
220		let rpc_connections = RpcConnections::new(2);
221		let conn_id = ConnectionId(1);
222
223		// Reserve identifier for connection 1.
224		let reserved = rpc_connections.reserve_space(conn_id);
225		assert!(reserved.is_some());
226		assert_eq!(1, rpc_connections.data.lock().get(&conn_id).unwrap().num_identifiers);
227
228		// Add identifier for connection 1.
229		let reserved = reserved.unwrap();
230		let registered = reserved.register("identifier1".to_string()).unwrap();
231		assert!(rpc_connections.contains_identifier(conn_id, "identifier1"));
232		assert_eq!(1, rpc_connections.data.lock().get(&conn_id).unwrap().num_identifiers);
233
234		// Reserve identifier for connection 1 again.
235		let reserved = rpc_connections.reserve_space(conn_id);
236		assert!(reserved.is_some());
237		assert_eq!(2, rpc_connections.data.lock().get(&conn_id).unwrap().num_identifiers);
238
239		// Add identifier for connection 1 again.
240		let reserved = reserved.unwrap();
241		let registered_second = reserved.register("identifier2".to_string()).unwrap();
242		assert!(rpc_connections.contains_identifier(conn_id, "identifier2"));
243		assert_eq!(2, rpc_connections.data.lock().get(&conn_id).unwrap().num_identifiers);
244
245		// Cannot reserve more identifiers.
246		let reserved = rpc_connections.reserve_space(conn_id);
247		assert!(reserved.is_none());
248
249		// Drop the first identifier.
250		drop(registered);
251		assert_eq!(1, rpc_connections.data.lock().get(&conn_id).unwrap().num_identifiers);
252		assert!(rpc_connections.contains_identifier(conn_id, "identifier2"));
253		assert!(!rpc_connections.contains_identifier(conn_id, "identifier1"));
254
255		// Can reserve again after clearing the space.
256		let reserved = rpc_connections.reserve_space(conn_id);
257		assert!(reserved.is_some());
258		assert_eq!(2, rpc_connections.data.lock().get(&conn_id).unwrap().num_identifiers);
259
260		// Ensure data is cleared.
261		drop(reserved);
262		drop(registered_second);
263		assert!(rpc_connections.data.lock().get(&conn_id).is_none());
264	}
265}