use super::ResponseFuture;
use std::sync::Arc;
use crate::middleware::rpc::RpcServiceT;
use crate::ConnectionId;
use futures_util::future::BoxFuture;
use jsonrpsee_core::server::{
BoundedSubscriptions, MethodCallback, MethodResponse, MethodSink, Methods, SubscriptionState,
};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_types::error::{reject_too_many_subscriptions, ErrorCode};
use jsonrpsee_types::{ErrorObject, Request};
#[derive(Clone, Debug)]
pub struct RpcService {
conn_id: ConnectionId,
methods: Methods,
max_response_body_size: usize,
cfg: RpcServiceCfg,
}
#[derive(Clone, Debug)]
pub(crate) enum RpcServiceCfg {
OnlyCalls,
CallsAndSubscriptions {
bounded_subscriptions: BoundedSubscriptions,
sink: MethodSink,
id_provider: Arc<dyn IdProvider>,
_pending_calls: tokio::sync::mpsc::Sender<()>,
},
}
impl RpcService {
pub(crate) fn new(
methods: Methods,
max_response_body_size: usize,
conn_id: ConnectionId,
cfg: RpcServiceCfg,
) -> Self {
Self { methods, max_response_body_size, conn_id, cfg }
}
}
impl<'a> RpcServiceT<'a> for RpcService {
type Future = ResponseFuture<BoxFuture<'a, MethodResponse>>;
fn call(&self, req: Request<'a>) -> Self::Future {
let conn_id = self.conn_id;
let max_response_body_size = self.max_response_body_size;
let Request { id, method, params, extensions, .. } = req;
let params = jsonrpsee_types::Params::new(params.as_ref().map(|p| serde_json::value::RawValue::get(p)));
match self.methods.method_with_name(&method) {
None => {
let rp =
MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)).with_extensions(extensions);
ResponseFuture::ready(rp)
}
Some((_name, method)) => match method {
MethodCallback::Async(callback) => {
let params = params.into_owned();
let id = id.into_owned();
let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::future(fut)
}
MethodCallback::Sync(callback) => {
let rp = (callback)(id, params, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
MethodCallback::Subscription(callback) => {
let RpcServiceCfg::CallsAndSubscriptions {
bounded_subscriptions,
sink,
id_provider,
_pending_calls,
} = self.cfg.clone()
else {
tracing::warn!("Subscriptions not supported");
let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
.with_extensions(extensions);
return ResponseFuture::ready(rp);
};
if let Some(p) = bounded_subscriptions.acquire() {
let conn_state =
SubscriptionState { conn_id, id_provider: &*id_provider.clone(), subscription_permit: p };
let fut = callback(id.clone(), params, sink, conn_state, extensions);
ResponseFuture::future(fut)
} else {
let max = bounded_subscriptions.max();
let rp =
MethodResponse::error(id, reject_too_many_subscriptions(max)).with_extensions(extensions);
ResponseFuture::ready(rp)
}
}
MethodCallback::Unsubscription(callback) => {
let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
tracing::warn!("Subscriptions not supported");
let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
.with_extensions(extensions);
return ResponseFuture::ready(rp);
};
let rp = callback(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
},
}
}
}