1use std::{
36 collections::{hash_map::Entry, HashMap},
37 ops::Range,
38};
39
40use crate::{
41 client::{BatchEntry, Error, SubscriptionReceiver, SubscriptionSender},
42 error::RegisterMethodError,
43};
44use jsonrpsee_types::{Id, SubscriptionId};
45use rustc_hash::FxHashMap;
46use serde_json::value::Value as JsonValue;
47use tokio::sync::oneshot;
48
/// State stored for a single request ID inside the `requests` map of `RequestManager`.
#[derive(Debug)]
enum Kind {
    /// A method call that has not been completed yet. Holds the optional oneshot
    /// sender for the result; `None` is used for placeholder entries that have
    /// nobody waiting on them.
    PendingMethodCall(PendingCallOneshot),
    /// A subscription request that has not been completed yet. The tuple is
    /// `(unsubscribe request ID, oneshot sender for the outcome, unsubscribe method)`.
    PendingSubscription((RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)),
    /// An active subscription. The tuple is
    /// `(unsubscribe request ID, sink, unsubscribe method)`.
    Subscription((RequestId, SubscriptionSink, UnsubscribeMethod)),
}
55
/// Payload-free view of what the manager currently tracks for a request ID,
/// as reported by `RequestManager::request_status`.
#[derive(Debug, Clone)]
pub(crate) enum RequestStatus {
    /// The ID belongs to a pending method call.
    PendingMethodCall,
    /// The ID belongs to a pending subscription.
    PendingSubscription,
    /// The ID belongs to an active subscription.
    Subscription,
    /// The ID is not tracked by the manager.
    Invalid,
}
68
/// Optional oneshot sender used to deliver the result of a method call.
type PendingCallOneshot = Option<oneshot::Sender<Result<JsonValue, Error>>>;
/// Oneshot sender used to deliver the result of a whole batch request.
type PendingBatchOneshot = oneshot::Sender<Result<Vec<BatchEntry<'static, JsonValue>>, Error>>;
/// Oneshot sender used to deliver the outcome of a subscription request: on
/// success, the receiver half of the subscription together with its subscription ID.
type PendingSubscriptionOneshot = oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>;
/// Sending half stored for active subscriptions and notification handlers.
type SubscriptionSink = SubscriptionSender;
/// Name of the method associated with a subscription for unsubscribing.
type UnsubscribeMethod = String;
/// Owned (`'static`) request ID used as the key of the request map.
type RequestId = Id<'static>;
75
/// State stored for a pending batch request.
#[derive(Debug)]
pub(crate) struct BatchState {
    /// Oneshot sender used to deliver the batch result.
    pub(crate) send_back: PendingBatchOneshot,
}
82
/// Bookkeeping for in-flight requests, active subscriptions, pending batches
/// and registered notification handlers.
#[derive(Debug, Default)]
pub(crate) struct RequestManager {
    /// State of every tracked request ID (pending call, pending subscription or
    /// active subscription).
    requests: FxHashMap<RequestId, Kind>,
    /// Reverse lookup from a subscription ID to the request ID it was registered under.
    subscriptions: HashMap<SubscriptionId<'static>, RequestId>,
    /// Pending batch requests keyed by a `Range<u64>`.
    // NOTE(review): presumably the range of request IDs covered by the batch — confirm with callers.
    batches: FxHashMap<Range<u64>, BatchState>,
    /// Registered notification handlers keyed by method name.
    notification_handlers: HashMap<String, SubscriptionSink>,
}
98
99impl RequestManager {
100 #[allow(unused)]
102 pub(crate) fn new() -> Self {
103 Self::default()
104 }
105
106 pub(crate) fn insert_pending_call(
110 &mut self,
111 id: RequestId,
112 send_back: PendingCallOneshot,
113 ) -> Result<(), PendingCallOneshot> {
114 if let Entry::Vacant(v) = self.requests.entry(id) {
115 v.insert(Kind::PendingMethodCall(send_back));
116 Ok(())
117 } else {
118 Err(send_back)
119 }
120 }
121
122 pub(crate) fn insert_pending_batch(
126 &mut self,
127 batch: Range<u64>,
128 send_back: PendingBatchOneshot,
129 ) -> Result<(), PendingBatchOneshot> {
130 if let Entry::Vacant(v) = self.batches.entry(batch) {
131 v.insert(BatchState { send_back });
132 Ok(())
133 } else {
134 Err(send_back)
135 }
136 }
137
138 pub(crate) fn insert_pending_subscription(
142 &mut self,
143 sub_req_id: RequestId,
144 unsub_req_id: RequestId,
145 send_back: PendingSubscriptionOneshot,
146 unsubscribe_method: UnsubscribeMethod,
147 ) -> Result<(), PendingSubscriptionOneshot> {
148 if !self.requests.contains_key(&sub_req_id)
150 && !self.requests.contains_key(&unsub_req_id)
151 && sub_req_id != unsub_req_id
152 {
153 self.requests
154 .insert(sub_req_id, Kind::PendingSubscription((unsub_req_id.clone(), send_back, unsubscribe_method)));
155 self.requests.insert(unsub_req_id, Kind::PendingMethodCall(None));
156 Ok(())
157 } else {
158 Err(send_back)
159 }
160 }
161
162 pub(crate) fn insert_subscription(
166 &mut self,
167 sub_req_id: RequestId,
168 unsub_req_id: RequestId,
169 subscription_id: SubscriptionId<'static>,
170 send_back: SubscriptionSink,
171 unsubscribe_method: UnsubscribeMethod,
172 ) -> Result<(), SubscriptionSink> {
173 if let (Entry::Vacant(request), Entry::Vacant(subscription)) =
174 (self.requests.entry(sub_req_id.clone()), self.subscriptions.entry(subscription_id))
175 {
176 request.insert(Kind::Subscription((unsub_req_id, send_back, unsubscribe_method)));
177 subscription.insert(sub_req_id);
178 Ok(())
179 } else {
180 Err(send_back)
181 }
182 }
183
184 pub(crate) fn insert_notification_handler(
186 &mut self,
187 method: &str,
188 send_back: SubscriptionSink,
189 ) -> Result<(), RegisterMethodError> {
190 if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) {
191 handle.insert(send_back);
192 Ok(())
193 } else {
194 Err(RegisterMethodError::AlreadyRegistered(method.to_owned()))
195 }
196 }
197
198 pub(crate) fn remove_notification_handler(&mut self, method: &str) -> Option<SubscriptionSink> {
200 self.notification_handlers.remove(method)
201 }
202
203 pub(crate) fn complete_pending_subscription(
207 &mut self,
208 request_id: RequestId,
209 ) -> Option<(RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)> {
210 match self.requests.entry(request_id) {
211 Entry::Occupied(request) if matches!(request.get(), Kind::PendingSubscription(_)) => {
212 let (_req_id, kind) = request.remove_entry();
213 if let Kind::PendingSubscription(send_back) = kind {
214 Some(send_back)
215 } else {
216 unreachable!("Pending subscription is Pending subscription checked above; qed");
217 }
218 }
219 _ => None,
220 }
221 }
222
223 pub(crate) fn complete_pending_batch(&mut self, batch: Range<u64>) -> Option<BatchState> {
227 match self.batches.entry(batch) {
228 Entry::Occupied(request) => {
229 let (_digest, state) = request.remove_entry();
230 Some(state)
231 }
232 _ => None,
233 }
234 }
235
236 pub(crate) fn complete_pending_call(&mut self, request_id: RequestId) -> Option<PendingCallOneshot> {
240 match self.requests.entry(request_id) {
241 Entry::Occupied(request) if matches!(request.get(), Kind::PendingMethodCall(_)) => {
242 let (_req_id, kind) = request.remove_entry();
243 if let Kind::PendingMethodCall(send_back) = kind {
244 Some(send_back)
245 } else {
246 unreachable!("Pending call is Pending call checked above; qed");
247 }
248 }
249 _ => None,
250 }
251 }
252
253 pub(crate) fn remove_subscription(
257 &mut self,
258 request_id: RequestId,
259 subscription_id: SubscriptionId<'static>,
260 ) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId)> {
261 match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
262 (Entry::Occupied(request), Entry::Occupied(subscription))
263 if matches!(request.get(), Kind::Subscription(_)) =>
264 {
265 let (_req_id, kind) = request.remove_entry();
267 let (sub_id, _req_id) = subscription.remove_entry();
268 if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
269 Some((unsub_req_id, send_back, unsub, sub_id))
270 } else {
271 unreachable!("Subscription is Subscription checked above; qed");
272 }
273 }
274 _ => None,
275 }
276 }
277
278 pub(crate) fn unsubscribe(
283 &mut self,
284 request_id: RequestId,
285 subscription_id: SubscriptionId<'static>,
286 ) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId)> {
287 match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
288 (Entry::Occupied(mut request), Entry::Occupied(subscription))
289 if matches!(request.get(), Kind::Subscription(_)) =>
290 {
291 let kind = std::mem::replace(request.get_mut(), Kind::PendingMethodCall(None));
294 let (sub_id, _req_id) = subscription.remove_entry();
295 if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
296 Some((unsub_req_id, send_back, unsub, sub_id))
297 } else {
298 unreachable!("Subscription is Subscription checked above; qed");
299 }
300 }
301 _ => None,
302 }
303 }
304
305 pub(crate) fn request_status(&mut self, id: &RequestId) -> RequestStatus {
307 self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind {
308 Kind::PendingMethodCall(_) => RequestStatus::PendingMethodCall,
309 Kind::PendingSubscription(_) => RequestStatus::PendingSubscription,
310 Kind::Subscription(_) => RequestStatus::Subscription,
311 })
312 }
313
314 pub(crate) fn as_subscription_mut(&mut self, request_id: &RequestId) -> Option<&mut SubscriptionSink> {
318 if let Some(Kind::Subscription((_, sink, _))) = self.requests.get_mut(request_id) {
319 Some(sink)
320 } else {
321 None
322 }
323 }
324
325 pub(crate) fn as_notification_handler_mut(&mut self, method: String) -> Option<&mut SubscriptionSink> {
329 self.notification_handlers.get_mut(&method)
330 }
331
332 pub(crate) fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option<RequestId> {
336 self.subscriptions.get(sub_id).map(|id| id.clone().into_owned())
337 }
338}
339
340#[cfg(test)]
341mod tests {
342 use crate::client::subscription_channel;
343
344 use super::{Error, RequestManager};
345 use jsonrpsee_types::{Id, SubscriptionId};
346 use serde_json::Value as JsonValue;
347 use tokio::sync::oneshot;
348
349 #[test]
350 fn insert_remove_pending_request_works() {
351 let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
352
353 let mut manager = RequestManager::new();
354 assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx)).is_ok());
355 assert!(manager.complete_pending_call(Id::Number(0)).is_some());
356 }
357
358 #[test]
359 fn insert_remove_subscription_works() {
360 let (pending_sub_tx, _) = oneshot::channel();
361 let (sub_tx, _) = subscription_channel(1);
362 let mut manager = RequestManager::new();
363 assert!(manager
364 .insert_pending_subscription(Id::Number(1), Id::Number(2), pending_sub_tx, "unsubscribe_method".into())
365 .is_ok());
366 let (unsub_req_id, _send_back_oneshot, unsubscribe_method) =
367 manager.complete_pending_subscription(Id::Number(1)).unwrap();
368 assert_eq!(unsub_req_id, Id::Number(2));
369 assert!(manager
370 .insert_subscription(
371 Id::Number(1),
372 Id::Number(2),
373 SubscriptionId::Str("uniq_id_from_server".into()),
374 sub_tx,
375 unsubscribe_method
376 )
377 .is_ok());
378
379 assert!(manager.as_subscription_mut(&Id::Number(1)).is_some());
380 assert!(manager
381 .remove_subscription(Id::Number(1), SubscriptionId::Str("uniq_id_from_server".into()))
382 .is_some());
383 }
384
385 #[test]
386 fn insert_subscription_with_same_sub_and_unsub_id_should_err() {
387 let (tx1, _) = oneshot::channel();
388 let (tx2, _) = oneshot::channel();
389 let (tx3, _) = oneshot::channel();
390 let (tx4, _) = oneshot::channel();
391 let mut manager = RequestManager::new();
392 assert!(manager
393 .insert_pending_subscription(Id::Str("1".into()), Id::Str("1".into()), tx1, "unsubscribe_method".into())
394 .is_err());
395 assert!(manager
396 .insert_pending_subscription(Id::Str("0".into()), Id::Str("1".into()), tx2, "unsubscribe_method".into())
397 .is_ok());
398 assert!(
399 manager
400 .insert_pending_subscription(
401 Id::Str("99".into()),
402 Id::Str("0".into()),
403 tx3,
404 "unsubscribe_method".into()
405 )
406 .is_err(),
407 "unsub request ID already occupied"
408 );
409 assert!(
410 manager
411 .insert_pending_subscription(
412 Id::Str("99".into()),
413 Id::Str("1".into()),
414 tx4,
415 "unsubscribe_method".into()
416 )
417 .is_err(),
418 "sub request ID already occupied"
419 );
420 }
421
422 #[test]
423 fn pending_method_call_faulty() {
424 let (request_tx1, _) = oneshot::channel();
425 let (request_tx2, _) = oneshot::channel();
426 let (pending_sub_tx, _) = oneshot::channel();
427 let (sub_tx, _) = subscription_channel(1);
428
429 let mut manager = RequestManager::new();
430 assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx1)).is_ok());
431 assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx2)).is_err());
432 assert!(manager
433 .insert_pending_subscription(Id::Number(0), Id::Number(1), pending_sub_tx, "beef".to_string())
434 .is_err());
435 assert!(manager
436 .insert_subscription(
437 Id::Number(0),
438 Id::Number(99),
439 SubscriptionId::Num(137),
440 sub_tx,
441 "bibimbap".to_string()
442 )
443 .is_err());
444
445 assert!(manager.remove_subscription(Id::Number(0), SubscriptionId::Num(137)).is_none());
446 assert!(manager.complete_pending_subscription(Id::Number(0)).is_none());
447 assert!(manager.complete_pending_call(Id::Number(0)).is_some());
448 }
449
450 #[test]
451 fn pending_subscription_faulty() {
452 let (request_tx, _) = oneshot::channel();
453 let (pending_sub_tx1, _) = oneshot::channel();
454 let (pending_sub_tx2, _) = oneshot::channel();
455 let (sub_tx, _) = subscription_channel(1);
456
457 let mut manager = RequestManager::new();
458 assert!(manager
459 .insert_pending_subscription(Id::Number(99), Id::Number(100), pending_sub_tx1, "beef".to_string())
460 .is_ok());
461 assert!(manager.insert_pending_call(Id::Number(99), Some(request_tx)).is_err());
462 assert!(manager
463 .insert_pending_subscription(Id::Number(99), Id::Number(1337), pending_sub_tx2, "vegan".to_string())
464 .is_err());
465
466 assert!(manager
467 .insert_subscription(
468 Id::Number(99),
469 Id::Number(100),
470 SubscriptionId::Num(0),
471 sub_tx,
472 "bibimbap".to_string()
473 )
474 .is_err());
475
476 assert!(manager.remove_subscription(Id::Number(99), SubscriptionId::Num(0)).is_none());
477 assert!(manager.complete_pending_call(Id::Number(99)).is_none());
478 assert!(manager.complete_pending_subscription(Id::Number(99)).is_some());
479 }
480
481 #[test]
482 fn active_subscriptions_faulty() {
483 let (request_tx, _) = oneshot::channel();
484 let (pending_sub_tx, _) = oneshot::channel();
485 let (sub_tx1, _) = subscription_channel(1);
486 let (sub_tx2, _) = subscription_channel(1);
487
488 let mut manager = RequestManager::new();
489
490 assert!(manager
491 .insert_subscription(Id::Number(3), Id::Number(4), SubscriptionId::Num(0), sub_tx1, "bibimbap".to_string())
492 .is_ok());
493 assert!(manager
494 .insert_subscription(Id::Number(3), Id::Number(4), SubscriptionId::Num(1), sub_tx2, "bibimbap".to_string())
495 .is_err());
496 assert!(manager
497 .insert_pending_subscription(Id::Number(3), Id::Number(4), pending_sub_tx, "beef".to_string())
498 .is_err());
499 assert!(manager.insert_pending_call(Id::Number(3), Some(request_tx)).is_err());
500
501 assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(7)).is_none());
502 assert!(manager.complete_pending_call(Id::Number(3)).is_none());
503 assert!(manager.complete_pending_subscription(Id::Number(3)).is_none());
504 assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(1)).is_none());
505 assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(0)).is_some());
506
507 assert!(manager.requests.is_empty());
508 assert!(manager.subscriptions.is_empty());
509 }
510}