jsonrpsee_core/client/async_client/
manager.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Handles and monitors JSONRPC v2 method calls and subscriptions
28//!
29//! Definitions:
30//!
31//!    - RequestId: request ID in the JSONRPC-v2 specification
//!    > **Note**: The spec allows a number, a string or null, but this crate only supports numbers.
33//!    - SubscriptionId: unique ID generated by server
34
35use std::{
36	collections::{hash_map::Entry, HashMap},
37	ops::Range,
38};
39
40use crate::{
41	client::{BatchEntry, Error, SubscriptionReceiver, SubscriptionSender},
42	error::RegisterMethodError,
43};
44use jsonrpsee_types::{Id, SubscriptionId};
45use rustc_hash::FxHashMap;
46use serde_json::value::Value as JsonValue;
47use tokio::sync::oneshot;
48
#[derive(Debug)]
/// Internal state stored per request ID inside `RequestManager::requests`.
enum Kind {
	/// A method call that has not been completed yet.
	///
	/// The sender is `None` for slots that are only reserved (the unsubscribe request ID of a
	/// subscription, or a subscription whose unsubscribe has been initiated).
	PendingMethodCall(PendingCallOneshot),
	/// A subscribe call that has not been completed yet: the reserved unsubscribe request ID, the oneshot used
	/// to hand the subscription back, and the name of the unsubscribe method.
	PendingSubscription((RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)),
	/// An active subscription: the reserved unsubscribe request ID, the sink used to forward messages, and the
	/// name of the unsubscribe method.
	Subscription((RequestId, SubscriptionSink, UnsubscribeMethod)),
}
55
#[derive(Debug, Clone)]
/// Indicates the status of a given request/response.
///
/// This is the payload-free view of the state the manager keeps for a request ID, as reported by
/// `RequestManager::request_status`.
pub(crate) enum RequestStatus {
	/// The method call is waiting for a response.
	PendingMethodCall,
	/// The subscription is waiting for a response to become an active subscription.
	PendingSubscription,
	/// An active subscription.
	Subscription,
	/// The request ID is not known to the manager.
	Invalid,
}
68
/// Optional oneshot sender used to deliver the result of a method call; `None` marks a reserved slot.
type PendingCallOneshot = Option<oneshot::Sender<Result<JsonValue, Error>>>;
/// Oneshot sender used to deliver the result of a batch request.
type PendingBatchOneshot = oneshot::Sender<Result<Vec<BatchEntry<'static, JsonValue>>, Error>>;
/// Oneshot sender used to deliver the receiver and subscription ID of a subscribe call (or an error).
type PendingSubscriptionOneshot = oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>;
/// Sending half used to forward messages to a subscription or notification handler.
type SubscriptionSink = SubscriptionSender;
/// Name of the method used to unsubscribe.
type UnsubscribeMethod = String;
/// Owned request ID.
type RequestId = Id<'static>;
75
#[derive(Debug)]
/// Batch state.
///
/// Stored per pending batch in `RequestManager::batches` and handed back by
/// `RequestManager::complete_pending_batch`.
pub(crate) struct BatchState {
	/// Oneshot send back, used to deliver the batch result (or an error).
	pub(crate) send_back: PendingBatchOneshot,
}
82
#[derive(Debug, Default)]
/// Manages and monitors JSONRPC v2 method calls and subscriptions.
///
/// The manager is plain bookkeeping: it only stores senders/sinks keyed by request ID, subscription ID,
/// batch range or notification method name, and hands them back when asked.
pub(crate) struct RequestManager {
	/// List of requests that are waiting for a response from the server, plus active subscriptions.
	// NOTE: FxHashMap is used here because RequestId is not under the caller's control and is known to be a short
	// key.
	requests: FxHashMap<RequestId, Kind>,
	/// Reverse lookup, to find a request ID in constant time by `subscription ID` instead of looking through all
	/// requests.
	subscriptions: HashMap<SubscriptionId<'static>, RequestId>,
	/// Pending batch requests, keyed by a `Range<u64>` (presumably the request-ID range of the batch; confirm
	/// against the caller).
	batches: FxHashMap<Range<u64>, BatchState>,
	/// Registered Methods for incoming notifications, keyed by method name.
	notification_handlers: HashMap<String, SubscriptionSink>,
}
98
99impl RequestManager {
100	/// Create a new `RequestManager`.
101	#[allow(unused)]
102	pub(crate) fn new() -> Self {
103		Self::default()
104	}
105
106	/// Tries to insert a new pending request.
107	///
108	/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
109	pub(crate) fn insert_pending_call(
110		&mut self,
111		id: RequestId,
112		send_back: PendingCallOneshot,
113	) -> Result<(), PendingCallOneshot> {
114		if let Entry::Vacant(v) = self.requests.entry(id) {
115			v.insert(Kind::PendingMethodCall(send_back));
116			Ok(())
117		} else {
118			Err(send_back)
119		}
120	}
121
122	/// Tries to insert a new batch request.
123	///
124	/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
125	pub(crate) fn insert_pending_batch(
126		&mut self,
127		batch: Range<u64>,
128		send_back: PendingBatchOneshot,
129	) -> Result<(), PendingBatchOneshot> {
130		if let Entry::Vacant(v) = self.batches.entry(batch) {
131			v.insert(BatchState { send_back });
132			Ok(())
133		} else {
134			Err(send_back)
135		}
136	}
137
138	/// Tries to insert a new pending subscription and reserves a slot for a "potential" unsubscription request.
139	///
140	/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
141	pub(crate) fn insert_pending_subscription(
142		&mut self,
143		sub_req_id: RequestId,
144		unsub_req_id: RequestId,
145		send_back: PendingSubscriptionOneshot,
146		unsubscribe_method: UnsubscribeMethod,
147	) -> Result<(), PendingSubscriptionOneshot> {
148		// The request IDs are not in the manager and the `sub_id` and `unsub_id` are not equal.
149		if !self.requests.contains_key(&sub_req_id)
150			&& !self.requests.contains_key(&unsub_req_id)
151			&& sub_req_id != unsub_req_id
152		{
153			self.requests
154				.insert(sub_req_id, Kind::PendingSubscription((unsub_req_id.clone(), send_back, unsubscribe_method)));
155			self.requests.insert(unsub_req_id, Kind::PendingMethodCall(None));
156			Ok(())
157		} else {
158			Err(send_back)
159		}
160	}
161
162	/// Tries to insert a new subscription.
163	///
164	/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
165	pub(crate) fn insert_subscription(
166		&mut self,
167		sub_req_id: RequestId,
168		unsub_req_id: RequestId,
169		subscription_id: SubscriptionId<'static>,
170		send_back: SubscriptionSink,
171		unsubscribe_method: UnsubscribeMethod,
172	) -> Result<(), SubscriptionSink> {
173		if let (Entry::Vacant(request), Entry::Vacant(subscription)) =
174			(self.requests.entry(sub_req_id.clone()), self.subscriptions.entry(subscription_id))
175		{
176			request.insert(Kind::Subscription((unsub_req_id, send_back, unsubscribe_method)));
177			subscription.insert(sub_req_id);
178			Ok(())
179		} else {
180			Err(send_back)
181		}
182	}
183
184	/// Inserts a handler for incoming notifications.
185	pub(crate) fn insert_notification_handler(
186		&mut self,
187		method: &str,
188		send_back: SubscriptionSink,
189	) -> Result<(), RegisterMethodError> {
190		if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) {
191			handle.insert(send_back);
192			Ok(())
193		} else {
194			Err(RegisterMethodError::AlreadyRegistered(method.to_owned()))
195		}
196	}
197
198	/// Removes a notification handler.
199	pub(crate) fn remove_notification_handler(&mut self, method: &str) -> Option<SubscriptionSink> {
200		self.notification_handlers.remove(method)
201	}
202
203	/// Tries to complete a pending subscription.
204	///
205	/// Returns `Some` if the subscription was completed otherwise `None`.
206	pub(crate) fn complete_pending_subscription(
207		&mut self,
208		request_id: RequestId,
209	) -> Option<(RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)> {
210		match self.requests.entry(request_id) {
211			Entry::Occupied(request) if matches!(request.get(), Kind::PendingSubscription(_)) => {
212				let (_req_id, kind) = request.remove_entry();
213				if let Kind::PendingSubscription(send_back) = kind {
214					Some(send_back)
215				} else {
216					unreachable!("Pending subscription is Pending subscription checked above; qed");
217				}
218			}
219			_ => None,
220		}
221	}
222
223	/// Tries to complete a pending batch request.
224	///
225	/// Returns `Some` if the subscription was completed otherwise `None`.
226	pub(crate) fn complete_pending_batch(&mut self, batch: Range<u64>) -> Option<BatchState> {
227		match self.batches.entry(batch) {
228			Entry::Occupied(request) => {
229				let (_digest, state) = request.remove_entry();
230				Some(state)
231			}
232			_ => None,
233		}
234	}
235
236	/// Tries to complete a pending call.
237	///
238	/// Returns `Some` if the call was completed otherwise `None`.
239	pub(crate) fn complete_pending_call(&mut self, request_id: RequestId) -> Option<PendingCallOneshot> {
240		match self.requests.entry(request_id) {
241			Entry::Occupied(request) if matches!(request.get(), Kind::PendingMethodCall(_)) => {
242				let (_req_id, kind) = request.remove_entry();
243				if let Kind::PendingMethodCall(send_back) = kind {
244					Some(send_back)
245				} else {
246					unreachable!("Pending call is Pending call checked above; qed");
247				}
248			}
249			_ => None,
250		}
251	}
252
253	/// Removes the subscription without waiting for the unsubscribe call.
254	///
255	/// Returns `Some` if the subscription was removed.
256	pub(crate) fn remove_subscription(
257		&mut self,
258		request_id: RequestId,
259		subscription_id: SubscriptionId<'static>,
260	) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId)> {
261		match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
262			(Entry::Occupied(request), Entry::Occupied(subscription))
263				if matches!(request.get(), Kind::Subscription(_)) =>
264			{
265				// Mark the request ID as pending unsubscription.
266				let (_req_id, kind) = request.remove_entry();
267				let (sub_id, _req_id) = subscription.remove_entry();
268				if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
269					Some((unsub_req_id, send_back, unsub, sub_id))
270				} else {
271					unreachable!("Subscription is Subscription checked above; qed");
272				}
273			}
274			_ => None,
275		}
276	}
277
278	/// Initiates an unsubscribe which is not completed until the unsubscribe call
279	/// has been acknowledged.
280	///
281	/// Returns `Some` if the subscription was unsubscribed.
282	pub(crate) fn unsubscribe(
283		&mut self,
284		request_id: RequestId,
285		subscription_id: SubscriptionId<'static>,
286	) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId)> {
287		match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
288			(Entry::Occupied(mut request), Entry::Occupied(subscription))
289				if matches!(request.get(), Kind::Subscription(_)) =>
290			{
291				// Mark the request ID as "pending unsubscription" which will be resolved once the
292				// unsubscribe call has been acknowledged.
293				let kind = std::mem::replace(request.get_mut(), Kind::PendingMethodCall(None));
294				let (sub_id, _req_id) = subscription.remove_entry();
295				if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
296					Some((unsub_req_id, send_back, unsub, sub_id))
297				} else {
298					unreachable!("Subscription is Subscription checked above; qed");
299				}
300			}
301			_ => None,
302		}
303	}
304
305	/// Returns the status of a request ID
306	pub(crate) fn request_status(&mut self, id: &RequestId) -> RequestStatus {
307		self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind {
308			Kind::PendingMethodCall(_) => RequestStatus::PendingMethodCall,
309			Kind::PendingSubscription(_) => RequestStatus::PendingSubscription,
310			Kind::Subscription(_) => RequestStatus::Subscription,
311		})
312	}
313
314	/// Get a mutable reference to underlying `Sink` in order to send messages to the subscription.
315	///
316	/// Returns `Some` if the `request_id` was registered as a subscription otherwise `None`.
317	pub(crate) fn as_subscription_mut(&mut self, request_id: &RequestId) -> Option<&mut SubscriptionSink> {
318		if let Some(Kind::Subscription((_, sink, _))) = self.requests.get_mut(request_id) {
319			Some(sink)
320		} else {
321			None
322		}
323	}
324
325	/// Get a mutable reference to underlying `Sink` in order to send incoming notifications to the subscription.
326	///
327	/// Returns `Some` if the `method` was registered as a NotificationHandler otherwise `None`.
328	pub(crate) fn as_notification_handler_mut(&mut self, method: String) -> Option<&mut SubscriptionSink> {
329		self.notification_handlers.get_mut(&method)
330	}
331
332	/// Reverse lookup to get the request ID for a subscription ID.
333	///
334	/// Returns `Some` if the subscription ID was registered as a subscription otherwise `None`.
335	pub(crate) fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option<RequestId> {
336		self.subscriptions.get(sub_id).map(|id| id.clone().into_owned())
337	}
338}
339
340#[cfg(test)]
341mod tests {
342	use crate::client::subscription_channel;
343
344	use super::{Error, RequestManager};
345	use jsonrpsee_types::{Id, SubscriptionId};
346	use serde_json::Value as JsonValue;
347	use tokio::sync::oneshot;
348
349	#[test]
350	fn insert_remove_pending_request_works() {
351		let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
352
353		let mut manager = RequestManager::new();
354		assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx)).is_ok());
355		assert!(manager.complete_pending_call(Id::Number(0)).is_some());
356	}
357
358	#[test]
359	fn insert_remove_subscription_works() {
360		let (pending_sub_tx, _) = oneshot::channel();
361		let (sub_tx, _) = subscription_channel(1);
362		let mut manager = RequestManager::new();
363		assert!(manager
364			.insert_pending_subscription(Id::Number(1), Id::Number(2), pending_sub_tx, "unsubscribe_method".into())
365			.is_ok());
366		let (unsub_req_id, _send_back_oneshot, unsubscribe_method) =
367			manager.complete_pending_subscription(Id::Number(1)).unwrap();
368		assert_eq!(unsub_req_id, Id::Number(2));
369		assert!(manager
370			.insert_subscription(
371				Id::Number(1),
372				Id::Number(2),
373				SubscriptionId::Str("uniq_id_from_server".into()),
374				sub_tx,
375				unsubscribe_method
376			)
377			.is_ok());
378
379		assert!(manager.as_subscription_mut(&Id::Number(1)).is_some());
380		assert!(manager
381			.remove_subscription(Id::Number(1), SubscriptionId::Str("uniq_id_from_server".into()))
382			.is_some());
383	}
384
385	#[test]
386	fn insert_subscription_with_same_sub_and_unsub_id_should_err() {
387		let (tx1, _) = oneshot::channel();
388		let (tx2, _) = oneshot::channel();
389		let (tx3, _) = oneshot::channel();
390		let (tx4, _) = oneshot::channel();
391		let mut manager = RequestManager::new();
392		assert!(manager
393			.insert_pending_subscription(Id::Str("1".into()), Id::Str("1".into()), tx1, "unsubscribe_method".into())
394			.is_err());
395		assert!(manager
396			.insert_pending_subscription(Id::Str("0".into()), Id::Str("1".into()), tx2, "unsubscribe_method".into())
397			.is_ok());
398		assert!(
399			manager
400				.insert_pending_subscription(
401					Id::Str("99".into()),
402					Id::Str("0".into()),
403					tx3,
404					"unsubscribe_method".into()
405				)
406				.is_err(),
407			"unsub request ID already occupied"
408		);
409		assert!(
410			manager
411				.insert_pending_subscription(
412					Id::Str("99".into()),
413					Id::Str("1".into()),
414					tx4,
415					"unsubscribe_method".into()
416				)
417				.is_err(),
418			"sub request ID already occupied"
419		);
420	}
421
422	#[test]
423	fn pending_method_call_faulty() {
424		let (request_tx1, _) = oneshot::channel();
425		let (request_tx2, _) = oneshot::channel();
426		let (pending_sub_tx, _) = oneshot::channel();
427		let (sub_tx, _) = subscription_channel(1);
428
429		let mut manager = RequestManager::new();
430		assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx1)).is_ok());
431		assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx2)).is_err());
432		assert!(manager
433			.insert_pending_subscription(Id::Number(0), Id::Number(1), pending_sub_tx, "beef".to_string())
434			.is_err());
435		assert!(manager
436			.insert_subscription(
437				Id::Number(0),
438				Id::Number(99),
439				SubscriptionId::Num(137),
440				sub_tx,
441				"bibimbap".to_string()
442			)
443			.is_err());
444
445		assert!(manager.remove_subscription(Id::Number(0), SubscriptionId::Num(137)).is_none());
446		assert!(manager.complete_pending_subscription(Id::Number(0)).is_none());
447		assert!(manager.complete_pending_call(Id::Number(0)).is_some());
448	}
449
450	#[test]
451	fn pending_subscription_faulty() {
452		let (request_tx, _) = oneshot::channel();
453		let (pending_sub_tx1, _) = oneshot::channel();
454		let (pending_sub_tx2, _) = oneshot::channel();
455		let (sub_tx, _) = subscription_channel(1);
456
457		let mut manager = RequestManager::new();
458		assert!(manager
459			.insert_pending_subscription(Id::Number(99), Id::Number(100), pending_sub_tx1, "beef".to_string())
460			.is_ok());
461		assert!(manager.insert_pending_call(Id::Number(99), Some(request_tx)).is_err());
462		assert!(manager
463			.insert_pending_subscription(Id::Number(99), Id::Number(1337), pending_sub_tx2, "vegan".to_string())
464			.is_err());
465
466		assert!(manager
467			.insert_subscription(
468				Id::Number(99),
469				Id::Number(100),
470				SubscriptionId::Num(0),
471				sub_tx,
472				"bibimbap".to_string()
473			)
474			.is_err());
475
476		assert!(manager.remove_subscription(Id::Number(99), SubscriptionId::Num(0)).is_none());
477		assert!(manager.complete_pending_call(Id::Number(99)).is_none());
478		assert!(manager.complete_pending_subscription(Id::Number(99)).is_some());
479	}
480
481	#[test]
482	fn active_subscriptions_faulty() {
483		let (request_tx, _) = oneshot::channel();
484		let (pending_sub_tx, _) = oneshot::channel();
485		let (sub_tx1, _) = subscription_channel(1);
486		let (sub_tx2, _) = subscription_channel(1);
487
488		let mut manager = RequestManager::new();
489
490		assert!(manager
491			.insert_subscription(Id::Number(3), Id::Number(4), SubscriptionId::Num(0), sub_tx1, "bibimbap".to_string())
492			.is_ok());
493		assert!(manager
494			.insert_subscription(Id::Number(3), Id::Number(4), SubscriptionId::Num(1), sub_tx2, "bibimbap".to_string())
495			.is_err());
496		assert!(manager
497			.insert_pending_subscription(Id::Number(3), Id::Number(4), pending_sub_tx, "beef".to_string())
498			.is_err());
499		assert!(manager.insert_pending_call(Id::Number(3), Some(request_tx)).is_err());
500
501		assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(7)).is_none());
502		assert!(manager.complete_pending_call(Id::Number(3)).is_none());
503		assert!(manager.complete_pending_subscription(Id::Number(3)).is_none());
504		assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(1)).is_none());
505		assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(0)).is_some());
506
507		assert!(manager.requests.is_empty());
508		assert!(manager.subscriptions.is_empty());
509	}
510}