jsonrpsee_core/client/async_client/
helpers.rs1use crate::client::async_client::manager::{RequestManager, RequestStatus};
28use crate::client::async_client::{Notification, LOG_TARGET};
29use crate::client::{subscription_channel, Error, RequestMessage, TransportSenderT, TrySubscriptionSendError};
30use crate::params::ArrayParams;
31use crate::traits::ToRpcParams;
32
33use futures_timer::Delay;
34use futures_util::future::{self, Either};
35use tokio::sync::oneshot;
36
37use jsonrpsee_types::response::SubscriptionError;
38use jsonrpsee_types::{
39 ErrorObject, Id, InvalidRequestId, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse,
40};
41use serde_json::Value as JsonValue;
42use std::ops::Range;
43
44#[derive(Debug, Clone)]
45pub(crate) struct InnerBatchResponse {
46 pub(crate) id: u64,
47 pub(crate) result: Result<JsonValue, ErrorObject<'static>>,
48}
49
50pub(crate) fn process_batch_response(
54 manager: &mut RequestManager,
55 rps: Vec<InnerBatchResponse>,
56 range: Range<u64>,
57) -> Result<(), InvalidRequestId> {
58 let mut responses = Vec::with_capacity(rps.len());
59
60 let start_idx = range.start;
61
62 let batch_state = match manager.complete_pending_batch(range.clone()) {
63 Some(state) => state,
64 None => {
65 tracing::debug!(target: LOG_TARGET, "Received unknown batch response");
66 return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)));
67 }
68 };
69
70 for _ in range {
71 let err_obj = ErrorObject::borrowed(0, "", None);
72 responses.push(Err(err_obj));
73 }
74
75 for rp in rps {
76 let maybe_elem =
77 rp.id.checked_sub(start_idx).and_then(|p| p.try_into().ok()).and_then(|p: usize| responses.get_mut(p));
78
79 if let Some(elem) = maybe_elem {
80 *elem = rp.result;
81 } else {
82 return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()));
83 }
84 }
85
86 let _ = batch_state.send_back.send(Ok(responses));
87 Ok(())
88}
89
90pub(crate) fn process_subscription_response(
95 manager: &mut RequestManager,
96 response: SubscriptionResponse<JsonValue>,
97) -> Option<SubscriptionId<'static>> {
98 let sub_id = response.params.subscription.into_owned();
99 let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
100 Some(request_id) => request_id,
101 None => {
102 tracing::debug!(target: LOG_TARGET, "Subscription {:?} is not active", sub_id);
103 return None;
104 }
105 };
106
107 match manager.as_subscription_mut(&request_id) {
108 Some(send_back_sink) => match send_back_sink.send(response.params.result) {
109 Ok(_) => None,
110 Err(TrySubscriptionSendError::Closed) => Some(sub_id),
111 Err(TrySubscriptionSendError::TooSlow(m)) => {
112 tracing::debug!(target: LOG_TARGET, "Subscription {{method={}, sub_id={:?}}} couldn't keep up with server; failed to send {m}", response.method, sub_id);
113 Some(sub_id)
114 }
115 },
116 None => {
117 tracing::debug!(target: LOG_TARGET, "Subscription {:?} is not active", sub_id);
118 None
119 }
120 }
121}
122
123pub(crate) fn process_subscription_close_response(
130 manager: &mut RequestManager,
131 response: SubscriptionError<JsonValue>,
132) {
133 let sub_id = response.params.subscription.into_owned();
134 match manager.get_request_id_by_subscription_id(&sub_id) {
135 Some(request_id) => {
136 manager.remove_subscription(request_id, sub_id).expect("Both request ID and sub ID in RequestManager; qed");
137 }
138 None => {
139 tracing::debug!(target: LOG_TARGET, "The server tried to close an non-pending subscription: {:?}", sub_id);
140 }
141 }
142}
143
144pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification) {
151 match manager.as_notification_handler_mut(notif.method.to_string()) {
152 Some(send_back_sink) => match send_back_sink.send(notif.params.unwrap_or_default()) {
154 Ok(()) => (),
155 Err(TrySubscriptionSendError::Closed) => {
156 let _ = manager.remove_notification_handler(¬if.method);
157 }
158 Err(TrySubscriptionSendError::TooSlow(m)) => {
159 tracing::debug!(target: LOG_TARGET, "Notification `{}` couldn't keep up with server; failed to send {m}", notif.method);
160 let _ = manager.remove_notification_handler(¬if.method);
161 }
162 },
163 None => {
164 tracing::debug!(target: LOG_TARGET, "Notification: {:?} not a registered method", notif.method);
165 }
166 }
167}
168
169pub(crate) fn process_single_response(
175 manager: &mut RequestManager,
176 response: Response<JsonValue>,
177 max_capacity_per_subscription: usize,
178) -> Result<Option<RequestMessage>, InvalidRequestId> {
179 let response_id = response.id.clone().into_owned();
180 let result = ResponseSuccess::try_from(response).map(|s| s.result).map_err(Error::Call);
181
182 match manager.request_status(&response_id) {
183 RequestStatus::PendingMethodCall => {
184 let send_back_oneshot = match manager.complete_pending_call(response_id.clone()) {
185 Some(Some(send)) => send,
186 Some(None) => return Ok(None),
187 None => return Err(InvalidRequestId::NotPendingRequest(response_id.to_string())),
188 };
189
190 let _ = send_back_oneshot.send(result);
191 Ok(None)
192 }
193 RequestStatus::PendingSubscription => {
194 let (unsub_id, send_back_oneshot, unsubscribe_method) = manager
195 .complete_pending_subscription(response_id.clone())
196 .ok_or(InvalidRequestId::NotPendingRequest(response_id.to_string()))?;
197
198 let sub_id = result.map(|r| SubscriptionId::try_from(r).ok());
199
200 let sub_id = match sub_id {
201 Ok(Some(sub_id)) => sub_id,
202 Ok(None) => {
203 let _ = send_back_oneshot.send(Err(Error::InvalidSubscriptionId));
204 return Ok(None);
205 }
206 Err(e) => {
207 let _ = send_back_oneshot.send(Err(e));
208 return Ok(None);
209 }
210 };
211
212 let (subscribe_tx, subscribe_rx) = subscription_channel(max_capacity_per_subscription);
213 if manager
214 .insert_subscription(response_id.clone(), unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
215 .is_ok()
216 {
217 match send_back_oneshot.send(Ok((subscribe_rx, sub_id.clone()))) {
218 Ok(_) => Ok(None),
219 Err(_) => Ok(build_unsubscribe_message(manager, response_id, sub_id)),
220 }
221 } else {
222 let _ = send_back_oneshot.send(Err(Error::InvalidSubscriptionId));
223 Ok(None)
224 }
225 }
226
227 RequestStatus::Subscription | RequestStatus::Invalid => {
228 Err(InvalidRequestId::NotPendingRequest(response_id.to_string()))
229 }
230 }
231}
232
233pub(crate) async fn stop_subscription<S: TransportSenderT>(
238 sender: &mut S,
239 unsub: RequestMessage,
240) -> Result<(), S::Error> {
241 sender.send(unsub.raw).await?;
242 Ok(())
243}
244
245pub(crate) fn build_unsubscribe_message(
247 manager: &mut RequestManager,
248 sub_req_id: Id<'static>,
249 sub_id: SubscriptionId<'static>,
250) -> Option<RequestMessage> {
251 let (unsub_req_id, _, unsub, sub_id) = manager.unsubscribe(sub_req_id, sub_id)?;
252
253 let mut params = ArrayParams::new();
254 params.insert(sub_id).ok()?;
255 let params = params.to_rpc_params().ok()?;
256
257 let raw = serde_json::to_string(&RequestSer::owned(unsub_req_id.clone(), unsub, params)).ok()?;
258 Some(RequestMessage { raw, id: unsub_req_id, send_back: None })
259}
260
261pub(crate) async fn call_with_timeout<T>(
263 timeout: std::time::Duration,
264 rx: oneshot::Receiver<Result<T, Error>>,
265) -> Result<Result<T, Error>, oneshot::error::RecvError> {
266 match future::select(rx, Delay::new(timeout)).await {
267 Either::Left((res, _)) => res,
268 Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
269 }
270}