referrerpolicy=no-referrer-when-downgrade

sc_rpc_spec_v2/chain_head/
chain_head.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! API implementation for `chainHead`.
20
21use super::{
22	chain_head_storage::ChainHeadStorage,
23	event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
24};
25use crate::{
26	chain_head::{
27		api::ChainHeadApiServer,
28		chain_head_follow::ChainHeadFollower,
29		error::Error as ChainHeadRpcError,
30		event::{FollowEvent, MethodResponse, OperationError, OperationId, OperationStorageItems},
31		subscription::{StopHandle, SubscriptionManagement, SubscriptionManagementError},
32		FollowEventSendError, FollowEventSender,
33	},
34	common::{events::StorageQuery, storage::QueryResult},
35	hex_string, SubscriptionTaskExecutor,
36};
37use codec::Encode;
38use futures::{channel::oneshot, future::FutureExt, SinkExt};
39use jsonrpsee::{
40	core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
41	MethodResponseFuture, PendingSubscriptionSink,
42};
43use log::debug;
44use sc_client_api::{
45	Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
46	StorageProvider,
47};
48use sc_rpc::utils::Subscription;
49use sp_api::CallApiAt;
50use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
51use sp_core::{traits::CallContext, Bytes};
52use sp_rpc::list::ListOrValue;
53use sp_runtime::traits::Block as BlockT;
54use std::{marker::PhantomData, sync::Arc, time::Duration};
55use tokio::sync::mpsc;
56
/// Log target passed to the `debug!` calls of this module; visible crate-wide.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// Capacity of the channel created for each storage query.
///
/// The value is kept small because the underlying JSON-RPC server
/// already has its own buffer capacity per connection.
const STORAGE_QUERY_BUF: usize = 16;
64
/// The configuration of [`ChainHead`].
///
/// The type only holds plain limits (`usize` counters and a [`Duration`]), so it
/// derives `Debug` and `Clone` to let callers log a configuration and reuse it.
#[derive(Debug, Clone)]
pub struct ChainHeadConfig {
	/// The maximum number of pinned blocks across all subscriptions.
	pub global_max_pinned_blocks: usize,
	/// The maximum duration that a block is allowed to be pinned per subscription.
	pub subscription_max_pinned_duration: Duration,
	/// The maximum number of ongoing operations per subscription.
	pub subscription_max_ongoing_operations: usize,
	/// Stop all subscriptions if the distance between the leaves and the current finalized
	/// block is larger than this value.
	pub max_lagging_distance: usize,
	/// The maximum number of `chainHead_follow` subscriptions per connection.
	pub max_follow_subscriptions_per_connection: usize,
	/// The maximum number of pending messages per subscription.
	pub subscription_buffer_cap: usize,
}
81
/// Default limit for the number of pinned blocks across all connections.
///
/// The number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;

/// Default upper bound on how long any block of any subscription may stay pinned.
///
/// When a subscription contains a block older than this,
/// the subscription becomes subject to termination.
/// Note: This should be enough for immediate blocks.
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);

/// Default limit for the number of ongoing operations per subscription.
///
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

/// Default lagging distance: all subscriptions are stopped if the distance between
/// the leaves and the current finalized block is larger than this value.
const MAX_LAGGING_DISTANCE: usize = 128;

/// Default limit for the number of `chainHead_follow` subscriptions per connection.
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
103
104impl Default for ChainHeadConfig {
105	fn default() -> Self {
106		ChainHeadConfig {
107			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
108			subscription_max_pinned_duration: MAX_PINNED_DURATION,
109			subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
110			max_lagging_distance: MAX_LAGGING_DISTANCE,
111			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
112			subscription_buffer_cap: MAX_PINNED_BLOCKS,
113		}
114	}
115}
116
117/// An API for chain head RPC calls.
118pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
119	/// Substrate client.
120	client: Arc<Client>,
121	/// Backend of the chain.
122	backend: Arc<BE>,
123	/// Executor to spawn subscriptions.
124	executor: SubscriptionTaskExecutor,
125	/// Keep track of the pinned blocks for each subscription.
126	subscriptions: SubscriptionManagement<Block, BE>,
127	/// Stop all subscriptions if the distance between the leaves and the current finalized
128	/// block is larger than this value.
129	max_lagging_distance: usize,
130	/// Phantom member to pin the block type.
131	_phantom: PhantomData<Block>,
132	/// The maximum number of pending messages per subscription.
133	subscription_buffer_cap: usize,
134}
135
136impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
137	/// Create a new [`ChainHead`].
138	pub fn new(
139		client: Arc<Client>,
140		backend: Arc<BE>,
141		executor: SubscriptionTaskExecutor,
142		config: ChainHeadConfig,
143	) -> Self {
144		Self {
145			client,
146			backend: backend.clone(),
147			executor,
148			subscriptions: SubscriptionManagement::new(
149				config.global_max_pinned_blocks,
150				config.subscription_max_pinned_duration,
151				config.subscription_max_ongoing_operations,
152				config.max_follow_subscriptions_per_connection,
153				backend,
154			),
155			max_lagging_distance: config.max_lagging_distance,
156			subscription_buffer_cap: config.subscription_buffer_cap,
157			_phantom: PhantomData,
158		}
159	}
160}
161
162/// Helper to convert the `subscription ID` to a string.
163pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
164	match sink.subscription_id() {
165		SubscriptionId::Num(n) => n.to_string(),
166		SubscriptionId::Str(s) => s.into_owned().into(),
167	}
168}
169
170/// Parse hex-encoded string parameter as raw bytes.
171///
172/// If the parsing fails, returns an error propagated to the RPC method.
173fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
174	// Methods can accept empty parameters.
175	if param.is_empty() {
176		return Ok(Default::default());
177	}
178
179	match array_bytes::hex2bytes(&param) {
180		Ok(bytes) => Ok(bytes),
181		Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
182	}
183}
184
185#[async_trait]
186impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
187where
188	Block: BlockT + 'static,
189	Block::Header: Unpin,
190	BE: Backend<Block> + 'static,
191	Client: BlockBackend<Block>
192		+ ExecutorProvider<Block>
193		+ HeaderBackend<Block>
194		+ HeaderMetadata<Block, Error = BlockChainError>
195		+ BlockchainEvents<Block>
196		+ CallApiAt<Block>
197		+ StorageProvider<Block, BE>
198		+ 'static,
199{
200	fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
201		let subscriptions = self.subscriptions.clone();
202		let backend = self.backend.clone();
203		let client = self.client.clone();
204		let max_lagging_distance = self.max_lagging_distance;
205		let subscription_buffer_cap = self.subscription_buffer_cap;
206
207		let fut = async move {
208			// Ensure the current connection ID has enough space to accept a new subscription.
209			let connection_id = pending.connection_id();
210			// The RAII `reserved_subscription` will clean up resources on drop:
211			// - free the reserved subscription for the connection ID.
212			// - remove the subscription ID from the subscription management.
213			let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
214			else {
215				pending.reject(ChainHeadRpcError::ReachedLimits).await;
216				return;
217			};
218
219			let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
220
221			let sub_id = read_subscription_id_as_string(&sink);
222			// Keep track of the subscription.
223			let Some(sub_data) =
224				reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
225			else {
226				// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
227				// subscription ID.
228				debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
229				let _ = sink.send(&FollowEvent::<String>::Stop).await;
230				return;
231			};
232			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
233
234			let mut chain_head_follow = ChainHeadFollower::new(
235				client,
236				backend,
237				subscriptions,
238				with_runtime,
239				sub_id.clone(),
240				max_lagging_distance,
241				subscription_buffer_cap,
242			);
243			let result = chain_head_follow.generate_events(sink, sub_data).await;
244			if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
245				debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
246				reserved_subscription.stop_all_subscriptions();
247			}
248
249			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
250		};
251
252		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
253	}
254
255	async fn chain_head_unstable_body(
256		&self,
257		ext: &Extensions,
258		follow_subscription: String,
259		hash: Block::Hash,
260	) -> ResponsePayload<'static, MethodResponse> {
261		let conn_id = ext
262			.get::<ConnectionId>()
263			.copied()
264			.expect("ConnectionId is always set by jsonrpsee; qed");
265
266		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
267			// The spec says to return `LimitReached` if the follow subscription is invalid or
268			// stale.
269			return ResponsePayload::success(MethodResponse::LimitReached);
270		}
271
272		let client = self.client.clone();
273		let subscriptions = self.subscriptions.clone();
274		let executor = self.executor.clone();
275
276		let result = spawn_blocking(&self.executor, async move {
277			let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
278				Ok(block) => block,
279				Err(SubscriptionManagementError::SubscriptionAbsent) |
280				Err(SubscriptionManagementError::ExceededLimits) => {
281					return ResponsePayload::success(MethodResponse::LimitReached)
282				},
283				Err(SubscriptionManagementError::BlockHashAbsent) => {
284					// Block is not part of the subscription.
285					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
286				},
287				Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
288			};
289
290			let operation_id = block_guard.operation().operation_id();
291
292			let event = match client.block(hash) {
293				Ok(Some(signed_block)) => {
294					let extrinsics = signed_block
295						.block
296						.extrinsics()
297						.iter()
298						.map(|extrinsic| hex_string(&extrinsic.encode()))
299						.collect();
300					FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
301						operation_id: operation_id.clone(),
302						value: extrinsics,
303					})
304				},
305				Ok(None) => {
306					// The block's body was pruned. This subscription ID has become invalid.
307					debug!(
308						target: LOG_TARGET,
309						"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
310						&follow_subscription,
311						hash
312					);
313					subscriptions.remove_subscription(&follow_subscription);
314					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
315				},
316				Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
317					operation_id: operation_id.clone(),
318					error: error.to_string(),
319				}),
320			};
321
322			let (rp, rp_fut) = method_started_response(operation_id, None);
323			let fut = async move {
324				// Wait for the server to send out the response and if it produces an error no event
325				// should be generated.
326				if rp_fut.await.is_err() {
327					return;
328				}
329
330				let _ = block_guard.response_sender().send(event).await;
331			};
332			executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
333
334			rp
335		});
336
337		result
338			.await
339			.unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
340	}
341
342	async fn chain_head_unstable_header(
343		&self,
344		ext: &Extensions,
345		follow_subscription: String,
346		hash: Block::Hash,
347	) -> Result<Option<String>, ChainHeadRpcError> {
348		let conn_id = ext
349			.get::<ConnectionId>()
350			.copied()
351			.expect("ConnectionId is always set by jsonrpsee; qed");
352
353		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
354			return Ok(None);
355		}
356
357		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
358			Ok(block) => block,
359			Err(SubscriptionManagementError::SubscriptionAbsent) |
360			Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
361			Err(SubscriptionManagementError::BlockHashAbsent) => {
362				// Block is not part of the subscription.
363				return Err(ChainHeadRpcError::InvalidBlock.into());
364			},
365			Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
366		};
367
368		let client = self.client.clone();
369		let result = spawn_blocking(&self.executor, async move {
370			let _block_guard = block_guard;
371
372			client
373				.header(hash)
374				.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
375				.map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
376		});
377		result.await.unwrap_or_else(|_| Ok(None))
378	}
379
380	async fn chain_head_unstable_storage(
381		&self,
382		ext: &Extensions,
383		follow_subscription: String,
384		hash: Block::Hash,
385		items: Vec<StorageQuery<String>>,
386		child_trie: Option<String>,
387	) -> ResponsePayload<'static, MethodResponse> {
388		let conn_id = ext
389			.get::<ConnectionId>()
390			.copied()
391			.expect("ConnectionId is always set by jsonrpsee; qed");
392
393		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
394			// The spec says to return `LimitReached` if the follow subscription is invalid or
395			// stale.
396			return ResponsePayload::success(MethodResponse::LimitReached);
397		}
398
399		// Gain control over parameter parsing and returned error.
400		let items = match items
401			.into_iter()
402			.map(|query| {
403				let key = StorageKey(parse_hex_param(query.key)?);
404				Ok(StorageQuery { key, query_type: query.query_type, pagination_start_key: None })
405			})
406			.collect::<Result<Vec<_>, ChainHeadRpcError>>()
407		{
408			Ok(items) => items,
409			Err(err) => {
410				return ResponsePayload::error(err);
411			},
412		};
413
414		let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
415		{
416			Ok(c) => c.map(ChildInfo::new_default_from_vec),
417			Err(e) => return ResponsePayload::error(e),
418		};
419
420		let mut block_guard =
421			match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
422				Ok(block) => block,
423				Err(SubscriptionManagementError::SubscriptionAbsent) |
424				Err(SubscriptionManagementError::ExceededLimits) => {
425					return ResponsePayload::success(MethodResponse::LimitReached);
426				},
427				Err(SubscriptionManagementError::BlockHashAbsent) => {
428					// Block is not part of the subscription.
429					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
430				},
431				Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
432			};
433
434		let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
435
436		// Storage items are never discarded.
437		let (rp, rp_fut) = method_started_response(block_guard.operation().operation_id(), Some(0));
438
439		let fut = async move {
440			// Wait for the server to send out the response and if it produces an error no event
441			// should be generated.
442			if rp_fut.await.is_err() {
443				return;
444			}
445
446			let (tx, rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
447			let operation_id = block_guard.operation().operation_id();
448			let stop_handle = block_guard.operation().stop_handle().clone();
449			let response_sender = block_guard.response_sender();
450
451			// May fail if the channel is closed or the connection is closed.
452			// which is okay to ignore.
453			let _ = futures::future::join(
454				storage_client.generate_events(hash, items, child_trie, tx),
455				process_storage_items(rx, response_sender, operation_id, &stop_handle),
456			)
457			.await;
458		};
459		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
460
461		rp
462	}
463
464	async fn chain_head_unstable_call(
465		&self,
466		ext: &Extensions,
467		follow_subscription: String,
468		hash: Block::Hash,
469		function: String,
470		call_parameters: String,
471	) -> ResponsePayload<'static, MethodResponse> {
472		let call_parameters = match parse_hex_param(call_parameters) {
473			Ok(hex) => Bytes::from(hex),
474			Err(err) => return ResponsePayload::error(err),
475		};
476
477		let conn_id = ext
478			.get::<ConnectionId>()
479			.copied()
480			.expect("ConnectionId is always set by jsonrpsee; qed");
481
482		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
483			// The spec says to return `LimitReached` if the follow subscription is invalid or
484			// stale.
485			return ResponsePayload::success(MethodResponse::LimitReached);
486		}
487
488		let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
489			Ok(block) => block,
490			Err(SubscriptionManagementError::SubscriptionAbsent) |
491			Err(SubscriptionManagementError::ExceededLimits) => {
492				// Invalid invalid subscription ID.
493				return ResponsePayload::success(MethodResponse::LimitReached);
494			},
495			Err(SubscriptionManagementError::BlockHashAbsent) => {
496				// Block is not part of the subscription.
497				return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
498			},
499			Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
500		};
501
502		// Reject subscription if with_runtime is false.
503		if !block_guard.has_runtime() {
504			return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
505				"The runtime updates flag must be set".to_string(),
506			));
507		}
508
509		let operation_id = block_guard.operation().operation_id();
510		let client = self.client.clone();
511
512		let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
513		let fut = async move {
514			// Wait for the server to send out the response and if it produces an error no event
515			// should be generated.
516			if rp_fut.await.is_err() {
517				return;
518			}
519
520			let event = client
521				.executor()
522				.call(hash, &function, &call_parameters, CallContext::Offchain)
523				.map(|result| {
524					FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
525						operation_id: operation_id.clone(),
526						output: hex_string(&result),
527					})
528				})
529				.unwrap_or_else(|error| {
530					FollowEvent::<Block::Hash>::OperationError(OperationError {
531						operation_id: operation_id.clone(),
532						error: error.to_string(),
533					})
534				});
535
536			let _ = block_guard.response_sender().send(event).await;
537		};
538		self.executor
539			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
540
541		rp
542	}
543
544	async fn chain_head_unstable_unpin(
545		&self,
546		ext: &Extensions,
547		follow_subscription: String,
548		hash_or_hashes: ListOrValue<Block::Hash>,
549	) -> Result<(), ChainHeadRpcError> {
550		let conn_id = ext
551			.get::<ConnectionId>()
552			.copied()
553			.expect("ConnectionId is always set by jsonrpsee; qed");
554
555		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
556			return Ok(());
557		}
558
559		let result = match hash_or_hashes {
560			ListOrValue::Value(hash) => {
561				self.subscriptions.unpin_blocks(&follow_subscription, [hash])
562			},
563			ListOrValue::List(hashes) => {
564				self.subscriptions.unpin_blocks(&follow_subscription, hashes)
565			},
566		};
567
568		match result {
569			Ok(()) => Ok(()),
570			Err(SubscriptionManagementError::SubscriptionAbsent) => {
571				// Invalid invalid subscription ID.
572				Ok(())
573			},
574			Err(SubscriptionManagementError::BlockHashAbsent) => {
575				// Block is not part of the subscription.
576				Err(ChainHeadRpcError::InvalidBlock)
577			},
578			Err(SubscriptionManagementError::DuplicateHashes) => {
579				Err(ChainHeadRpcError::InvalidDuplicateHashes)
580			},
581			Err(_) => Err(ChainHeadRpcError::InvalidBlock),
582		}
583	}
584
585	async fn chain_head_unstable_continue(
586		&self,
587		ext: &Extensions,
588		follow_subscription: String,
589		operation_id: String,
590	) -> Result<(), ChainHeadRpcError> {
591		let conn_id = ext
592			.get::<ConnectionId>()
593			.copied()
594			.expect("ConnectionId is always set by jsonrpsee; qed");
595
596		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
597			return Ok(());
598		}
599
600		// WaitingForContinue event is never emitted, in such cases
601		// emit an `InvalidContinue error`.
602		if self.subscriptions.get_operation(&follow_subscription, &operation_id).is_some() {
603			Err(ChainHeadRpcError::InvalidContinue.into())
604		} else {
605			Ok(())
606		}
607	}
608
609	async fn chain_head_unstable_stop_operation(
610		&self,
611		ext: &Extensions,
612		follow_subscription: String,
613		operation_id: String,
614	) -> Result<(), ChainHeadRpcError> {
615		let conn_id = ext
616			.get::<ConnectionId>()
617			.copied()
618			.expect("ConnectionId is always set by jsonrpsee; qed");
619
620		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
621			return Ok(());
622		}
623
624		let Some(mut operation) =
625			self.subscriptions.get_operation(&follow_subscription, &operation_id)
626		else {
627			return Ok(());
628		};
629
630		operation.stop();
631
632		Ok(())
633	}
634}
635
636fn method_started_response(
637	operation_id: String,
638	discarded_items: Option<usize>,
639) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
640	let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
641	ResponsePayload::success(rp).notify_on_completion()
642}
643
644/// Spawn a blocking future on the provided executor and return the result on a oneshot channel.
645///
646/// This is a wrapper to extract the result of a `executor.spawn_blocking` future.
647fn spawn_blocking<R>(
648	executor: &SubscriptionTaskExecutor,
649	fut: impl std::future::Future<Output = R> + Send + 'static,
650) -> oneshot::Receiver<R>
651where
652	R: Send + 'static,
653{
654	let (tx, rx) = oneshot::channel();
655
656	let blocking_fut = async move {
657		let result = fut.await;
658		// Send the result back on the channel.
659		let _ = tx.send(result);
660	};
661
662	executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
663
664	rx
665}
666
667async fn process_storage_items<Hash>(
668	mut storage_query_stream: mpsc::Receiver<QueryResult>,
669	mut sender: FollowEventSender<Hash>,
670	operation_id: String,
671	stop_handle: &StopHandle,
672) -> Result<(), FollowEventSendError> {
673	loop {
674		tokio::select! {
675			_ = stop_handle.stopped() => {
676				break;
677			},
678
679			maybe_storage = storage_query_stream.recv() => {
680				let Some(storage) = maybe_storage else {
681					break;
682				};
683
684				let item = match storage {
685					QueryResult::Err(error) => {
686						return sender
687						.send(FollowEvent::OperationError(OperationError { operation_id, error }))
688						.await
689					}
690					QueryResult::Ok(Some(v)) => v,
691					QueryResult::Ok(None) => continue,
692				};
693
694				sender
695					.send(FollowEvent::OperationStorageItems(OperationStorageItems {
696						operation_id: operation_id.clone(),
697						items: vec![item],
698				})).await?;
699			},
700		}
701	}
702
703	sender
704		.send(FollowEvent::OperationStorageDone(OperationId { operation_id }))
705		.await?;
706
707	Ok(())
708}