1use super::{
22 chain_head_storage::ChainHeadStorage,
23 event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
24};
25use crate::{
26 chain_head::{
27 api::ChainHeadApiServer,
28 chain_head_follow::ChainHeadFollower,
29 error::Error as ChainHeadRpcError,
30 event::{FollowEvent, MethodResponse, OperationError},
31 subscription::{SubscriptionManagement, SubscriptionManagementError},
32 },
33 common::events::StorageQuery,
34 hex_string, SubscriptionTaskExecutor,
35};
36use codec::Encode;
37use futures::{channel::oneshot, future::FutureExt};
38use jsonrpsee::{
39 core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
40 MethodResponseFuture, PendingSubscriptionSink,
41};
42use log::debug;
43use sc_client_api::{
44 Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
45 StorageProvider,
46};
47use sc_rpc::utils::Subscription;
48use sp_api::CallApiAt;
49use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
50use sp_core::{traits::CallContext, Bytes};
51use sp_rpc::list::ListOrValue;
52use sp_runtime::traits::Block as BlockT;
53use std::{marker::PhantomData, sync::Arc, time::Duration};
54
/// Log target handed to the `debug!` invocations of the `chainHead` RPC code.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
56
/// Tunable limits consumed by [`ChainHead::new`].
///
/// All fields are plain values, so the configuration can be cloned and printed freely.
#[derive(Debug, Clone)]
pub struct ChainHeadConfig {
    /// Pinned-block limit forwarded to the subscription management
    /// (presumably counted across all subscriptions, as the name suggests).
    pub global_max_pinned_blocks: usize,
    /// Pinning duration limit forwarded to the subscription management.
    pub subscription_max_pinned_duration: Duration,
    /// Ongoing-operation limit forwarded to the subscription management.
    pub subscription_max_ongoing_operations: usize,
    /// Lagging distance handed to every `ChainHeadFollower`. When a follower reports
    /// `BlockDistanceTooLarge`, all subscriptions of the reservation are stopped.
    pub max_lagging_distance: usize,
    /// Storage item limit handed to `ChainHeadStorage` for `chainHead_storage` operations.
    pub operation_max_storage_items: usize,
    /// Per-connection `chainHead_follow` subscription limit forwarded to the subscription
    /// management.
    pub max_follow_subscriptions_per_connection: usize,
}
74
/// Default for [`ChainHeadConfig::global_max_pinned_blocks`].
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;

/// Default for [`ChainHeadConfig::subscription_max_pinned_duration`] (60 seconds).
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);

/// Default for [`ChainHeadConfig::subscription_max_ongoing_operations`].
const MAX_ONGOING_OPERATIONS: usize = 16;

/// Default for [`ChainHeadConfig::operation_max_storage_items`].
const MAX_STORAGE_ITER_ITEMS: usize = 5;

/// Default for [`ChainHeadConfig::max_lagging_distance`].
const MAX_LAGGING_DISTANCE: usize = 128;

/// Default for [`ChainHeadConfig::max_follow_subscriptions_per_connection`].
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
100
101impl Default for ChainHeadConfig {
102 fn default() -> Self {
103 ChainHeadConfig {
104 global_max_pinned_blocks: MAX_PINNED_BLOCKS,
105 subscription_max_pinned_duration: MAX_PINNED_DURATION,
106 subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
107 max_lagging_distance: MAX_LAGGING_DISTANCE,
108 operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
109 max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
110 }
111 }
112}
113
114pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
116 client: Arc<Client>,
118 backend: Arc<BE>,
120 executor: SubscriptionTaskExecutor,
122 subscriptions: SubscriptionManagement<Block, BE>,
124 operation_max_storage_items: usize,
127 max_lagging_distance: usize,
130 _phantom: PhantomData<Block>,
132}
133
134impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
135 pub fn new(
137 client: Arc<Client>,
138 backend: Arc<BE>,
139 executor: SubscriptionTaskExecutor,
140 config: ChainHeadConfig,
141 ) -> Self {
142 Self {
143 client,
144 backend: backend.clone(),
145 executor,
146 subscriptions: SubscriptionManagement::new(
147 config.global_max_pinned_blocks,
148 config.subscription_max_pinned_duration,
149 config.subscription_max_ongoing_operations,
150 config.max_follow_subscriptions_per_connection,
151 backend,
152 ),
153 operation_max_storage_items: config.operation_max_storage_items,
154 max_lagging_distance: config.max_lagging_distance,
155 _phantom: PhantomData,
156 }
157 }
158}
159
160pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
162 match sink.subscription_id() {
163 SubscriptionId::Num(n) => n.to_string(),
164 SubscriptionId::Str(s) => s.into_owned().into(),
165 }
166}
167
168fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
172 if param.is_empty() {
174 return Ok(Default::default())
175 }
176
177 match array_bytes::hex2bytes(¶m) {
178 Ok(bytes) => Ok(bytes),
179 Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
180 }
181}
182
183#[async_trait]
184impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
185where
186 Block: BlockT + 'static,
187 Block::Header: Unpin,
188 BE: Backend<Block> + 'static,
189 Client: BlockBackend<Block>
190 + ExecutorProvider<Block>
191 + HeaderBackend<Block>
192 + HeaderMetadata<Block, Error = BlockChainError>
193 + BlockchainEvents<Block>
194 + CallApiAt<Block>
195 + StorageProvider<Block, BE>
196 + 'static,
197{
198 fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
199 let subscriptions = self.subscriptions.clone();
200 let backend = self.backend.clone();
201 let client = self.client.clone();
202 let max_lagging_distance = self.max_lagging_distance;
203
204 let fut = async move {
205 let connection_id = pending.connection_id();
207 let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
211 else {
212 pending.reject(ChainHeadRpcError::ReachedLimits).await;
213 return
214 };
215
216 let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
217
218 let sub_id = read_subscription_id_as_string(&sink);
219 let Some(sub_data) =
221 reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
222 else {
223 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
226 let _ = sink.send(&FollowEvent::<String>::Stop).await;
227 return
228 };
229 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
230
231 let mut chain_head_follow = ChainHeadFollower::new(
232 client,
233 backend,
234 subscriptions,
235 with_runtime,
236 sub_id.clone(),
237 max_lagging_distance,
238 );
239 let result = chain_head_follow.generate_events(sink, sub_data).await;
240 if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
241 debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
242 reserved_subscription.stop_all_subscriptions();
243 }
244
245 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
246 };
247
248 self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
249 }
250
251 async fn chain_head_unstable_body(
252 &self,
253 ext: &Extensions,
254 follow_subscription: String,
255 hash: Block::Hash,
256 ) -> ResponsePayload<'static, MethodResponse> {
257 let conn_id = ext
258 .get::<ConnectionId>()
259 .copied()
260 .expect("ConnectionId is always set by jsonrpsee; qed");
261
262 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
263 return ResponsePayload::success(MethodResponse::LimitReached);
266 }
267
268 let client = self.client.clone();
269 let subscriptions = self.subscriptions.clone();
270 let executor = self.executor.clone();
271
272 let result = spawn_blocking(&self.executor, async move {
273 let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
274 Ok(block) => block,
275 Err(SubscriptionManagementError::SubscriptionAbsent) |
276 Err(SubscriptionManagementError::ExceededLimits) =>
277 return ResponsePayload::success(MethodResponse::LimitReached),
278 Err(SubscriptionManagementError::BlockHashAbsent) => {
279 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
281 },
282 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
283 };
284
285 let operation_id = block_guard.operation().operation_id();
286
287 let event = match client.block(hash) {
288 Ok(Some(signed_block)) => {
289 let extrinsics = signed_block
290 .block
291 .extrinsics()
292 .iter()
293 .map(|extrinsic| hex_string(&extrinsic.encode()))
294 .collect();
295 FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
296 operation_id: operation_id.clone(),
297 value: extrinsics,
298 })
299 },
300 Ok(None) => {
301 debug!(
303 target: LOG_TARGET,
304 "[body][id={:?}] Stopping subscription because hash={:?} was pruned",
305 &follow_subscription,
306 hash
307 );
308 subscriptions.remove_subscription(&follow_subscription);
309 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
310 },
311 Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
312 operation_id: operation_id.clone(),
313 error: error.to_string(),
314 }),
315 };
316
317 let (rp, rp_fut) = method_started_response(operation_id, None);
318 let fut = async move {
319 if rp_fut.await.is_err() {
322 return;
323 }
324
325 let _ = block_guard.response_sender().unbounded_send(event);
326 };
327 executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
328
329 rp
330 });
331
332 result
333 .await
334 .unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
335 }
336
337 async fn chain_head_unstable_header(
338 &self,
339 ext: &Extensions,
340 follow_subscription: String,
341 hash: Block::Hash,
342 ) -> Result<Option<String>, ChainHeadRpcError> {
343 let conn_id = ext
344 .get::<ConnectionId>()
345 .copied()
346 .expect("ConnectionId is always set by jsonrpsee; qed");
347
348 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
349 return Ok(None);
350 }
351
352 let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
353 Ok(block) => block,
354 Err(SubscriptionManagementError::SubscriptionAbsent) |
355 Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
356 Err(SubscriptionManagementError::BlockHashAbsent) => {
357 return Err(ChainHeadRpcError::InvalidBlock.into())
359 },
360 Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
361 };
362
363 let client = self.client.clone();
364 let result = spawn_blocking(&self.executor, async move {
365 let _block_guard = block_guard;
366
367 client
368 .header(hash)
369 .map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
370 .map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
371 });
372 result.await.unwrap_or_else(|_| Ok(None))
373 }
374
375 async fn chain_head_unstable_storage(
376 &self,
377 ext: &Extensions,
378 follow_subscription: String,
379 hash: Block::Hash,
380 items: Vec<StorageQuery<String>>,
381 child_trie: Option<String>,
382 ) -> ResponsePayload<'static, MethodResponse> {
383 let conn_id = ext
384 .get::<ConnectionId>()
385 .copied()
386 .expect("ConnectionId is always set by jsonrpsee; qed");
387
388 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
389 return ResponsePayload::success(MethodResponse::LimitReached);
392 }
393
394 let items = match items
396 .into_iter()
397 .map(|query| {
398 let key = StorageKey(parse_hex_param(query.key)?);
399 Ok(StorageQuery { key, query_type: query.query_type })
400 })
401 .collect::<Result<Vec<_>, ChainHeadRpcError>>()
402 {
403 Ok(items) => items,
404 Err(err) => {
405 return ResponsePayload::error(err);
406 },
407 };
408
409 let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
410 {
411 Ok(c) => c.map(ChildInfo::new_default_from_vec),
412 Err(e) => return ResponsePayload::error(e),
413 };
414
415 let mut block_guard =
416 match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
417 Ok(block) => block,
418 Err(SubscriptionManagementError::SubscriptionAbsent) |
419 Err(SubscriptionManagementError::ExceededLimits) => {
420 return ResponsePayload::success(MethodResponse::LimitReached);
421 },
422 Err(SubscriptionManagementError::BlockHashAbsent) => {
423 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
425 },
426 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
427 };
428
429 let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
430 self.client.clone(),
431 self.operation_max_storage_items,
432 );
433 let operation = block_guard.operation();
434 let operation_id = operation.operation_id();
435
436 let num_operations = operation.num_reserved();
438 let discarded = items.len().saturating_sub(num_operations);
439 let mut items = items;
440 items.truncate(num_operations);
441
442 let (rp, rp_fut) = method_started_response(operation_id, Some(discarded));
443 let fut = async move {
444 if rp_fut.await.is_err() {
447 return;
448 }
449
450 storage_client.generate_events(block_guard, hash, items, child_trie).await;
451 };
452 self.executor
453 .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
454
455 rp
456 }
457
458 async fn chain_head_unstable_call(
459 &self,
460 ext: &Extensions,
461 follow_subscription: String,
462 hash: Block::Hash,
463 function: String,
464 call_parameters: String,
465 ) -> ResponsePayload<'static, MethodResponse> {
466 let call_parameters = match parse_hex_param(call_parameters) {
467 Ok(hex) => Bytes::from(hex),
468 Err(err) => return ResponsePayload::error(err),
469 };
470
471 let conn_id = ext
472 .get::<ConnectionId>()
473 .copied()
474 .expect("ConnectionId is always set by jsonrpsee; qed");
475
476 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
477 return ResponsePayload::success(MethodResponse::LimitReached);
480 }
481
482 let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
483 Ok(block) => block,
484 Err(SubscriptionManagementError::SubscriptionAbsent) |
485 Err(SubscriptionManagementError::ExceededLimits) => {
486 return ResponsePayload::success(MethodResponse::LimitReached)
488 },
489 Err(SubscriptionManagementError::BlockHashAbsent) => {
490 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
492 },
493 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
494 };
495
496 if !block_guard.has_runtime() {
498 return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
499 "The runtime updates flag must be set".to_string(),
500 ));
501 }
502
503 let operation_id = block_guard.operation().operation_id();
504 let client = self.client.clone();
505
506 let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
507 let fut = async move {
508 if rp_fut.await.is_err() {
511 return
512 }
513
514 let event = client
515 .executor()
516 .call(hash, &function, &call_parameters, CallContext::Offchain)
517 .map(|result| {
518 FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
519 operation_id: operation_id.clone(),
520 output: hex_string(&result),
521 })
522 })
523 .unwrap_or_else(|error| {
524 FollowEvent::<Block::Hash>::OperationError(OperationError {
525 operation_id: operation_id.clone(),
526 error: error.to_string(),
527 })
528 });
529
530 let _ = block_guard.response_sender().unbounded_send(event);
531 };
532 self.executor
533 .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
534
535 rp
536 }
537
538 async fn chain_head_unstable_unpin(
539 &self,
540 ext: &Extensions,
541 follow_subscription: String,
542 hash_or_hashes: ListOrValue<Block::Hash>,
543 ) -> Result<(), ChainHeadRpcError> {
544 let conn_id = ext
545 .get::<ConnectionId>()
546 .copied()
547 .expect("ConnectionId is always set by jsonrpsee; qed");
548
549 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
550 return Ok(());
551 }
552
553 let result = match hash_or_hashes {
554 ListOrValue::Value(hash) =>
555 self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
556 ListOrValue::List(hashes) =>
557 self.subscriptions.unpin_blocks(&follow_subscription, hashes),
558 };
559
560 match result {
561 Ok(()) => Ok(()),
562 Err(SubscriptionManagementError::SubscriptionAbsent) => {
563 Ok(())
565 },
566 Err(SubscriptionManagementError::BlockHashAbsent) => {
567 Err(ChainHeadRpcError::InvalidBlock)
569 },
570 Err(SubscriptionManagementError::DuplicateHashes) =>
571 Err(ChainHeadRpcError::InvalidDuplicateHashes),
572 Err(_) => Err(ChainHeadRpcError::InvalidBlock),
573 }
574 }
575
576 async fn chain_head_unstable_continue(
577 &self,
578 ext: &Extensions,
579 follow_subscription: String,
580 operation_id: String,
581 ) -> Result<(), ChainHeadRpcError> {
582 let conn_id = ext
583 .get::<ConnectionId>()
584 .copied()
585 .expect("ConnectionId is always set by jsonrpsee; qed");
586
587 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
588 return Ok(())
589 }
590
591 let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
592 else {
593 return Ok(())
594 };
595
596 if !operation.submit_continue() {
597 Err(ChainHeadRpcError::InvalidContinue.into())
599 } else {
600 Ok(())
601 }
602 }
603
604 async fn chain_head_unstable_stop_operation(
605 &self,
606 ext: &Extensions,
607 follow_subscription: String,
608 operation_id: String,
609 ) -> Result<(), ChainHeadRpcError> {
610 let conn_id = ext
611 .get::<ConnectionId>()
612 .copied()
613 .expect("ConnectionId is always set by jsonrpsee; qed");
614
615 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
616 return Ok(())
617 }
618
619 let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
620 else {
621 return Ok(())
622 };
623
624 operation.stop_operation();
625
626 Ok(())
627 }
628}
629
630fn method_started_response(
631 operation_id: String,
632 discarded_items: Option<usize>,
633) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
634 let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
635 ResponsePayload::success(rp).notify_on_completion()
636}
637
638fn spawn_blocking<R>(
642 executor: &SubscriptionTaskExecutor,
643 fut: impl std::future::Future<Output = R> + Send + 'static,
644) -> oneshot::Receiver<R>
645where
646 R: Send + 'static,
647{
648 let (tx, rx) = oneshot::channel();
649
650 let blocking_fut = async move {
651 let result = fut.await;
652 let _ = tx.send(result);
654 };
655
656 executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
657
658 rx
659}