1use super::{
22 chain_head_storage::ChainHeadStorage,
23 event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
24};
25use crate::{
26 chain_head::{
27 api::ChainHeadApiServer,
28 chain_head_follow::ChainHeadFollower,
29 error::Error as ChainHeadRpcError,
30 event::{FollowEvent, MethodResponse, OperationError, OperationId, OperationStorageItems},
31 subscription::{StopHandle, SubscriptionManagement, SubscriptionManagementError},
32 FollowEventSendError, FollowEventSender,
33 },
34 common::{events::StorageQuery, storage::QueryResult},
35 hex_string, SubscriptionTaskExecutor,
36};
37use codec::Encode;
38use futures::{channel::oneshot, future::FutureExt, SinkExt};
39use jsonrpsee::{
40 core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
41 MethodResponseFuture, PendingSubscriptionSink,
42};
43use log::debug;
44use sc_client_api::{
45 Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
46 StorageProvider,
47};
48use sc_rpc::utils::Subscription;
49use sp_api::CallApiAt;
50use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
51use sp_core::{traits::CallContext, Bytes};
52use sp_rpc::list::ListOrValue;
53use sp_runtime::traits::Block as BlockT;
54use std::{marker::PhantomData, sync::Arc, time::Duration};
55use tokio::sync::mpsc;
56
57pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
58
59const STORAGE_QUERY_BUF: usize = 16;
64
65pub struct ChainHeadConfig {
67 pub global_max_pinned_blocks: usize,
69 pub subscription_max_pinned_duration: Duration,
71 pub subscription_max_ongoing_operations: usize,
73 pub max_lagging_distance: usize,
76 pub max_follow_subscriptions_per_connection: usize,
78 pub subscription_buffer_cap: usize,
80}
81
82pub(crate) const MAX_PINNED_BLOCKS: usize = 512;
86
87const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
92
93const MAX_ONGOING_OPERATIONS: usize = 16;
96
97const MAX_LAGGING_DISTANCE: usize = 128;
100
101const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
103
104impl Default for ChainHeadConfig {
105 fn default() -> Self {
106 ChainHeadConfig {
107 global_max_pinned_blocks: MAX_PINNED_BLOCKS,
108 subscription_max_pinned_duration: MAX_PINNED_DURATION,
109 subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
110 max_lagging_distance: MAX_LAGGING_DISTANCE,
111 max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
112 subscription_buffer_cap: MAX_PINNED_BLOCKS,
113 }
114 }
115}
116
117pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
119 client: Arc<Client>,
121 backend: Arc<BE>,
123 executor: SubscriptionTaskExecutor,
125 subscriptions: SubscriptionManagement<Block, BE>,
127 max_lagging_distance: usize,
130 _phantom: PhantomData<Block>,
132 subscription_buffer_cap: usize,
134}
135
136impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
137 pub fn new(
139 client: Arc<Client>,
140 backend: Arc<BE>,
141 executor: SubscriptionTaskExecutor,
142 config: ChainHeadConfig,
143 ) -> Self {
144 Self {
145 client,
146 backend: backend.clone(),
147 executor,
148 subscriptions: SubscriptionManagement::new(
149 config.global_max_pinned_blocks,
150 config.subscription_max_pinned_duration,
151 config.subscription_max_ongoing_operations,
152 config.max_follow_subscriptions_per_connection,
153 backend,
154 ),
155 max_lagging_distance: config.max_lagging_distance,
156 subscription_buffer_cap: config.subscription_buffer_cap,
157 _phantom: PhantomData,
158 }
159 }
160}
161
162pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
164 match sink.subscription_id() {
165 SubscriptionId::Num(n) => n.to_string(),
166 SubscriptionId::Str(s) => s.into_owned().into(),
167 }
168}
169
170fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
174 if param.is_empty() {
176 return Ok(Default::default());
177 }
178
179 match array_bytes::hex2bytes(¶m) {
180 Ok(bytes) => Ok(bytes),
181 Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
182 }
183}
184
185#[async_trait]
186impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
187where
188 Block: BlockT + 'static,
189 Block::Header: Unpin,
190 BE: Backend<Block> + 'static,
191 Client: BlockBackend<Block>
192 + ExecutorProvider<Block>
193 + HeaderBackend<Block>
194 + HeaderMetadata<Block, Error = BlockChainError>
195 + BlockchainEvents<Block>
196 + CallApiAt<Block>
197 + StorageProvider<Block, BE>
198 + 'static,
199{
200 fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
201 let subscriptions = self.subscriptions.clone();
202 let backend = self.backend.clone();
203 let client = self.client.clone();
204 let max_lagging_distance = self.max_lagging_distance;
205 let subscription_buffer_cap = self.subscription_buffer_cap;
206
207 let fut = async move {
208 let connection_id = pending.connection_id();
210 let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
214 else {
215 pending.reject(ChainHeadRpcError::ReachedLimits).await;
216 return;
217 };
218
219 let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
220
221 let sub_id = read_subscription_id_as_string(&sink);
222 let Some(sub_data) =
224 reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
225 else {
226 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
229 let _ = sink.send(&FollowEvent::<String>::Stop).await;
230 return;
231 };
232 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
233
234 let mut chain_head_follow = ChainHeadFollower::new(
235 client,
236 backend,
237 subscriptions,
238 with_runtime,
239 sub_id.clone(),
240 max_lagging_distance,
241 subscription_buffer_cap,
242 );
243 let result = chain_head_follow.generate_events(sink, sub_data).await;
244 if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
245 debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
246 reserved_subscription.stop_all_subscriptions();
247 }
248
249 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
250 };
251
252 self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
253 }
254
255 async fn chain_head_unstable_body(
256 &self,
257 ext: &Extensions,
258 follow_subscription: String,
259 hash: Block::Hash,
260 ) -> ResponsePayload<'static, MethodResponse> {
261 let conn_id = ext
262 .get::<ConnectionId>()
263 .copied()
264 .expect("ConnectionId is always set by jsonrpsee; qed");
265
266 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
267 return ResponsePayload::success(MethodResponse::LimitReached);
270 }
271
272 let client = self.client.clone();
273 let subscriptions = self.subscriptions.clone();
274 let executor = self.executor.clone();
275
276 let result = spawn_blocking(&self.executor, async move {
277 let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
278 Ok(block) => block,
279 Err(SubscriptionManagementError::SubscriptionAbsent) |
280 Err(SubscriptionManagementError::ExceededLimits) => {
281 return ResponsePayload::success(MethodResponse::LimitReached)
282 },
283 Err(SubscriptionManagementError::BlockHashAbsent) => {
284 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
286 },
287 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
288 };
289
290 let operation_id = block_guard.operation().operation_id();
291
292 let event = match client.block(hash) {
293 Ok(Some(signed_block)) => {
294 let extrinsics = signed_block
295 .block
296 .extrinsics()
297 .iter()
298 .map(|extrinsic| hex_string(&extrinsic.encode()))
299 .collect();
300 FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
301 operation_id: operation_id.clone(),
302 value: extrinsics,
303 })
304 },
305 Ok(None) => {
306 debug!(
308 target: LOG_TARGET,
309 "[body][id={:?}] Stopping subscription because hash={:?} was pruned",
310 &follow_subscription,
311 hash
312 );
313 subscriptions.remove_subscription(&follow_subscription);
314 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
315 },
316 Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
317 operation_id: operation_id.clone(),
318 error: error.to_string(),
319 }),
320 };
321
322 let (rp, rp_fut) = method_started_response(operation_id, None);
323 let fut = async move {
324 if rp_fut.await.is_err() {
327 return;
328 }
329
330 let _ = block_guard.response_sender().send(event).await;
331 };
332 executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
333
334 rp
335 });
336
337 result
338 .await
339 .unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
340 }
341
342 async fn chain_head_unstable_header(
343 &self,
344 ext: &Extensions,
345 follow_subscription: String,
346 hash: Block::Hash,
347 ) -> Result<Option<String>, ChainHeadRpcError> {
348 let conn_id = ext
349 .get::<ConnectionId>()
350 .copied()
351 .expect("ConnectionId is always set by jsonrpsee; qed");
352
353 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
354 return Ok(None);
355 }
356
357 let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
358 Ok(block) => block,
359 Err(SubscriptionManagementError::SubscriptionAbsent) |
360 Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
361 Err(SubscriptionManagementError::BlockHashAbsent) => {
362 return Err(ChainHeadRpcError::InvalidBlock.into());
364 },
365 Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
366 };
367
368 let client = self.client.clone();
369 let result = spawn_blocking(&self.executor, async move {
370 let _block_guard = block_guard;
371
372 client
373 .header(hash)
374 .map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
375 .map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
376 });
377 result.await.unwrap_or_else(|_| Ok(None))
378 }
379
380 async fn chain_head_unstable_storage(
381 &self,
382 ext: &Extensions,
383 follow_subscription: String,
384 hash: Block::Hash,
385 items: Vec<StorageQuery<String>>,
386 child_trie: Option<String>,
387 ) -> ResponsePayload<'static, MethodResponse> {
388 let conn_id = ext
389 .get::<ConnectionId>()
390 .copied()
391 .expect("ConnectionId is always set by jsonrpsee; qed");
392
393 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
394 return ResponsePayload::success(MethodResponse::LimitReached);
397 }
398
399 let items = match items
401 .into_iter()
402 .map(|query| {
403 let key = StorageKey(parse_hex_param(query.key)?);
404 Ok(StorageQuery { key, query_type: query.query_type, pagination_start_key: None })
405 })
406 .collect::<Result<Vec<_>, ChainHeadRpcError>>()
407 {
408 Ok(items) => items,
409 Err(err) => {
410 return ResponsePayload::error(err);
411 },
412 };
413
414 let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
415 {
416 Ok(c) => c.map(ChildInfo::new_default_from_vec),
417 Err(e) => return ResponsePayload::error(e),
418 };
419
420 let mut block_guard =
421 match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
422 Ok(block) => block,
423 Err(SubscriptionManagementError::SubscriptionAbsent) |
424 Err(SubscriptionManagementError::ExceededLimits) => {
425 return ResponsePayload::success(MethodResponse::LimitReached);
426 },
427 Err(SubscriptionManagementError::BlockHashAbsent) => {
428 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
430 },
431 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
432 };
433
434 let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
435
436 let (rp, rp_fut) = method_started_response(block_guard.operation().operation_id(), Some(0));
438
439 let fut = async move {
440 if rp_fut.await.is_err() {
443 return;
444 }
445
446 let (tx, rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
447 let operation_id = block_guard.operation().operation_id();
448 let stop_handle = block_guard.operation().stop_handle().clone();
449 let response_sender = block_guard.response_sender();
450
451 let _ = futures::future::join(
454 storage_client.generate_events(hash, items, child_trie, tx),
455 process_storage_items(rx, response_sender, operation_id, &stop_handle),
456 )
457 .await;
458 };
459 self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
460
461 rp
462 }
463
464 async fn chain_head_unstable_call(
465 &self,
466 ext: &Extensions,
467 follow_subscription: String,
468 hash: Block::Hash,
469 function: String,
470 call_parameters: String,
471 ) -> ResponsePayload<'static, MethodResponse> {
472 let call_parameters = match parse_hex_param(call_parameters) {
473 Ok(hex) => Bytes::from(hex),
474 Err(err) => return ResponsePayload::error(err),
475 };
476
477 let conn_id = ext
478 .get::<ConnectionId>()
479 .copied()
480 .expect("ConnectionId is always set by jsonrpsee; qed");
481
482 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
483 return ResponsePayload::success(MethodResponse::LimitReached);
486 }
487
488 let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
489 Ok(block) => block,
490 Err(SubscriptionManagementError::SubscriptionAbsent) |
491 Err(SubscriptionManagementError::ExceededLimits) => {
492 return ResponsePayload::success(MethodResponse::LimitReached);
494 },
495 Err(SubscriptionManagementError::BlockHashAbsent) => {
496 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
498 },
499 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
500 };
501
502 if !block_guard.has_runtime() {
504 return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
505 "The runtime updates flag must be set".to_string(),
506 ));
507 }
508
509 let operation_id = block_guard.operation().operation_id();
510 let client = self.client.clone();
511
512 let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
513 let fut = async move {
514 if rp_fut.await.is_err() {
517 return;
518 }
519
520 let event = client
521 .executor()
522 .call(hash, &function, &call_parameters, CallContext::Offchain)
523 .map(|result| {
524 FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
525 operation_id: operation_id.clone(),
526 output: hex_string(&result),
527 })
528 })
529 .unwrap_or_else(|error| {
530 FollowEvent::<Block::Hash>::OperationError(OperationError {
531 operation_id: operation_id.clone(),
532 error: error.to_string(),
533 })
534 });
535
536 let _ = block_guard.response_sender().send(event).await;
537 };
538 self.executor
539 .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
540
541 rp
542 }
543
544 async fn chain_head_unstable_unpin(
545 &self,
546 ext: &Extensions,
547 follow_subscription: String,
548 hash_or_hashes: ListOrValue<Block::Hash>,
549 ) -> Result<(), ChainHeadRpcError> {
550 let conn_id = ext
551 .get::<ConnectionId>()
552 .copied()
553 .expect("ConnectionId is always set by jsonrpsee; qed");
554
555 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
556 return Ok(());
557 }
558
559 let result = match hash_or_hashes {
560 ListOrValue::Value(hash) => {
561 self.subscriptions.unpin_blocks(&follow_subscription, [hash])
562 },
563 ListOrValue::List(hashes) => {
564 self.subscriptions.unpin_blocks(&follow_subscription, hashes)
565 },
566 };
567
568 match result {
569 Ok(()) => Ok(()),
570 Err(SubscriptionManagementError::SubscriptionAbsent) => {
571 Ok(())
573 },
574 Err(SubscriptionManagementError::BlockHashAbsent) => {
575 Err(ChainHeadRpcError::InvalidBlock)
577 },
578 Err(SubscriptionManagementError::DuplicateHashes) => {
579 Err(ChainHeadRpcError::InvalidDuplicateHashes)
580 },
581 Err(_) => Err(ChainHeadRpcError::InvalidBlock),
582 }
583 }
584
585 async fn chain_head_unstable_continue(
586 &self,
587 ext: &Extensions,
588 follow_subscription: String,
589 operation_id: String,
590 ) -> Result<(), ChainHeadRpcError> {
591 let conn_id = ext
592 .get::<ConnectionId>()
593 .copied()
594 .expect("ConnectionId is always set by jsonrpsee; qed");
595
596 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
597 return Ok(());
598 }
599
600 if self.subscriptions.get_operation(&follow_subscription, &operation_id).is_some() {
603 Err(ChainHeadRpcError::InvalidContinue.into())
604 } else {
605 Ok(())
606 }
607 }
608
609 async fn chain_head_unstable_stop_operation(
610 &self,
611 ext: &Extensions,
612 follow_subscription: String,
613 operation_id: String,
614 ) -> Result<(), ChainHeadRpcError> {
615 let conn_id = ext
616 .get::<ConnectionId>()
617 .copied()
618 .expect("ConnectionId is always set by jsonrpsee; qed");
619
620 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
621 return Ok(());
622 }
623
624 let Some(mut operation) =
625 self.subscriptions.get_operation(&follow_subscription, &operation_id)
626 else {
627 return Ok(());
628 };
629
630 operation.stop();
631
632 Ok(())
633 }
634}
635
636fn method_started_response(
637 operation_id: String,
638 discarded_items: Option<usize>,
639) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
640 let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
641 ResponsePayload::success(rp).notify_on_completion()
642}
643
644fn spawn_blocking<R>(
648 executor: &SubscriptionTaskExecutor,
649 fut: impl std::future::Future<Output = R> + Send + 'static,
650) -> oneshot::Receiver<R>
651where
652 R: Send + 'static,
653{
654 let (tx, rx) = oneshot::channel();
655
656 let blocking_fut = async move {
657 let result = fut.await;
658 let _ = tx.send(result);
660 };
661
662 executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
663
664 rx
665}
666
667async fn process_storage_items<Hash>(
668 mut storage_query_stream: mpsc::Receiver<QueryResult>,
669 mut sender: FollowEventSender<Hash>,
670 operation_id: String,
671 stop_handle: &StopHandle,
672) -> Result<(), FollowEventSendError> {
673 loop {
674 tokio::select! {
675 _ = stop_handle.stopped() => {
676 break;
677 },
678
679 maybe_storage = storage_query_stream.recv() => {
680 let Some(storage) = maybe_storage else {
681 break;
682 };
683
684 let item = match storage {
685 QueryResult::Err(error) => {
686 return sender
687 .send(FollowEvent::OperationError(OperationError { operation_id, error }))
688 .await
689 }
690 QueryResult::Ok(Some(v)) => v,
691 QueryResult::Ok(None) => continue,
692 };
693
694 sender
695 .send(FollowEvent::OperationStorageItems(OperationStorageItems {
696 operation_id: operation_id.clone(),
697 items: vec![item],
698 })).await?;
699 },
700 }
701 }
702
703 sender
704 .send(FollowEvent::OperationStorageDone(OperationId { operation_id }))
705 .await?;
706
707 Ok(())
708}