use super::{
chain_head_storage::ChainHeadStorage,
event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
};
use crate::{
chain_head::{
api::ChainHeadApiServer,
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{FollowEvent, MethodResponse, OperationError},
subscription::{SubscriptionManagement, SubscriptionManagementError},
},
common::events::StorageQuery,
hex_string, SubscriptionTaskExecutor,
};
use codec::Encode;
use futures::{channel::oneshot, future::FutureExt};
use jsonrpsee::{
core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
MethodResponseFuture, PendingSubscriptionSink,
};
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::Subscription;
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{traits::CallContext, Bytes};
use sp_rpc::list::ListOrValue;
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
pub struct ChainHeadConfig {
pub global_max_pinned_blocks: usize,
pub subscription_max_pinned_duration: Duration,
pub subscription_max_ongoing_operations: usize,
pub max_lagging_distance: usize,
pub operation_max_storage_items: usize,
pub max_follow_subscriptions_per_connection: usize,
}
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
const MAX_ONGOING_OPERATIONS: usize = 16;
const MAX_STORAGE_ITER_ITEMS: usize = 5;
const MAX_LAGGING_DISTANCE: usize = 128;
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
max_lagging_distance: MAX_LAGGING_DISTANCE,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
}
}
}
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
subscriptions: SubscriptionManagement<Block, BE>,
operation_max_storage_items: usize,
max_lagging_distance: usize,
_phantom: PhantomData<Block>,
}
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
pub fn new(
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
config: ChainHeadConfig,
) -> Self {
Self {
client,
backend: backend.clone(),
executor,
subscriptions: SubscriptionManagement::new(
config.global_max_pinned_blocks,
config.subscription_max_pinned_duration,
config.subscription_max_ongoing_operations,
config.max_follow_subscriptions_per_connection,
backend,
),
operation_max_storage_items: config.operation_max_storage_items,
max_lagging_distance: config.max_lagging_distance,
_phantom: PhantomData,
}
}
}
pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
match sink.subscription_id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.into_owned().into(),
}
}
fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
if param.is_empty() {
return Ok(Default::default())
}
match array_bytes::hex2bytes(¶m) {
Ok(bytes) => Ok(bytes),
Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
}
}
#[async_trait]
impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
where
Block: BlockT + 'static,
Block::Header: Unpin,
BE: Backend<Block> + 'static,
Client: BlockBackend<Block>
+ ExecutorProvider<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = BlockChainError>
+ BlockchainEvents<Block>
+ CallApiAt<Block>
+ StorageProvider<Block, BE>
+ 'static,
{
fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let max_lagging_distance = self.max_lagging_distance;
let fut = async move {
let connection_id = pending.connection_id();
let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
else {
pending.reject(ChainHeadRpcError::ReachedLimits).await;
return
};
let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
let sub_id = read_subscription_id_as_string(&sink);
let Some(sub_data) =
reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
else {
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let _ = sink.send(&FollowEvent::<String>::Stop).await;
return
};
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
let mut chain_head_follow = ChainHeadFollower::new(
client,
backend,
subscriptions,
with_runtime,
sub_id.clone(),
max_lagging_distance,
);
let result = chain_head_follow.generate_events(sink, sub_data).await;
if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
reserved_subscription.stop_all_subscriptions();
}
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
async fn chain_head_unstable_body(
&self,
ext: &Extensions,
follow_subscription: String,
hash: Block::Hash,
) -> ResponsePayload<'static, MethodResponse> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return ResponsePayload::success(MethodResponse::LimitReached);
}
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let executor = self.executor.clone();
let result = spawn_blocking(&self.executor, async move {
let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) =>
return ResponsePayload::success(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
},
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
let operation_id = block_guard.operation().operation_id();
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block
.block
.extrinsics()
.iter()
.map(|extrinsic| hex_string(&extrinsic.encode()))
.collect();
FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
operation_id: operation_id.clone(),
value: extrinsics,
})
},
Ok(None) => {
debug!(
target: LOG_TARGET,
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
&follow_subscription,
hash
);
subscriptions.remove_subscription(&follow_subscription);
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: operation_id.clone(),
error: error.to_string(),
}),
};
let (rp, rp_fut) = method_started_response(operation_id, None);
let fut = async move {
if rp_fut.await.is_err() {
return;
}
let _ = block_guard.response_sender().unbounded_send(event);
};
executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
rp
});
result
.await
.unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
}
async fn chain_head_unstable_header(
&self,
ext: &Extensions,
follow_subscription: String,
hash: Block::Hash,
) -> Result<Option<String>, ChainHeadRpcError> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return Ok(None);
}
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
Err(SubscriptionManagementError::BlockHashAbsent) => {
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let client = self.client.clone();
let result = spawn_blocking(&self.executor, async move {
let _block_guard = block_guard;
client
.header(hash)
.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
.map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
});
result.await.unwrap_or_else(|_| Ok(None))
}
async fn chain_head_unstable_storage(
&self,
ext: &Extensions,
follow_subscription: String,
hash: Block::Hash,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> ResponsePayload<'static, MethodResponse> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return ResponsePayload::success(MethodResponse::LimitReached);
}
let items = match items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
Ok(StorageQuery { key, query_type: query.query_type })
})
.collect::<Result<Vec<_>, ChainHeadRpcError>>()
{
Ok(items) => items,
Err(err) => {
return ResponsePayload::error(err);
},
};
let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
{
Ok(c) => c.map(ChildInfo::new_default_from_vec),
Err(e) => return ResponsePayload::error(e),
};
let mut block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
return ResponsePayload::success(MethodResponse::LimitReached);
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
self.client.clone(),
self.operation_max_storage_items,
);
let operation = block_guard.operation();
let operation_id = operation.operation_id();
let num_operations = operation.num_reserved();
let discarded = items.len().saturating_sub(num_operations);
let mut items = items;
items.truncate(num_operations);
let (rp, rp_fut) = method_started_response(operation_id, Some(discarded));
let fut = async move {
if rp_fut.await.is_err() {
return;
}
storage_client.generate_events(block_guard, hash, items, child_trie).await;
};
self.executor
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
rp
}
async fn chain_head_unstable_call(
&self,
ext: &Extensions,
follow_subscription: String,
hash: Block::Hash,
function: String,
call_parameters: String,
) -> ResponsePayload<'static, MethodResponse> {
let call_parameters = match parse_hex_param(call_parameters) {
Ok(hex) => Bytes::from(hex),
Err(err) => return ResponsePayload::error(err),
};
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return ResponsePayload::success(MethodResponse::LimitReached);
}
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
return ResponsePayload::success(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
},
Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
};
if !block_guard.has_runtime() {
return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
"The runtime updates flag must be set".to_string(),
));
}
let operation_id = block_guard.operation().operation_id();
let client = self.client.clone();
let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
let fut = async move {
if rp_fut.await.is_err() {
return
}
let event = client
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
operation_id: operation_id.clone(),
output: hex_string(&result),
})
})
.unwrap_or_else(|error| {
FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: operation_id.clone(),
error: error.to_string(),
})
});
let _ = block_guard.response_sender().unbounded_send(event);
};
self.executor
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
rp
}
async fn chain_head_unstable_unpin(
&self,
ext: &Extensions,
follow_subscription: String,
hash_or_hashes: ListOrValue<Block::Hash>,
) -> Result<(), ChainHeadRpcError> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return Ok(());
}
let result = match hash_or_hashes {
ListOrValue::Value(hash) =>
self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
ListOrValue::List(hashes) =>
self.subscriptions.unpin_blocks(&follow_subscription, hashes),
};
match result {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
Err(ChainHeadRpcError::InvalidBlock)
},
Err(SubscriptionManagementError::DuplicateHashes) =>
Err(ChainHeadRpcError::InvalidDuplicateHashes),
Err(_) => Err(ChainHeadRpcError::InvalidBlock),
}
}
async fn chain_head_unstable_continue(
&self,
ext: &Extensions,
follow_subscription: String,
operation_id: String,
) -> Result<(), ChainHeadRpcError> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return Ok(())
}
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
else {
return Ok(())
};
if !operation.submit_continue() {
Err(ChainHeadRpcError::InvalidContinue.into())
} else {
Ok(())
}
}
async fn chain_head_unstable_stop_operation(
&self,
ext: &Extensions,
follow_subscription: String,
operation_id: String,
) -> Result<(), ChainHeadRpcError> {
let conn_id = ext
.get::<ConnectionId>()
.copied()
.expect("ConnectionId is always set by jsonrpsee; qed");
if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
return Ok(())
}
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
else {
return Ok(())
};
operation.stop_operation();
Ok(())
}
}
fn method_started_response(
operation_id: String,
discarded_items: Option<usize>,
) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
ResponsePayload::success(rp).notify_on_completion()
}
fn spawn_blocking<R>(
executor: &SubscriptionTaskExecutor,
fut: impl std::future::Future<Output = R> + Send + 'static,
) -> oneshot::Receiver<R>
where
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let blocking_fut = async move {
let result = fut.await;
let _ = tx.send(result);
};
executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
rx
}