use futures::{channel::mpsc::Sender, prelude::*, stream::FuturesUnordered};
use jsonrpsee::core::client::{
Client as JsonRpseeClient, ClientBuilder, ClientT, ReceivedMessage, TransportReceiverT,
TransportSenderT,
};
use smoldot_light::{ChainId, Client as SmoldotClient, JsonRpcResponses};
use std::{num::NonZeroU32, sync::Arc};
use tokio::sync::mpsc::{channel as tokio_channel, Receiver, Sender as TokioSender};
use cumulus_primitives_core::relay_chain::{
Block as RelayBlock, BlockNumber as RelayNumber, Hash as RelayHash, Header as RelayHeader,
};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use sp_runtime::generic::SignedBlock;
use sc_rpc_api::chain::ChainApiClient;
use sc_service::SpawnTaskHandle;
use crate::{rpc_client::RpcDispatcherMessage, tokio_platform::TokioPlatform};
/// `tracing` target used for every log line emitted from this file.
const LOG_TARGET: &str = "rpc-light-client-worker";
/// Value passed as `max_pending_requests` when JSON-RPC is enabled on the smoldot chain.
/// Must be non-zero because it is converted into a `NonZeroU32`.
const MAX_PENDING_REQUESTS: u32 = 128;
/// Value passed as `max_subscriptions` when JSON-RPC is enabled on the smoldot chain.
const MAX_SUBSCRIPTIONS: u32 = 64;
/// Errors reported by the smoldot-backed JSON-RPC transport adapters in this file.
#[derive(thiserror::Error, Debug)]
enum LightClientError {
/// Submitting a JSON-RPC request to smoldot returned an error; the payload is
/// that error rendered as a string.
#[error("Error occurred while executing smoldot request: {0}")]
SmoldotError(String),
/// The smoldot JSON-RPC response source returned `None` instead of a message.
#[error("Nothing returned from json_rpc_responses")]
EmptyResult,
}
/// Sending half of the transport: hands JSON-RPC request strings to a smoldot client.
struct SimpleStringSender {
// Smoldot client the requests are submitted to.
inner: SmoldotClient<TokioPlatform, ()>,
// Chain, within `inner`, that every request is addressed to.
chain_id: ChainId,
}
#[async_trait::async_trait]
impl TransportSenderT for SimpleStringSender {
    type Error = LightClientError;

    /// Submits the raw JSON-RPC request `msg` to smoldot for the configured chain.
    ///
    /// # Errors
    ///
    /// Returns [`LightClientError::SmoldotError`] holding the stringified smoldot
    /// error when `json_rpc_request` fails.
    async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
        let submitted = self.inner.json_rpc_request(msg, self.chain_id);
        submitted.map_err(|cause| LightClientError::SmoldotError(cause.to_string()))
    }
}
/// Receiving half of the transport: pulls JSON-RPC response strings out of smoldot.
struct SimpleStringReceiver {
// Source of JSON-RPC responses obtained when the chain was added to smoldot.
inner: JsonRpcResponses,
}
#[async_trait::async_trait]
impl TransportReceiverT for SimpleStringReceiver {
type Error = LightClientError;
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
self.inner
.next()
.await
.map(|message| jsonrpsee::core::client::ReceivedMessage::Text(message))
.ok_or(LightClientError::EmptyResult)
}
}
/// Creates a smoldot light client and adds the chain described by `chain_spec` to it.
///
/// The chain is added with JSON-RPC enabled (limited by `MAX_PENDING_REQUESTS` and
/// `MAX_SUBSCRIPTIONS`), no potential relay chains and empty database content.
///
/// On success returns the client, the id of the added chain and the chain's
/// JSON-RPC response source.
///
/// # Errors
///
/// Returns `RelayChainError::GenericError` with the stringified smoldot error when
/// adding the chain fails.
pub async fn build_smoldot_client(
    spawner: SpawnTaskHandle,
    chain_spec: &str,
) -> RelayChainResult<(SmoldotClient<TokioPlatform, ()>, ChainId, JsonRpcResponses)> {
    let mut client = SmoldotClient::new(TokioPlatform::new(spawner));

    let json_rpc = smoldot_light::AddChainConfigJsonRpc::Enabled {
        max_pending_requests: NonZeroU32::new(MAX_PENDING_REQUESTS)
            .expect("Constant larger than 0; qed"),
        max_subscriptions: MAX_SUBSCRIPTIONS,
    };
    let chain_config = smoldot_light::AddChainConfig {
        specification: chain_spec,
        json_rpc,
        potential_relay_chains: core::iter::empty(),
        database_content: "",
        user_data: (),
    };

    let added = client
        .add_chain(chain_config)
        .map_err(|cause| RelayChainError::GenericError(cause.to_string()))?;

    // JSON-RPC was enabled in the config above, so a response source is expected.
    let responses = added.json_rpc_responses.expect("JSON RPC is enabled; qed");
    Ok((client, added.chain_id, responses))
}
/// Worker that serves RPC dispatcher messages through a smoldot-backed jsonrpsee
/// client and forwards header notifications to registered listeners.
pub struct LightClientRpcWorker {
// Incoming dispatcher messages: listener registrations and RPC requests.
client_receiver: Receiver<RpcDispatcherMessage>,
// Listeners that receive headers from the "all heads" subscription.
imported_header_listeners: Vec<Sender<RelayHeader>>,
// Listeners that receive headers from the "finalized heads" subscription.
finalized_header_listeners: Vec<Sender<RelayHeader>>,
// Listeners that receive headers from the "new heads" subscription.
best_header_listeners: Vec<Sender<RelayHeader>>,
// jsonrpsee client whose transport is the smoldot light client; shared with
// in-flight request futures through the `Arc`.
smoldot_client: Arc<JsonRpseeClient>,
}
/// Processes one item pulled from a header subscription.
///
/// A successfully decoded header is passed on to `senders` via
/// `crate::rpc_client::distribute_header` and `Ok(())` is returned. A closed
/// subscription (`None`) or a decoding error is logged and reported as `Err(())`.
fn handle_notification(
    maybe_header: Option<Result<RelayHeader, serde_json::Error>>,
    senders: &mut Vec<Sender<RelayHeader>>,
) -> Result<(), ()> {
    // `None` means the subscription stream has ended.
    let Some(notification) = maybe_header else {
        tracing::error!(target: LOG_TARGET, "Subscription closed.");
        return Err(())
    };

    match notification {
        Ok(header) => {
            crate::rpc_client::distribute_header(header, senders);
            Ok(())
        },
        Err(error) => {
            tracing::error!(target: LOG_TARGET, ?error, "Error in RPC subscription.");
            Err(())
        },
    }
}
impl LightClientRpcWorker {
    /// Creates a worker and the sender used to feed it [`RpcDispatcherMessage`]s.
    ///
    /// `smoldot_client` and `json_rpc_responses` are wrapped into the sender and
    /// receiver transport adapters and used to build a jsonrpsee client that talks
    /// to the chain identified by `chain_id`.
    ///
    /// The returned channel is bounded with a capacity of 100 messages.
    pub fn new(
        smoldot_client: smoldot_light::Client<TokioPlatform, ()>,
        json_rpc_responses: JsonRpcResponses,
        chain_id: ChainId,
    ) -> (LightClientRpcWorker, TokioSender<RpcDispatcherMessage>) {
        let (tx, rx) = tokio_channel(100);
        let smoldot_adapter_sender = SimpleStringSender { inner: smoldot_client, chain_id };
        let smoldot_adapter_receiver = SimpleStringReceiver { inner: json_rpc_responses };

        let smoldot_jsonrpsee_client = Arc::new(
            ClientBuilder::default()
                .build_with_tokio(smoldot_adapter_sender, smoldot_adapter_receiver),
        );

        let worker = LightClientRpcWorker {
            client_receiver: rx,
            imported_header_listeners: Default::default(),
            finalized_header_listeners: Default::default(),
            best_header_listeners: Default::default(),
            smoldot_client: smoldot_jsonrpsee_client,
        };
        (worker, tx)
    }

    /// Runs the worker until the dispatcher channel closes or a subscription fails.
    ///
    /// First subscribes to new heads, finalized heads and all heads. If any of
    /// these subscriptions cannot be established, the error is logged and the
    /// function returns.
    ///
    /// Afterwards it loops, concurrently:
    /// - registering listeners and starting RPC requests received from the dispatcher,
    /// - driving in-flight RPC requests to completion,
    /// - forwarding header notifications to the matching listener list.
    ///
    /// The loop ends when the dispatcher channel is closed, or when any header
    /// subscription ends or yields an error.
    pub async fn run(mut self) {
        let mut pending_requests = FuturesUnordered::new();

        let mut new_head_subscription = match <JsonRpseeClient as ChainApiClient<
            RelayNumber,
            RelayHash,
            RelayHeader,
            SignedBlock<RelayBlock>,
        >>::subscribe_new_heads(&self.smoldot_client)
        .await
        {
            Ok(subscription) => subscription,
            Err(error) => {
                tracing::error!(
                    target: LOG_TARGET,
                    ?error,
                    "Unable to initialize new heads subscription"
                );
                return
            },
        };

        let mut finalized_head_subscription = match <JsonRpseeClient as ChainApiClient<
            RelayNumber,
            RelayHash,
            RelayHeader,
            SignedBlock<RelayBlock>,
        >>::subscribe_finalized_heads(&self.smoldot_client)
        .await
        {
            Ok(subscription) => subscription,
            Err(error) => {
                tracing::error!(
                    target: LOG_TARGET,
                    ?error,
                    "Unable to initialize finalized heads subscription"
                );
                return
            },
        };

        let mut all_head_subscription = match <JsonRpseeClient as ChainApiClient<
            RelayNumber,
            RelayHash,
            RelayHeader,
            SignedBlock<RelayBlock>,
        >>::subscribe_all_heads(&self.smoldot_client)
        .await
        {
            Ok(subscription) => subscription,
            Err(error) => {
                tracing::error!(
                    target: LOG_TARGET,
                    ?error,
                    "Unable to initialize all heads subscription"
                );
                return
            },
        };

        loop {
            tokio::select! {
                evt = self.client_receiver.recv() => match evt {
                    Some(RpcDispatcherMessage::RegisterBestHeadListener(tx)) => {
                        self.best_header_listeners.push(tx);
                    },
                    Some(RpcDispatcherMessage::RegisterImportListener(tx)) => {
                        self.imported_header_listeners.push(tx);
                    },
                    Some(RpcDispatcherMessage::RegisterFinalizationListener(tx)) => {
                        self.finalized_header_listeners.push(tx);
                    },
                    Some(RpcDispatcherMessage::Request(method, params, response_sender)) => {
                        // Each request future owns its own handle to the shared client.
                        let closure_client = self.smoldot_client.clone();
                        tracing::debug!(
                            target: LOG_TARGET,
                            len = pending_requests.len(),
                            method,
                            "Request"
                        );
                        // The request is only queued here; it makes progress when
                        // `pending_requests` is polled by the select branch below.
                        pending_requests.push(async move {
                            let response = closure_client.request(&method, params).await;
                            tracing::debug!(
                                target: LOG_TARGET,
                                method,
                                ?response,
                                "Response"
                            );
                            if let Err(err) = response_sender.send(response) {
                                tracing::debug!(
                                    target: LOG_TARGET,
                                    ?err,
                                    "Recipient no longer interested in request result"
                                );
                            };
                        });
                    },
                    None => {
                        tracing::error!(target: LOG_TARGET, "RPC client receiver closed. Stopping RPC Worker.");
                        return;
                    }
                },
                // Only poll the request set while it is non-empty, so an empty set
                // never completes this branch.
                _ = pending_requests.next(), if !pending_requests.is_empty() => {},
                import_event = all_head_subscription.next() => {
                    if handle_notification(import_event, &mut self.imported_header_listeners).is_err() {
                        return
                    }
                },
                best_header_event = new_head_subscription.next() => {
                    if handle_notification(best_header_event, &mut self.best_header_listeners).is_err() {
                        return
                    }
                },
                finalized_event = finalized_head_subscription.next() => {
                    if handle_notification(finalized_event, &mut self.finalized_header_listeners).is_err() {
                        return
                    }
                }
            }
        }
    }
}