jsonrpsee_core/client/async_client/
helpers.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use crate::client::async_client::manager::{RequestManager, RequestStatus};
28use crate::client::async_client::{Notification, LOG_TARGET};
29use crate::client::{subscription_channel, Error, RequestMessage, TransportSenderT, TrySubscriptionSendError};
30use crate::params::ArrayParams;
31use crate::traits::ToRpcParams;
32
33use futures_timer::Delay;
34use futures_util::future::{self, Either};
35use tokio::sync::oneshot;
36
37use jsonrpsee_types::response::SubscriptionError;
38use jsonrpsee_types::{
39	ErrorObject, Id, InvalidRequestId, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse,
40};
41use serde_json::Value as JsonValue;
42use std::ops::Range;
43
44#[derive(Debug, Clone)]
45pub(crate) struct InnerBatchResponse {
46	pub(crate) id: u64,
47	pub(crate) result: Result<JsonValue, ErrorObject<'static>>,
48}
49
50/// Attempts to process a batch response.
51///
52/// On success the result is sent to the frontend.
53pub(crate) fn process_batch_response(
54	manager: &mut RequestManager,
55	rps: Vec<InnerBatchResponse>,
56	range: Range<u64>,
57) -> Result<(), InvalidRequestId> {
58	let mut responses = Vec::with_capacity(rps.len());
59
60	let start_idx = range.start;
61
62	let batch_state = match manager.complete_pending_batch(range.clone()) {
63		Some(state) => state,
64		None => {
65			tracing::debug!(target: LOG_TARGET, "Received unknown batch response");
66			return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)));
67		}
68	};
69
70	for _ in range {
71		let err_obj = ErrorObject::borrowed(0, "", None);
72		responses.push(Err(err_obj));
73	}
74
75	for rp in rps {
76		let maybe_elem =
77			rp.id.checked_sub(start_idx).and_then(|p| p.try_into().ok()).and_then(|p: usize| responses.get_mut(p));
78
79		if let Some(elem) = maybe_elem {
80			*elem = rp.result;
81		} else {
82			return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()));
83		}
84	}
85
86	let _ = batch_state.send_back.send(Ok(responses));
87	Ok(())
88}
89
90/// Attempts to process a subscription response.
91///
92/// Returns `Some(sub_id)` if the subscription should be closed otherwise
93/// `None` is returned.
94pub(crate) fn process_subscription_response(
95	manager: &mut RequestManager,
96	response: SubscriptionResponse<JsonValue>,
97) -> Option<SubscriptionId<'static>> {
98	let sub_id = response.params.subscription.into_owned();
99	let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
100		Some(request_id) => request_id,
101		None => {
102			tracing::debug!(target: LOG_TARGET, "Subscription {:?} is not active", sub_id);
103			return None;
104		}
105	};
106
107	match manager.as_subscription_mut(&request_id) {
108		Some(send_back_sink) => match send_back_sink.send(response.params.result) {
109			Ok(_) => None,
110			Err(TrySubscriptionSendError::Closed) => Some(sub_id),
111			Err(TrySubscriptionSendError::TooSlow(m)) => {
112				tracing::debug!(target: LOG_TARGET, "Subscription {{method={}, sub_id={:?}}} couldn't keep up with server; failed to send {m}", response.method, sub_id);
113				Some(sub_id)
114			}
115		},
116		None => {
117			tracing::debug!(target: LOG_TARGET, "Subscription {:?} is not active", sub_id);
118			None
119		}
120	}
121}
122
123/// Attempts to close a subscription when a [`SubscriptionError`] is received.
124///
125/// If the notification is not found it's just logged as a warning and the connection
126/// will continue.
127///
128/// It's possible that the user closed down the subscription before the actual close response is received
129pub(crate) fn process_subscription_close_response(
130	manager: &mut RequestManager,
131	response: SubscriptionError<JsonValue>,
132) {
133	let sub_id = response.params.subscription.into_owned();
134	match manager.get_request_id_by_subscription_id(&sub_id) {
135		Some(request_id) => {
136			manager.remove_subscription(request_id, sub_id).expect("Both request ID and sub ID in RequestManager; qed");
137		}
138		None => {
139			tracing::debug!(target: LOG_TARGET, "The server tried to close an non-pending subscription: {:?}", sub_id);
140		}
141	}
142}
143
144/// Attempts to process an incoming notification
145///
146/// If the notification is not found it's just logged as a warning and the connection
147/// will continue.
148///
149/// It's possible that user close down the subscription before this notification is received.
150pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification) {
151	match manager.as_notification_handler_mut(notif.method.to_string()) {
152		// If the notification doesn't have params, we just send an empty JSON object to indicate that to the user.
153		Some(send_back_sink) => match send_back_sink.send(notif.params.unwrap_or_default()) {
154			Ok(()) => (),
155			Err(TrySubscriptionSendError::Closed) => {
156				let _ = manager.remove_notification_handler(&notif.method);
157			}
158			Err(TrySubscriptionSendError::TooSlow(m)) => {
159				tracing::debug!(target: LOG_TARGET, "Notification `{}` couldn't keep up with server; failed to send {m}", notif.method);
160				let _ = manager.remove_notification_handler(&notif.method);
161			}
162		},
163		None => {
164			tracing::debug!(target: LOG_TARGET, "Notification: {:?} not a registered method", notif.method);
165		}
166	}
167}
168
169/// Process a response from the server.
170///
171/// Returns `Ok(None)` if the response was successfully sent.
172/// Returns `Ok(Some(_))` if the response got an error but could be handled.
173/// Returns `Err(_)` if the response couldn't be handled.
174pub(crate) fn process_single_response(
175	manager: &mut RequestManager,
176	response: Response<JsonValue>,
177	max_capacity_per_subscription: usize,
178) -> Result<Option<RequestMessage>, InvalidRequestId> {
179	let response_id = response.id.clone().into_owned();
180	let result = ResponseSuccess::try_from(response).map(|s| s.result).map_err(Error::Call);
181
182	match manager.request_status(&response_id) {
183		RequestStatus::PendingMethodCall => {
184			let send_back_oneshot = match manager.complete_pending_call(response_id.clone()) {
185				Some(Some(send)) => send,
186				Some(None) => return Ok(None),
187				None => return Err(InvalidRequestId::NotPendingRequest(response_id.to_string())),
188			};
189
190			let _ = send_back_oneshot.send(result);
191			Ok(None)
192		}
193		RequestStatus::PendingSubscription => {
194			let (unsub_id, send_back_oneshot, unsubscribe_method) = manager
195				.complete_pending_subscription(response_id.clone())
196				.ok_or(InvalidRequestId::NotPendingRequest(response_id.to_string()))?;
197
198			let sub_id = result.map(|r| SubscriptionId::try_from(r).ok());
199
200			let sub_id = match sub_id {
201				Ok(Some(sub_id)) => sub_id,
202				Ok(None) => {
203					let _ = send_back_oneshot.send(Err(Error::InvalidSubscriptionId));
204					return Ok(None);
205				}
206				Err(e) => {
207					let _ = send_back_oneshot.send(Err(e));
208					return Ok(None);
209				}
210			};
211
212			let (subscribe_tx, subscribe_rx) = subscription_channel(max_capacity_per_subscription);
213			if manager
214				.insert_subscription(response_id.clone(), unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
215				.is_ok()
216			{
217				match send_back_oneshot.send(Ok((subscribe_rx, sub_id.clone()))) {
218					Ok(_) => Ok(None),
219					Err(_) => Ok(build_unsubscribe_message(manager, response_id, sub_id)),
220				}
221			} else {
222				let _ = send_back_oneshot.send(Err(Error::InvalidSubscriptionId));
223				Ok(None)
224			}
225		}
226
227		RequestStatus::Subscription | RequestStatus::Invalid => {
228			Err(InvalidRequestId::NotPendingRequest(response_id.to_string()))
229		}
230	}
231}
232
233/// Sends an unsubscribe to request to server to indicate
234/// that the client is not interested in the subscription anymore.
235//
236// NOTE: we don't count this a concurrent request as it's part of a subscription.
237pub(crate) async fn stop_subscription<S: TransportSenderT>(
238	sender: &mut S,
239	unsub: RequestMessage,
240) -> Result<(), S::Error> {
241	sender.send(unsub.raw).await?;
242	Ok(())
243}
244
245/// Builds an unsubscription message.
246pub(crate) fn build_unsubscribe_message(
247	manager: &mut RequestManager,
248	sub_req_id: Id<'static>,
249	sub_id: SubscriptionId<'static>,
250) -> Option<RequestMessage> {
251	let (unsub_req_id, _, unsub, sub_id) = manager.unsubscribe(sub_req_id, sub_id)?;
252
253	let mut params = ArrayParams::new();
254	params.insert(sub_id).ok()?;
255	let params = params.to_rpc_params().ok()?;
256
257	let raw = serde_json::to_string(&RequestSer::owned(unsub_req_id.clone(), unsub, params)).ok()?;
258	Some(RequestMessage { raw, id: unsub_req_id, send_back: None })
259}
260
261/// Wait for a stream to complete within the given timeout.
262pub(crate) async fn call_with_timeout<T>(
263	timeout: std::time::Duration,
264	rx: oneshot::Receiver<Result<T, Error>>,
265) -> Result<Result<T, Error>, oneshot::error::RecvError> {
266	match future::select(rx, Delay::new(timeout)).await {
267		Either::Left((res, _)) => res,
268		Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
269	}
270}