sc_rpc_spec_v2/chain_head/subscription/
mod.rs1use jsonrpsee::ConnectionId;
20use parking_lot::RwLock;
21use sc_client_api::Backend;
22use sp_runtime::traits::Block as BlockT;
23use std::{sync::Arc, time::Duration};
24
25mod error;
26mod inner;
27
28use crate::{
29 chain_head::chain_head::LOG_TARGET,
30 common::connections::{RegisteredConnection, ReservedConnection, RpcConnections},
31};
32
33use self::inner::SubscriptionsInner;
34
35pub use self::inner::OperationState;
36pub use error::SubscriptionManagementError;
37pub use inner::{BlockGuard, InsertedSubscriptionData, StopHandle};
38
39pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> {
41 inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
44
45 rpc_connections: RpcConnections,
50}
51
52impl<Block: BlockT, BE: Backend<Block>> Clone for SubscriptionManagement<Block, BE> {
53 fn clone(&self) -> Self {
54 SubscriptionManagement {
55 inner: self.inner.clone(),
56 rpc_connections: self.rpc_connections.clone(),
57 }
58 }
59}
60
61impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
62 pub fn new(
64 global_max_pinned_blocks: usize,
65 local_max_pin_duration: Duration,
66 max_ongoing_operations: usize,
67 max_follow_subscriptions_per_connection: usize,
68 backend: Arc<BE>,
69 ) -> Self {
70 SubscriptionManagement {
71 inner: Arc::new(RwLock::new(SubscriptionsInner::new(
72 global_max_pinned_blocks,
73 local_max_pin_duration,
74 max_ongoing_operations,
75 backend,
76 ))),
77 rpc_connections: RpcConnections::new(max_follow_subscriptions_per_connection),
78 }
79 }
80
81 #[cfg(test)]
87 pub(crate) fn _from_inner(
88 inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
89 rpc_connections: RpcConnections,
90 ) -> Self {
91 SubscriptionManagement { inner, rpc_connections }
92 }
93
94 pub fn reserve_subscription(
98 &self,
99 connection_id: ConnectionId,
100 ) -> Option<ReservedSubscription<Block, BE>> {
101 let reserved_token = self.rpc_connections.reserve_space(connection_id)?;
102
103 Some(ReservedSubscription {
104 state: ConnectionState::Reserved(reserved_token),
105 inner: self.inner.clone(),
106 })
107 }
108
109 pub fn contains_subscription(
111 &self,
112 connection_id: ConnectionId,
113 subscription_id: &str,
114 ) -> bool {
115 self.rpc_connections.contains_identifier(connection_id, subscription_id)
116 }
117
118 pub fn remove_subscription(&self, sub_id: &str) {
120 let mut inner = self.inner.write();
121 inner.remove_subscription(sub_id)
122 }
123
124 pub fn pin_block(
135 &self,
136 sub_id: &str,
137 hash: Block::Hash,
138 ) -> Result<bool, SubscriptionManagementError> {
139 let mut inner = self.inner.write();
140 inner.pin_block(sub_id, hash)
141 }
142
143 pub fn unpin_blocks(
154 &self,
155 sub_id: &str,
156 hashes: impl IntoIterator<Item = Block::Hash> + Clone,
157 ) -> Result<(), SubscriptionManagementError> {
158 let mut inner = self.inner.write();
159 inner.unpin_blocks(sub_id, hashes)
160 }
161
162 pub fn lock_block(
170 &self,
171 sub_id: &str,
172 hash: Block::Hash,
173 to_reserve: usize,
174 ) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
175 let mut inner = self.inner.write();
176 inner.lock_block(sub_id, hash, to_reserve)
177 }
178
179 pub fn get_operation(&self, sub_id: &str, operation_id: &str) -> Option<OperationState> {
181 let mut inner = self.inner.write();
182 inner.get_operation(sub_id, operation_id)
183 }
184}
185
186enum ConnectionState {
191 Reserved(ReservedConnection),
192 Registered { _unregister_on_drop: RegisteredConnection, sub_id: String },
193 Empty,
194}
195
196pub struct ReservedSubscription<Block: BlockT, BE: Backend<Block>> {
199 state: ConnectionState,
200 inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
201}
202
203impl<Block: BlockT, BE: Backend<Block>> ReservedSubscription<Block, BE> {
204 pub fn insert_subscription(
214 &mut self,
215 sub_id: String,
216 runtime_updates: bool,
217 ) -> Option<InsertedSubscriptionData<Block>> {
218 match std::mem::replace(&mut self.state, ConnectionState::Empty) {
219 ConnectionState::Reserved(reserved) => {
220 let registered_token = reserved.register(sub_id.clone())?;
221 self.state = ConnectionState::Registered {
222 _unregister_on_drop: registered_token,
223 sub_id: sub_id.clone(),
224 };
225
226 let mut inner = self.inner.write();
227 inner.insert_subscription(sub_id, runtime_updates)
228 },
229 ConnectionState::Registered { .. } | ConnectionState::Empty => {
231 log::error!(target: LOG_TARGET, "Called insert_subscription on a connection that is not reserved");
232 None
233 },
234 }
235 }
236
237 pub fn stop_all_subscriptions(&self) {
242 let mut inner = self.inner.write();
243 inner.stop_all_subscriptions()
244 }
245}
246
247impl<Block: BlockT, BE: Backend<Block>> Drop for ReservedSubscription<Block, BE> {
248 fn drop(&mut self) {
249 if let ConnectionState::Registered { sub_id, .. } = &self.state {
250 self.inner.write().remove_subscription(sub_id);
251 }
252 }
253}