1use super::{
22 chain_head_storage::ChainHeadStorage,
23 event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
24};
25use crate::{
26 chain_head::{
27 api::ChainHeadApiServer,
28 chain_head_follow::ChainHeadFollower,
29 error::Error as ChainHeadRpcError,
30 event::{FollowEvent, MethodResponse, OperationError, OperationId, OperationStorageItems},
31 subscription::{StopHandle, SubscriptionManagement, SubscriptionManagementError},
32 FollowEventSendError, FollowEventSender,
33 },
34 common::{events::StorageQuery, storage::QueryResult},
35 hex_string, SubscriptionTaskExecutor,
36};
37use codec::Encode;
38use futures::{channel::oneshot, future::FutureExt, SinkExt};
39use jsonrpsee::{
40 core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
41 MethodResponseFuture, PendingSubscriptionSink,
42};
43use log::debug;
44use sc_client_api::{
45 Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
46 StorageProvider,
47};
48use sc_rpc::utils::Subscription;
49use sp_api::CallApiAt;
50use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
51use sp_core::{traits::CallContext, Bytes};
52use sp_rpc::list::ListOrValue;
53use sp_runtime::traits::Block as BlockT;
54use std::{marker::PhantomData, sync::Arc, time::Duration};
55use tokio::sync::mpsc;
56
/// Log target passed to every `debug!` call emitted by this module.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// Capacity of the bounded channel created per storage operation to carry
/// query results from the storage reader to the event forwarder.
const STORAGE_QUERY_BUF: usize = 16;
64
/// Tunable limits used to construct a [`ChainHead`].
///
/// The first three limits and `max_follow_subscriptions_per_connection` are
/// handed to `SubscriptionManagement::new`; the remaining two are stored on
/// [`ChainHead`] and passed to each `ChainHeadFollower`.
pub struct ChainHeadConfig {
    /// Upper bound on pinned blocks, counted globally.
    pub global_max_pinned_blocks: usize,
    /// Longest time a subscription may keep a block pinned.
    pub subscription_max_pinned_duration: Duration,
    /// Upper bound on concurrently running operations for one subscription.
    pub subscription_max_ongoing_operations: usize,
    /// Lagging-distance threshold forwarded to the follower; when the follower
    /// reports `BlockDistanceTooLarge`, all subscriptions are stopped.
    pub max_lagging_distance: usize,
    /// Upper bound on follow subscriptions a single connection may hold.
    pub max_follow_subscriptions_per_connection: usize,
    /// Buffer capacity forwarded to each follower.
    pub subscription_buffer_cap: usize,
}
81
/// Default for `ChainHeadConfig::global_max_pinned_blocks`; also reused as the
/// default `subscription_buffer_cap`.
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;

/// Default for `ChainHeadConfig::subscription_max_pinned_duration` (60 seconds).
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);

/// Default for `ChainHeadConfig::subscription_max_ongoing_operations`.
const MAX_ONGOING_OPERATIONS: usize = 16;

/// Default for `ChainHeadConfig::max_lagging_distance`.
const MAX_LAGGING_DISTANCE: usize = 128;

/// Default for `ChainHeadConfig::max_follow_subscriptions_per_connection`.
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
103
104impl Default for ChainHeadConfig {
105 fn default() -> Self {
106 ChainHeadConfig {
107 global_max_pinned_blocks: MAX_PINNED_BLOCKS,
108 subscription_max_pinned_duration: MAX_PINNED_DURATION,
109 subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
110 max_lagging_distance: MAX_LAGGING_DISTANCE,
111 max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
112 subscription_buffer_cap: MAX_PINNED_BLOCKS,
113 }
114 }
115}
116
117pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
119 client: Arc<Client>,
121 backend: Arc<BE>,
123 executor: SubscriptionTaskExecutor,
125 subscriptions: SubscriptionManagement<Block, BE>,
127 max_lagging_distance: usize,
130 _phantom: PhantomData<Block>,
132 subscription_buffer_cap: usize,
134}
135
136impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
137 pub fn new(
139 client: Arc<Client>,
140 backend: Arc<BE>,
141 executor: SubscriptionTaskExecutor,
142 config: ChainHeadConfig,
143 ) -> Self {
144 Self {
145 client,
146 backend: backend.clone(),
147 executor,
148 subscriptions: SubscriptionManagement::new(
149 config.global_max_pinned_blocks,
150 config.subscription_max_pinned_duration,
151 config.subscription_max_ongoing_operations,
152 config.max_follow_subscriptions_per_connection,
153 backend,
154 ),
155 max_lagging_distance: config.max_lagging_distance,
156 subscription_buffer_cap: config.subscription_buffer_cap,
157 _phantom: PhantomData,
158 }
159 }
160}
161
162pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
164 match sink.subscription_id() {
165 SubscriptionId::Num(n) => n.to_string(),
166 SubscriptionId::Str(s) => s.into_owned().into(),
167 }
168}
169
170fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
174 if param.is_empty() {
176 return Ok(Default::default())
177 }
178
179 match array_bytes::hex2bytes(¶m) {
180 Ok(bytes) => Ok(bytes),
181 Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
182 }
183}
184
185#[async_trait]
186impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
187where
188 Block: BlockT + 'static,
189 Block::Header: Unpin,
190 BE: Backend<Block> + 'static,
191 Client: BlockBackend<Block>
192 + ExecutorProvider<Block>
193 + HeaderBackend<Block>
194 + HeaderMetadata<Block, Error = BlockChainError>
195 + BlockchainEvents<Block>
196 + CallApiAt<Block>
197 + StorageProvider<Block, BE>
198 + 'static,
199{
200 fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
201 let subscriptions = self.subscriptions.clone();
202 let backend = self.backend.clone();
203 let client = self.client.clone();
204 let max_lagging_distance = self.max_lagging_distance;
205 let subscription_buffer_cap = self.subscription_buffer_cap;
206
207 let fut = async move {
208 let connection_id = pending.connection_id();
210 let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
214 else {
215 pending.reject(ChainHeadRpcError::ReachedLimits).await;
216 return
217 };
218
219 let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
220
221 let sub_id = read_subscription_id_as_string(&sink);
222 let Some(sub_data) =
224 reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
225 else {
226 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
229 let _ = sink.send(&FollowEvent::<String>::Stop).await;
230 return
231 };
232 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
233
234 let mut chain_head_follow = ChainHeadFollower::new(
235 client,
236 backend,
237 subscriptions,
238 with_runtime,
239 sub_id.clone(),
240 max_lagging_distance,
241 subscription_buffer_cap,
242 );
243 let result = chain_head_follow.generate_events(sink, sub_data).await;
244 if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
245 debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
246 reserved_subscription.stop_all_subscriptions();
247 }
248
249 debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
250 };
251
252 self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
253 }
254
255 async fn chain_head_unstable_body(
256 &self,
257 ext: &Extensions,
258 follow_subscription: String,
259 hash: Block::Hash,
260 ) -> ResponsePayload<'static, MethodResponse> {
261 let conn_id = ext
262 .get::<ConnectionId>()
263 .copied()
264 .expect("ConnectionId is always set by jsonrpsee; qed");
265
266 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
267 return ResponsePayload::success(MethodResponse::LimitReached);
270 }
271
272 let client = self.client.clone();
273 let subscriptions = self.subscriptions.clone();
274 let executor = self.executor.clone();
275
276 let result = spawn_blocking(&self.executor, async move {
277 let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
278 Ok(block) => block,
279 Err(SubscriptionManagementError::SubscriptionAbsent) |
280 Err(SubscriptionManagementError::ExceededLimits) =>
281 return ResponsePayload::success(MethodResponse::LimitReached),
282 Err(SubscriptionManagementError::BlockHashAbsent) => {
283 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
285 },
286 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
287 };
288
289 let operation_id = block_guard.operation().operation_id();
290
291 let event = match client.block(hash) {
292 Ok(Some(signed_block)) => {
293 let extrinsics = signed_block
294 .block
295 .extrinsics()
296 .iter()
297 .map(|extrinsic| hex_string(&extrinsic.encode()))
298 .collect();
299 FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
300 operation_id: operation_id.clone(),
301 value: extrinsics,
302 })
303 },
304 Ok(None) => {
305 debug!(
307 target: LOG_TARGET,
308 "[body][id={:?}] Stopping subscription because hash={:?} was pruned",
309 &follow_subscription,
310 hash
311 );
312 subscriptions.remove_subscription(&follow_subscription);
313 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
314 },
315 Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
316 operation_id: operation_id.clone(),
317 error: error.to_string(),
318 }),
319 };
320
321 let (rp, rp_fut) = method_started_response(operation_id, None);
322 let fut = async move {
323 if rp_fut.await.is_err() {
326 return;
327 }
328
329 let _ = block_guard.response_sender().send(event).await;
330 };
331 executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
332
333 rp
334 });
335
336 result
337 .await
338 .unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
339 }
340
341 async fn chain_head_unstable_header(
342 &self,
343 ext: &Extensions,
344 follow_subscription: String,
345 hash: Block::Hash,
346 ) -> Result<Option<String>, ChainHeadRpcError> {
347 let conn_id = ext
348 .get::<ConnectionId>()
349 .copied()
350 .expect("ConnectionId is always set by jsonrpsee; qed");
351
352 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
353 return Ok(None);
354 }
355
356 let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
357 Ok(block) => block,
358 Err(SubscriptionManagementError::SubscriptionAbsent) |
359 Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
360 Err(SubscriptionManagementError::BlockHashAbsent) => {
361 return Err(ChainHeadRpcError::InvalidBlock.into())
363 },
364 Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
365 };
366
367 let client = self.client.clone();
368 let result = spawn_blocking(&self.executor, async move {
369 let _block_guard = block_guard;
370
371 client
372 .header(hash)
373 .map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
374 .map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
375 });
376 result.await.unwrap_or_else(|_| Ok(None))
377 }
378
379 async fn chain_head_unstable_storage(
380 &self,
381 ext: &Extensions,
382 follow_subscription: String,
383 hash: Block::Hash,
384 items: Vec<StorageQuery<String>>,
385 child_trie: Option<String>,
386 ) -> ResponsePayload<'static, MethodResponse> {
387 let conn_id = ext
388 .get::<ConnectionId>()
389 .copied()
390 .expect("ConnectionId is always set by jsonrpsee; qed");
391
392 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
393 return ResponsePayload::success(MethodResponse::LimitReached);
396 }
397
398 let items = match items
400 .into_iter()
401 .map(|query| {
402 let key = StorageKey(parse_hex_param(query.key)?);
403 Ok(StorageQuery { key, query_type: query.query_type })
404 })
405 .collect::<Result<Vec<_>, ChainHeadRpcError>>()
406 {
407 Ok(items) => items,
408 Err(err) => {
409 return ResponsePayload::error(err);
410 },
411 };
412
413 let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
414 {
415 Ok(c) => c.map(ChildInfo::new_default_from_vec),
416 Err(e) => return ResponsePayload::error(e),
417 };
418
419 let mut block_guard =
420 match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
421 Ok(block) => block,
422 Err(SubscriptionManagementError::SubscriptionAbsent) |
423 Err(SubscriptionManagementError::ExceededLimits) => {
424 return ResponsePayload::success(MethodResponse::LimitReached);
425 },
426 Err(SubscriptionManagementError::BlockHashAbsent) => {
427 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
429 },
430 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
431 };
432
433 let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
434
435 let (rp, rp_fut) = method_started_response(block_guard.operation().operation_id(), Some(0));
437
438 let fut = async move {
439 if rp_fut.await.is_err() {
442 return;
443 }
444
445 let (tx, rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
446 let operation_id = block_guard.operation().operation_id();
447 let stop_handle = block_guard.operation().stop_handle().clone();
448 let response_sender = block_guard.response_sender();
449
450 let _ = futures::future::join(
453 storage_client.generate_events(hash, items, child_trie, tx),
454 process_storage_items(rx, response_sender, operation_id, &stop_handle),
455 )
456 .await;
457 };
458 self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
459
460 rp
461 }
462
463 async fn chain_head_unstable_call(
464 &self,
465 ext: &Extensions,
466 follow_subscription: String,
467 hash: Block::Hash,
468 function: String,
469 call_parameters: String,
470 ) -> ResponsePayload<'static, MethodResponse> {
471 let call_parameters = match parse_hex_param(call_parameters) {
472 Ok(hex) => Bytes::from(hex),
473 Err(err) => return ResponsePayload::error(err),
474 };
475
476 let conn_id = ext
477 .get::<ConnectionId>()
478 .copied()
479 .expect("ConnectionId is always set by jsonrpsee; qed");
480
481 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
482 return ResponsePayload::success(MethodResponse::LimitReached);
485 }
486
487 let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
488 Ok(block) => block,
489 Err(SubscriptionManagementError::SubscriptionAbsent) |
490 Err(SubscriptionManagementError::ExceededLimits) => {
491 return ResponsePayload::success(MethodResponse::LimitReached)
493 },
494 Err(SubscriptionManagementError::BlockHashAbsent) => {
495 return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
497 },
498 Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
499 };
500
501 if !block_guard.has_runtime() {
503 return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
504 "The runtime updates flag must be set".to_string(),
505 ));
506 }
507
508 let operation_id = block_guard.operation().operation_id();
509 let client = self.client.clone();
510
511 let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
512 let fut = async move {
513 if rp_fut.await.is_err() {
516 return
517 }
518
519 let event = client
520 .executor()
521 .call(hash, &function, &call_parameters, CallContext::Offchain)
522 .map(|result| {
523 FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
524 operation_id: operation_id.clone(),
525 output: hex_string(&result),
526 })
527 })
528 .unwrap_or_else(|error| {
529 FollowEvent::<Block::Hash>::OperationError(OperationError {
530 operation_id: operation_id.clone(),
531 error: error.to_string(),
532 })
533 });
534
535 let _ = block_guard.response_sender().send(event).await;
536 };
537 self.executor
538 .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
539
540 rp
541 }
542
543 async fn chain_head_unstable_unpin(
544 &self,
545 ext: &Extensions,
546 follow_subscription: String,
547 hash_or_hashes: ListOrValue<Block::Hash>,
548 ) -> Result<(), ChainHeadRpcError> {
549 let conn_id = ext
550 .get::<ConnectionId>()
551 .copied()
552 .expect("ConnectionId is always set by jsonrpsee; qed");
553
554 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
555 return Ok(());
556 }
557
558 let result = match hash_or_hashes {
559 ListOrValue::Value(hash) =>
560 self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
561 ListOrValue::List(hashes) =>
562 self.subscriptions.unpin_blocks(&follow_subscription, hashes),
563 };
564
565 match result {
566 Ok(()) => Ok(()),
567 Err(SubscriptionManagementError::SubscriptionAbsent) => {
568 Ok(())
570 },
571 Err(SubscriptionManagementError::BlockHashAbsent) => {
572 Err(ChainHeadRpcError::InvalidBlock)
574 },
575 Err(SubscriptionManagementError::DuplicateHashes) =>
576 Err(ChainHeadRpcError::InvalidDuplicateHashes),
577 Err(_) => Err(ChainHeadRpcError::InvalidBlock),
578 }
579 }
580
581 async fn chain_head_unstable_continue(
582 &self,
583 ext: &Extensions,
584 follow_subscription: String,
585 operation_id: String,
586 ) -> Result<(), ChainHeadRpcError> {
587 let conn_id = ext
588 .get::<ConnectionId>()
589 .copied()
590 .expect("ConnectionId is always set by jsonrpsee; qed");
591
592 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
593 return Ok(())
594 }
595
596 if self.subscriptions.get_operation(&follow_subscription, &operation_id).is_some() {
599 Err(ChainHeadRpcError::InvalidContinue.into())
600 } else {
601 Ok(())
602 }
603 }
604
605 async fn chain_head_unstable_stop_operation(
606 &self,
607 ext: &Extensions,
608 follow_subscription: String,
609 operation_id: String,
610 ) -> Result<(), ChainHeadRpcError> {
611 let conn_id = ext
612 .get::<ConnectionId>()
613 .copied()
614 .expect("ConnectionId is always set by jsonrpsee; qed");
615
616 if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
617 return Ok(())
618 }
619
620 let Some(mut operation) =
621 self.subscriptions.get_operation(&follow_subscription, &operation_id)
622 else {
623 return Ok(())
624 };
625
626 operation.stop();
627
628 Ok(())
629 }
630}
631
632fn method_started_response(
633 operation_id: String,
634 discarded_items: Option<usize>,
635) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
636 let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
637 ResponsePayload::success(rp).notify_on_completion()
638}
639
640fn spawn_blocking<R>(
644 executor: &SubscriptionTaskExecutor,
645 fut: impl std::future::Future<Output = R> + Send + 'static,
646) -> oneshot::Receiver<R>
647where
648 R: Send + 'static,
649{
650 let (tx, rx) = oneshot::channel();
651
652 let blocking_fut = async move {
653 let result = fut.await;
654 let _ = tx.send(result);
656 };
657
658 executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
659
660 rx
661}
662
663async fn process_storage_items<Hash>(
664 mut storage_query_stream: mpsc::Receiver<QueryResult>,
665 mut sender: FollowEventSender<Hash>,
666 operation_id: String,
667 stop_handle: &StopHandle,
668) -> Result<(), FollowEventSendError> {
669 loop {
670 tokio::select! {
671 _ = stop_handle.stopped() => {
672 break;
673 },
674
675 maybe_storage = storage_query_stream.recv() => {
676 let Some(storage) = maybe_storage else {
677 break;
678 };
679
680 let item = match storage {
681 QueryResult::Err(error) => {
682 return sender
683 .send(FollowEvent::OperationError(OperationError { operation_id, error }))
684 .await
685 }
686 QueryResult::Ok(Some(v)) => v,
687 QueryResult::Ok(None) => continue,
688 };
689
690 sender
691 .send(FollowEvent::OperationStorageItems(OperationStorageItems {
692 operation_id: operation_id.clone(),
693 items: vec![item],
694 })).await?;
695 },
696 }
697 }
698
699 sender
700 .send(FollowEvent::OperationStorageDone(OperationId { operation_id }))
701 .await?;
702
703 Ok(())
704}