use super::{
chain_head_storage::ChainHeadStorage,
event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
};
use crate::{
chain_head::{
api::ChainHeadApiServer,
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{FollowEvent, MethodResponse, OperationError, OperationId, OperationStorageItems},
subscription::{StopHandle, SubscriptionManagement, SubscriptionManagementError},
FollowEventSendError, FollowEventSender,
},
common::{events::StorageQuery, storage::QueryResult},
hex_string, SubscriptionTaskExecutor,
};
use codec::Encode;
use futures::{channel::oneshot, future::FutureExt, SinkExt};
use jsonrpsee::{
core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
MethodResponseFuture, PendingSubscriptionSink,
};
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::Subscription;
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{traits::CallContext, Bytes};
use sp_rpc::list::ListOrValue;
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};
use tokio::sync::mpsc;
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
const STORAGE_QUERY_BUF: usize = 16;
pub struct ChainHeadConfig {
pub global_max_pinned_blocks: usize,
pub subscription_max_pinned_duration: Duration,
pub subscription_max_ongoing_operations: usize,
pub max_lagging_distance: usize,
pub max_follow_subscriptions_per_connection: usize,
pub subscription_buffer_cap: usize,
}
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
const MAX_ONGOING_OPERATIONS: usize = 16;
const MAX_LAGGING_DISTANCE: usize = 128;
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
subscription_buffer_cap: MAX_PINNED_BLOCKS,
}
}
}
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
subscriptions: SubscriptionManagement<Block, BE>,
max_lagging_distance: usize,
_phantom: PhantomData<Block>,
subscription_buffer_cap: usize,
}
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
pub fn new(
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
config: ChainHeadConfig,
) -> Self {
Self {
client,
backend: backend.clone(),
executor,
subscriptions: SubscriptionManagement::new(
config.global_max_pinned_blocks,
config.subscription_max_pinned_duration,
config.subscription_max_ongoing_operations,
config.max_follow_subscriptions_per_connection,
backend,
),
max_lagging_distance: config.max_lagging_distance,
subscription_buffer_cap: config.subscription_buffer_cap,
_phantom: PhantomData,
}
}
}
pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
match sink.subscription_id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.into_owned().into(),
}
}
fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
if param.is_empty() {
return Ok(Default::default())
}
match array_bytes::hex2bytes(¶m) {
Ok(bytes) => Ok(bytes),
Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
}
}
#[async_trait]
impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
where
Block: BlockT + 'static,
Block::Header: Unpin,
BE: Backend<Block> + 'static,
Client: BlockBackend<Block>
+ ExecutorProvider<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = BlockChainError>
+ BlockchainEvents<Block>
+ CallApiAt<Block>
+ StorageProvider<Block, BE>
+ 'static,
{
fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let max_lagging_distance = self.max_lagging_distance;
let subscription_buffer_cap = self.subscription_buffer_cap;
let fut = async move {
let connection_id = pending.connection_id();
let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
else {
pending.reject(ChainHeadRpcError::ReachedLimits).await;
return
};
let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
let sub_id = read_subscription_id_as_string(&sink);
let Some(sub_data) =
reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
else {
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let _ = sink.send(&FollowEvent::<String>::Stop).await;
return
};
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
let mut chain_head_follow = ChainHeadFollower::new(
client,
backend,
subscriptions,
with_runtime,
sub_id.clone(),
max_lagging_distance,
subscription_buffer_cap,
);
let result = chain_head_follow.generate_events(sink, sub_data).await;
if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
reserved_subscription.stop_all_subscriptions();
}
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
async fn chain_head_unstable_body(
&self,
ext: &Extensions,
follow_subscription: String,
hash: Block::Hash,
) -> ResponsePayload<'static, MethodResponse> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return ResponsePayload::success(MethodResponse::LimitReached);
}
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let executor = self.executor.clone();
let result = spawn_blocking(&self.executor, async move {
let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) =>
return ResponsePayload::success(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
},
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
let operation_id = block_guard.operation().operation_id();
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block
.block
.extrinsics()
.iter()
.map(|extrinsic| hex_string(&extrinsic.encode()))
.collect();
FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
operation_id: operation_id.clone(),
value: extrinsics,
})
},
Ok(None) => {
debug!(
target: LOG_TARGET,
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
&follow_subscription,
hash
);
subscriptions.remove_subscription(&follow_subscription);
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: operation_id.clone(),
error: error.to_string(),
}),
};
let (rp, rp_fut) = method_started_response(operation_id, None);
let fut = async move {
if rp_fut.await.is_err() {
return;
}
let _ = block_guard.response_sender().send(event).await;
};
executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
rp
});
result
.await
.unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
}
async fn chain_head_unstable_header(
&self,
ext: &Extensions,
follow_subscription: String,
hash: Block::Hash,
) -> Result<Option<String>, ChainHeadRpcError> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return Ok(None);
}
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
Err(SubscriptionManagementError::BlockHashAbsent) => {
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let client = self.client.clone();
let result = spawn_blocking(&self.executor, async move {
let _block_guard = block_guard;
client
.header(hash)
.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
.map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
});
result.await.unwrap_or_else(|_| Ok(None))
}
async fn chain_head_unstable_storage(
&self,
ext: &Extensions,
follow_subscription: String,
hash: Block::Hash,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> ResponsePayload<'static, MethodResponse> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return ResponsePayload::success(MethodResponse::LimitReached);
}
let items = match items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
Ok(StorageQuery { key, query_type: query.query_type })
})
.collect::<Result<Vec<_>, ChainHeadRpcError>>()
{
Ok(items) => items,
Err(err) => {
return ResponsePayload::error(err);
},
};
let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
{
Ok(c) => c.map(ChildInfo::new_default_from_vec),
Err(e) => return ResponsePayload::error(e),
};
let mut block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
return ResponsePayload::success(MethodResponse::LimitReached);
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let (rp, rp_fut) = method_started_response(block_guard.operation().operation_id(), Some(0));
let fut = async move {
if rp_fut.await.is_err() {
return;
}
let (tx, rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
let operation_id = block_guard.operation().operation_id();
let stop_handle = block_guard.operation().stop_handle().clone();
let response_sender = block_guard.response_sender();
let _ = futures::future::join(
storage_client.generate_events(hash, items, child_trie, tx),
process_storage_items(rx, response_sender, operation_id, &stop_handle),
)
.await;
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
rp
}
async fn chain_head_unstable_call(
&self,
ext: &Extensions,
follow_subscription: String,
hash: Block::Hash,
function: String,
call_parameters: String,
) -> ResponsePayload<'static, MethodResponse> {
let call_parameters = match parse_hex_param(call_parameters) {
Ok(hex) => Bytes::from(hex),
Err(err) => return ResponsePayload::error(err),
};
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return ResponsePayload::success(MethodResponse::LimitReached);
}
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
return ResponsePayload::success(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
if !block_guard.has_runtime() {
return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
"The runtime updates flag must be set".to_string(),
));
}
let operation_id = block_guard.operation().operation_id();
let client = self.client.clone();
let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
let fut = async move {
if rp_fut.await.is_err() {
return
}
let event = client
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
operation_id: operation_id.clone(),
output: hex_string(&result),
})
})
.unwrap_or_else(|error| {
FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: operation_id.clone(),
error: error.to_string(),
})
});
let _ = block_guard.response_sender().send(event).await;
};
self.executor
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
rp
}
async fn chain_head_unstable_unpin(
&self,
ext: &Extensions,
follow_subscription: String,
hash_or_hashes: ListOrValue<Block::Hash>,
) -> Result<(), ChainHeadRpcError> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return Ok(());
}
let result = match hash_or_hashes {
ListOrValue::Value(hash) =>
self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
ListOrValue::List(hashes) =>
self.subscriptions.unpin_blocks(&follow_subscription, hashes),
};
match result {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
Err(ChainHeadRpcError::InvalidBlock)
},
Err(SubscriptionManagementError::DuplicateHashes) =>
Err(ChainHeadRpcError::InvalidDuplicateHashes),
Err(_) => Err(ChainHeadRpcError::InvalidBlock),
}
}
async fn chain_head_unstable_continue(
&self,
ext: &Extensions,
follow_subscription: String,
operation_id: String,
) -> Result<(), ChainHeadRpcError> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return Ok(())
}
if self.subscriptions.get_operation(&follow_subscription, &operation_id).is_some() {
Err(ChainHeadRpcError::InvalidContinue.into())
} else {
Ok(())
}
}
async fn chain_head_unstable_stop_operation(
&self,
ext: &Extensions,
follow_subscription: String,
operation_id: String,
) -> Result<(), ChainHeadRpcError> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return Ok(())
}
let Some(mut operation) =
self.subscriptions.get_operation(&follow_subscription, &operation_id)
else {
return Ok(())
};
operation.stop();
Ok(())
}
}
fn method_started_response(
operation_id: String,
discarded_items: Option<usize>,
) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
ResponsePayload::success(rp).notify_on_completion()
}
fn spawn_blocking<R>(
executor: &SubscriptionTaskExecutor,
fut: impl std::future::Future<Output = R> + Send + 'static,
) -> oneshot::Receiver<R>
where
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let blocking_fut = async move {
let result = fut.await;
let _ = tx.send(result);
};
executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
rx
}
async fn process_storage_items<Hash>(
mut storage_query_stream: mpsc::Receiver<QueryResult>,
mut sender: FollowEventSender<Hash>,
operation_id: String,
stop_handle: &StopHandle,
) -> Result<(), FollowEventSendError> {
loop {
tokio::select! {
_ = stop_handle.stopped() => {
break;
},
maybe_storage = storage_query_stream.recv() => {
let Some(storage) = maybe_storage else {
break;
};
let item = match storage {
QueryResult::Err(error) => {
return sender
.send(FollowEvent::OperationError(OperationError { operation_id, error }))
.await
}
QueryResult::Ok(Some(v)) => v,
QueryResult::Ok(None) => continue,
};
sender
.send(FollowEvent::OperationStorageItems(OperationStorageItems {
operation_id: operation_id.clone(),
items: vec![item],
})).await?;
},
}
}
sender
.send(FollowEvent::OperationStorageDone(OperationId { operation_id }))
.await?;
Ok(())
}