referrerpolicy=no-referrer-when-downgrade

sc_rpc_spec_v2/chain_head/
chain_head.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! API implementation for `chainHead`.
20
21use super::{
22	chain_head_storage::ChainHeadStorage,
23	event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
24};
25use crate::{
26	chain_head::{
27		api::ChainHeadApiServer,
28		chain_head_follow::ChainHeadFollower,
29		error::Error as ChainHeadRpcError,
30		event::{FollowEvent, MethodResponse, OperationError, OperationId, OperationStorageItems},
31		subscription::{StopHandle, SubscriptionManagement, SubscriptionManagementError},
32		FollowEventSendError, FollowEventSender,
33	},
34	common::{events::StorageQuery, storage::QueryResult},
35	hex_string, SubscriptionTaskExecutor,
36};
37use codec::Encode;
38use futures::{channel::oneshot, future::FutureExt, SinkExt};
39use jsonrpsee::{
40	core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
41	MethodResponseFuture, PendingSubscriptionSink,
42};
43use log::debug;
44use sc_client_api::{
45	Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
46	StorageProvider,
47};
48use sc_rpc::utils::Subscription;
49use sp_api::CallApiAt;
50use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
51use sp_core::{traits::CallContext, Bytes};
52use sp_rpc::list::ListOrValue;
53use sp_runtime::traits::Block as BlockT;
54use std::{marker::PhantomData, sync::Arc, time::Duration};
55use tokio::sync::mpsc;
56
/// Log target passed to the `debug!` statements in this file.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its own buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;
64
/// The configuration of [`ChainHead`].
///
/// All fields are plain limits (`usize` counters and a [`Duration`]), so the type derives
/// `Debug` and `Clone` to make it easy to log and to reuse between instances.
#[derive(Debug, Clone)]
pub struct ChainHeadConfig {
	/// The maximum number of pinned blocks across all subscriptions.
	pub global_max_pinned_blocks: usize,
	/// The maximum duration that a block is allowed to be pinned per subscription.
	pub subscription_max_pinned_duration: Duration,
	/// The maximum number of ongoing operations per subscription.
	pub subscription_max_ongoing_operations: usize,
	/// Stop all subscriptions if the distance between the leaves and the current finalized
	/// block is larger than this value.
	pub max_lagging_distance: usize,
	/// The maximum number of `chainHead_follow` subscriptions per connection.
	pub max_follow_subscriptions_per_connection: usize,
	/// The maximum number of pending messages per subscription.
	pub subscription_buffer_cap: usize,
}
81
/// Maximum pinned blocks across all connections.
/// This number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
///
/// The `Default` implementation of `ChainHeadConfig` also uses this value for
/// `subscription_buffer_cap`.
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;

/// Any block of any subscription should not be pinned more than
/// this constant (60 seconds). When a subscription contains a block older than this,
/// the subscription becomes subject to termination.
/// Note: This should be enough for immediate blocks.
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);

/// The maximum number of ongoing operations per subscription.
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
const MAX_LAGGING_DISTANCE: usize = 128;

/// The maximum number of `chainHead_follow` subscriptions per connection.
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
103
104impl Default for ChainHeadConfig {
105	fn default() -> Self {
106		ChainHeadConfig {
107			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
108			subscription_max_pinned_duration: MAX_PINNED_DURATION,
109			subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
110			max_lagging_distance: MAX_LAGGING_DISTANCE,
111			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
112			subscription_buffer_cap: MAX_PINNED_BLOCKS,
113		}
114	}
115}
116
/// An API for chain head RPC calls.
///
/// Bundles the client, backend and task executor handles with the subscription
/// bookkeeping and the limits copied from `ChainHeadConfig` in `ChainHead::new`.
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
	/// Substrate client.
	client: Arc<Client>,
	/// Backend of the chain.
	backend: Arc<BE>,
	/// Executor to spawn subscriptions.
	executor: SubscriptionTaskExecutor,
	/// Keep track of the pinned blocks for each subscription.
	subscriptions: SubscriptionManagement<Block, BE>,
	/// Stop all subscriptions if the distance between the leaves and the current finalized
	/// block is larger than this value.
	max_lagging_distance: usize,
	/// Phantom member to pin the block type.
	_phantom: PhantomData<Block>,
	/// The maximum number of pending messages per subscription.
	subscription_buffer_cap: usize,
}
135
136impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
137	/// Create a new [`ChainHead`].
138	pub fn new(
139		client: Arc<Client>,
140		backend: Arc<BE>,
141		executor: SubscriptionTaskExecutor,
142		config: ChainHeadConfig,
143	) -> Self {
144		Self {
145			client,
146			backend: backend.clone(),
147			executor,
148			subscriptions: SubscriptionManagement::new(
149				config.global_max_pinned_blocks,
150				config.subscription_max_pinned_duration,
151				config.subscription_max_ongoing_operations,
152				config.max_follow_subscriptions_per_connection,
153				backend,
154			),
155			max_lagging_distance: config.max_lagging_distance,
156			subscription_buffer_cap: config.subscription_buffer_cap,
157			_phantom: PhantomData,
158		}
159	}
160}
161
162/// Helper to convert the `subscription ID` to a string.
163pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
164	match sink.subscription_id() {
165		SubscriptionId::Num(n) => n.to_string(),
166		SubscriptionId::Str(s) => s.into_owned().into(),
167	}
168}
169
170/// Parse hex-encoded string parameter as raw bytes.
171///
172/// If the parsing fails, returns an error propagated to the RPC method.
173fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
174	// Methods can accept empty parameters.
175	if param.is_empty() {
176		return Ok(Default::default())
177	}
178
179	match array_bytes::hex2bytes(&param) {
180		Ok(bytes) => Ok(bytes),
181		Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
182	}
183}
184
185#[async_trait]
186impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
187where
188	Block: BlockT + 'static,
189	Block::Header: Unpin,
190	BE: Backend<Block> + 'static,
191	Client: BlockBackend<Block>
192		+ ExecutorProvider<Block>
193		+ HeaderBackend<Block>
194		+ HeaderMetadata<Block, Error = BlockChainError>
195		+ BlockchainEvents<Block>
196		+ CallApiAt<Block>
197		+ StorageProvider<Block, BE>
198		+ 'static,
199{
200	fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
201		let subscriptions = self.subscriptions.clone();
202		let backend = self.backend.clone();
203		let client = self.client.clone();
204		let max_lagging_distance = self.max_lagging_distance;
205		let subscription_buffer_cap = self.subscription_buffer_cap;
206
207		let fut = async move {
208			// Ensure the current connection ID has enough space to accept a new subscription.
209			let connection_id = pending.connection_id();
210			// The RAII `reserved_subscription` will clean up resources on drop:
211			// - free the reserved subscription for the connection ID.
212			// - remove the subscription ID from the subscription management.
213			let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
214			else {
215				pending.reject(ChainHeadRpcError::ReachedLimits).await;
216				return
217			};
218
219			let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
220
221			let sub_id = read_subscription_id_as_string(&sink);
222			// Keep track of the subscription.
223			let Some(sub_data) =
224				reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
225			else {
226				// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
227				// subscription ID.
228				debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
229				let _ = sink.send(&FollowEvent::<String>::Stop).await;
230				return
231			};
232			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
233
234			let mut chain_head_follow = ChainHeadFollower::new(
235				client,
236				backend,
237				subscriptions,
238				with_runtime,
239				sub_id.clone(),
240				max_lagging_distance,
241				subscription_buffer_cap,
242			);
243			let result = chain_head_follow.generate_events(sink, sub_data).await;
244			if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
245				debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
246				reserved_subscription.stop_all_subscriptions();
247			}
248
249			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
250		};
251
252		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
253	}
254
	/// Handle a `body` request for a block of a `follow` subscription.
	///
	/// Responds with `MethodResponse::LimitReached` when the subscription is unknown for this
	/// connection, when `lock_block` reports `SubscriptionAbsent` or `ExceededLimits`, or when
	/// the result of the blocking task cannot be received. Responds with
	/// `ChainHeadRpcError::InvalidBlock` when the block cannot be locked for any other reason
	/// or when `client.block` returns `None` for `hash`.
	///
	/// Otherwise a "started" response is returned, and the outcome (`OperationBodyDone` or
	/// `OperationError`) is sent afterwards through the block guard's response sender.
	async fn chain_head_unstable_body(
		&self,
		ext: &Extensions,
		follow_subscription: String,
		hash: Block::Hash,
	) -> ResponsePayload<'static, MethodResponse> {
		let conn_id = ext
			.get::<ConnectionId>()
			.copied()
			.expect("ConnectionId is always set by jsonrpsee; qed");

		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
			// The spec says to return `LimitReached` if the follow subscription is invalid or
			// stale.
			return ResponsePayload::success(MethodResponse::LimitReached);
		}

		let client = self.client.clone();
		let subscriptions = self.subscriptions.clone();
		let executor = self.executor.clone();

		// Locking the block and reading its body both happen on the blocking executor; the
		// response payload is handed back over a oneshot channel.
		let result = spawn_blocking(&self.executor, async move {
			let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
				Ok(block) => block,
				Err(SubscriptionManagementError::SubscriptionAbsent) |
				Err(SubscriptionManagementError::ExceededLimits) =>
					return ResponsePayload::success(MethodResponse::LimitReached),
				Err(SubscriptionManagementError::BlockHashAbsent) => {
					// Block is not part of the subscription.
					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
				},
				Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
			};

			let operation_id = block_guard.operation().operation_id();

			// Build the event up front; it is only sent once the method response went out.
			let event = match client.block(hash) {
				Ok(Some(signed_block)) => {
					// Every extrinsic is SCALE-encoded and passed through `hex_string`.
					let extrinsics = signed_block
						.block
						.extrinsics()
						.iter()
						.map(|extrinsic| hex_string(&extrinsic.encode()))
						.collect();
					FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
						operation_id: operation_id.clone(),
						value: extrinsics,
					})
				},
				Ok(None) => {
					// The block's body was pruned. This subscription ID has become invalid.
					debug!(
						target: LOG_TARGET,
						"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
						&follow_subscription,
						hash
					);
					subscriptions.remove_subscription(&follow_subscription);
					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
				},
				Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
					operation_id: operation_id.clone(),
					error: error.to_string(),
				}),
			};

			let (rp, rp_fut) = method_started_response(operation_id, None);
			let fut = async move {
				// Wait for the server to send out the response and if it produces an error no event
				// should be generated.
				if rp_fut.await.is_err() {
					return;
				}

				// A failed send is deliberately ignored.
				let _ = block_guard.response_sender().send(event).await;
			};
			executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());

			rp
		});

		// If the result cannot be received from the blocking task, fall back to
		// `LimitReached`.
		result
			.await
			.unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
	}
340
341	async fn chain_head_unstable_header(
342		&self,
343		ext: &Extensions,
344		follow_subscription: String,
345		hash: Block::Hash,
346	) -> Result<Option<String>, ChainHeadRpcError> {
347		let conn_id = ext
348			.get::<ConnectionId>()
349			.copied()
350			.expect("ConnectionId is always set by jsonrpsee; qed");
351
352		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
353			return Ok(None);
354		}
355
356		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
357			Ok(block) => block,
358			Err(SubscriptionManagementError::SubscriptionAbsent) |
359			Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
360			Err(SubscriptionManagementError::BlockHashAbsent) => {
361				// Block is not part of the subscription.
362				return Err(ChainHeadRpcError::InvalidBlock.into())
363			},
364			Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
365		};
366
367		let client = self.client.clone();
368		let result = spawn_blocking(&self.executor, async move {
369			let _block_guard = block_guard;
370
371			client
372				.header(hash)
373				.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
374				.map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
375		});
376		result.await.unwrap_or_else(|_| Ok(None))
377	}
378
	/// Handle a `storage` request for a block of a `follow` subscription.
	///
	/// The hex-encoded keys in `items` and the optional `child_trie` are decoded first; a
	/// decoding failure is returned as an error response. Responds with
	/// `MethodResponse::LimitReached` when the subscription is unknown for this connection or
	/// when `lock_block` reports `SubscriptionAbsent` or `ExceededLimits`, and with
	/// `ChainHeadRpcError::InvalidBlock` for any other locking failure.
	///
	/// Otherwise a "started" response (with zero discarded items) is returned and the
	/// storage events are produced by a task spawned on the executor.
	async fn chain_head_unstable_storage(
		&self,
		ext: &Extensions,
		follow_subscription: String,
		hash: Block::Hash,
		items: Vec<StorageQuery<String>>,
		child_trie: Option<String>,
	) -> ResponsePayload<'static, MethodResponse> {
		let conn_id = ext
			.get::<ConnectionId>()
			.copied()
			.expect("ConnectionId is always set by jsonrpsee; qed");

		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
			// The spec says to return `LimitReached` if the follow subscription is invalid or
			// stale.
			return ResponsePayload::success(MethodResponse::LimitReached);
		}

		// Gain control over parameter parsing and returned error.
		let items = match items
			.into_iter()
			.map(|query| {
				let key = StorageKey(parse_hex_param(query.key)?);
				Ok(StorageQuery { key, query_type: query.query_type })
			})
			.collect::<Result<Vec<_>, ChainHeadRpcError>>()
		{
			Ok(items) => items,
			Err(err) => {
				return ResponsePayload::error(err);
			},
		};

		// The child trie is optional; when present it must be valid hex as well.
		let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
		{
			Ok(c) => c.map(ChildInfo::new_default_from_vec),
			Err(e) => return ResponsePayload::error(e),
		};

		// The number of queried items is passed to `lock_block` (the other handlers pass 1).
		let mut block_guard =
			match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
				Ok(block) => block,
				Err(SubscriptionManagementError::SubscriptionAbsent) |
				Err(SubscriptionManagementError::ExceededLimits) => {
					return ResponsePayload::success(MethodResponse::LimitReached);
				},
				Err(SubscriptionManagementError::BlockHashAbsent) => {
					// Block is not part of the subscription.
					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
				},
				Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
			};

		let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());

		// Storage items are never discarded.
		let (rp, rp_fut) = method_started_response(block_guard.operation().operation_id(), Some(0));

		let fut = async move {
			// Wait for the server to send out the response and if it produces an error no event
			// should be generated.
			if rp_fut.await.is_err() {
				return;
			}

			// The storage client writes query results into `tx`; `process_storage_items`
			// reads them from `rx` and forwards them as follow events.
			let (tx, rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
			let operation_id = block_guard.operation().operation_id();
			let stop_handle = block_guard.operation().stop_handle().clone();
			let response_sender = block_guard.response_sender();

			// May fail if the channel is closed or the connection is closed.
			// which is okay to ignore.
			let _ = futures::future::join(
				storage_client.generate_events(hash, items, child_trie, tx),
				process_storage_items(rx, response_sender, operation_id, &stop_handle),
			)
			.await;
		};
		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());

		rp
	}
462
	/// Handle a runtime `call` request for a block of a `follow` subscription.
	///
	/// `call_parameters` is decoded from hex before anything else; a decoding failure is
	/// returned as an error response. Responds with `MethodResponse::LimitReached` when the
	/// subscription is unknown for this connection or when `lock_block` reports
	/// `SubscriptionAbsent` or `ExceededLimits`, with `ChainHeadRpcError::InvalidBlock` for
	/// any other locking failure, and with `ChainHeadRpcError::InvalidRuntimeCall` when the
	/// block guard reports that no runtime is available.
	///
	/// Otherwise a "started" response is returned and the call is executed on the blocking
	/// executor; its outcome is sent as `OperationCallDone` or `OperationError`.
	async fn chain_head_unstable_call(
		&self,
		ext: &Extensions,
		follow_subscription: String,
		hash: Block::Hash,
		function: String,
		call_parameters: String,
	) -> ResponsePayload<'static, MethodResponse> {
		let call_parameters = match parse_hex_param(call_parameters) {
			Ok(hex) => Bytes::from(hex),
			Err(err) => return ResponsePayload::error(err),
		};

		let conn_id = ext
			.get::<ConnectionId>()
			.copied()
			.expect("ConnectionId is always set by jsonrpsee; qed");

		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
			// The spec says to return `LimitReached` if the follow subscription is invalid or
			// stale.
			return ResponsePayload::success(MethodResponse::LimitReached);
		}

		let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
			Ok(block) => block,
			Err(SubscriptionManagementError::SubscriptionAbsent) |
			Err(SubscriptionManagementError::ExceededLimits) => {
				// Invalid subscription ID, or the subscription's limits are exceeded.
				return ResponsePayload::success(MethodResponse::LimitReached)
			},
			Err(SubscriptionManagementError::BlockHashAbsent) => {
				// Block is not part of the subscription.
				return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
			},
			Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
		};

		// Reject the call when the block guard reports that no runtime is available
		// (`has_runtime()` is false).
		if !block_guard.has_runtime() {
			return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
				"The runtime updates flag must be set".to_string(),
			));
		}

		let operation_id = block_guard.operation().operation_id();
		let client = self.client.clone();

		let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
		let fut = async move {
			// Wait for the server to send out the response and if it produces an error no event
			// should be generated.
			if rp_fut.await.is_err() {
				return
			}

			// Execute the call with `CallContext::Offchain` and turn either outcome into a
			// follow event.
			let event = client
				.executor()
				.call(hash, &function, &call_parameters, CallContext::Offchain)
				.map(|result| {
					FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
						operation_id: operation_id.clone(),
						output: hex_string(&result),
					})
				})
				.unwrap_or_else(|error| {
					FollowEvent::<Block::Hash>::OperationError(OperationError {
						operation_id: operation_id.clone(),
						error: error.to_string(),
					})
				});

			// A failed send is deliberately ignored.
			let _ = block_guard.response_sender().send(event).await;
		};
		self.executor
			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());

		rp
	}
542
543	async fn chain_head_unstable_unpin(
544		&self,
545		ext: &Extensions,
546		follow_subscription: String,
547		hash_or_hashes: ListOrValue<Block::Hash>,
548	) -> Result<(), ChainHeadRpcError> {
549		let conn_id = ext
550			.get::<ConnectionId>()
551			.copied()
552			.expect("ConnectionId is always set by jsonrpsee; qed");
553
554		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
555			return Ok(());
556		}
557
558		let result = match hash_or_hashes {
559			ListOrValue::Value(hash) =>
560				self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
561			ListOrValue::List(hashes) =>
562				self.subscriptions.unpin_blocks(&follow_subscription, hashes),
563		};
564
565		match result {
566			Ok(()) => Ok(()),
567			Err(SubscriptionManagementError::SubscriptionAbsent) => {
568				// Invalid invalid subscription ID.
569				Ok(())
570			},
571			Err(SubscriptionManagementError::BlockHashAbsent) => {
572				// Block is not part of the subscription.
573				Err(ChainHeadRpcError::InvalidBlock)
574			},
575			Err(SubscriptionManagementError::DuplicateHashes) =>
576				Err(ChainHeadRpcError::InvalidDuplicateHashes),
577			Err(_) => Err(ChainHeadRpcError::InvalidBlock),
578		}
579	}
580
581	async fn chain_head_unstable_continue(
582		&self,
583		ext: &Extensions,
584		follow_subscription: String,
585		operation_id: String,
586	) -> Result<(), ChainHeadRpcError> {
587		let conn_id = ext
588			.get::<ConnectionId>()
589			.copied()
590			.expect("ConnectionId is always set by jsonrpsee; qed");
591
592		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
593			return Ok(())
594		}
595
596		// WaitingForContinue event is never emitted, in such cases
597		// emit an `InvalidContinue error`.
598		if self.subscriptions.get_operation(&follow_subscription, &operation_id).is_some() {
599			Err(ChainHeadRpcError::InvalidContinue.into())
600		} else {
601			Ok(())
602		}
603	}
604
605	async fn chain_head_unstable_stop_operation(
606		&self,
607		ext: &Extensions,
608		follow_subscription: String,
609		operation_id: String,
610	) -> Result<(), ChainHeadRpcError> {
611		let conn_id = ext
612			.get::<ConnectionId>()
613			.copied()
614			.expect("ConnectionId is always set by jsonrpsee; qed");
615
616		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
617			return Ok(())
618		}
619
620		let Some(mut operation) =
621			self.subscriptions.get_operation(&follow_subscription, &operation_id)
622		else {
623			return Ok(())
624		};
625
626		operation.stop();
627
628		Ok(())
629	}
630}
631
632fn method_started_response(
633	operation_id: String,
634	discarded_items: Option<usize>,
635) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
636	let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
637	ResponsePayload::success(rp).notify_on_completion()
638}
639
640/// Spawn a blocking future on the provided executor and return the result on a oneshot channel.
641///
642/// This is a wrapper to extract the result of a `executor.spawn_blocking` future.
643fn spawn_blocking<R>(
644	executor: &SubscriptionTaskExecutor,
645	fut: impl std::future::Future<Output = R> + Send + 'static,
646) -> oneshot::Receiver<R>
647where
648	R: Send + 'static,
649{
650	let (tx, rx) = oneshot::channel();
651
652	let blocking_fut = async move {
653		let result = fut.await;
654		// Send the result back on the channel.
655		let _ = tx.send(result);
656	};
657
658	executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
659
660	rx
661}
662
/// Forward storage query results from `storage_query_stream` to `sender` as follow events.
///
/// The loop ends when `stop_handle.stopped()` completes or when the stream yields `None`;
/// in both cases an `OperationStorageDone` event is sent afterwards.
///
/// A `QueryResult::Err` is forwarded as an `OperationError` event and ends the function
/// immediately with the result of that send, without an `OperationStorageDone` event.
/// `QueryResult::Ok(None)` results are skipped, and every `QueryResult::Ok(Some(_))` result
/// is sent as an `OperationStorageItems` event holding that single item.
///
/// # Errors
///
/// Returns the `FollowEventSendError` of the first failed send.
async fn process_storage_items<Hash>(
	mut storage_query_stream: mpsc::Receiver<QueryResult>,
	mut sender: FollowEventSender<Hash>,
	operation_id: String,
	stop_handle: &StopHandle,
) -> Result<(), FollowEventSendError> {
	loop {
		tokio::select! {
			// The operation was stopped: stop forwarding results.
			_ = stop_handle.stopped() => {
				break;
			},

			maybe_storage = storage_query_stream.recv() => {
				// `None` means the stream is finished.
				let Some(storage) = maybe_storage else {
					break;
				};

				let item = match storage {
					QueryResult::Err(error) => {
						// `operation_id` is moved into the error event; the function
						// returns right here.
						return sender
						.send(FollowEvent::OperationError(OperationError { operation_id, error }))
						.await
					}
					QueryResult::Ok(Some(v)) => v,
					// Nothing to report for this query.
					QueryResult::Ok(None) => continue,
				};

				sender
					.send(FollowEvent::OperationStorageItems(OperationStorageItems {
						operation_id: operation_id.clone(),
						items: vec![item],
				})).await?;
			},
		}
	}

	sender
		.send(FollowEvent::OperationStorageDone(OperationId { operation_id }))
		.await?;

	Ok(())
}