sc_rpc_spec_v2/chain_head/
chain_head.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! API implementation for `chainHead`.
20
21use super::{
22	chain_head_storage::ChainHeadStorage,
23	event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
24};
25use crate::{
26	chain_head::{
27		api::ChainHeadApiServer,
28		chain_head_follow::ChainHeadFollower,
29		error::Error as ChainHeadRpcError,
30		event::{FollowEvent, MethodResponse, OperationError},
31		subscription::{SubscriptionManagement, SubscriptionManagementError},
32	},
33	common::events::StorageQuery,
34	hex_string, SubscriptionTaskExecutor,
35};
36use codec::Encode;
37use futures::{channel::oneshot, future::FutureExt};
38use jsonrpsee::{
39	core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
40	MethodResponseFuture, PendingSubscriptionSink,
41};
42use log::debug;
43use sc_client_api::{
44	Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
45	StorageProvider,
46};
47use sc_rpc::utils::Subscription;
48use sp_api::CallApiAt;
49use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
50use sp_core::{traits::CallContext, Bytes};
51use sp_rpc::list::ListOrValue;
52use sp_runtime::traits::Block as BlockT;
53use std::{marker::PhantomData, sync::Arc, time::Duration};
54
/// Log target passed as `target:` to the `debug!` calls in this file.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
56
/// The configuration of [`ChainHead`].
///
/// Every field is a plain limit (`usize` or [`Duration`]), so the type derives `Debug` and
/// `Clone` to let callers log a configuration or reuse it for several instances.
#[derive(Debug, Clone)]
pub struct ChainHeadConfig {
	/// The maximum number of pinned blocks across all subscriptions.
	pub global_max_pinned_blocks: usize,
	/// The maximum duration that a block is allowed to be pinned per subscription.
	pub subscription_max_pinned_duration: Duration,
	/// The maximum number of ongoing operations per subscription.
	pub subscription_max_ongoing_operations: usize,
	/// Stop all subscriptions if the distance between the leaves and the current finalized
	/// block is larger than this value.
	pub max_lagging_distance: usize,
	/// The maximum number of items reported by the `chainHead_storage` before
	/// pagination is required.
	pub operation_max_storage_items: usize,
	/// The maximum number of `chainHead_follow` subscriptions per connection.
	pub max_follow_subscriptions_per_connection: usize,
}
74
/// Default upper bound on the number of blocks pinned across all connections.
///
/// The value is large enough to account for immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;

/// Default limit on how long any block of any subscription may stay pinned.
///
/// A subscription that holds a block pinned for longer than this becomes subject
/// to termination.
/// Note: This should be enough for immediate blocks.
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);

/// Default number of operations a single subscription may have in flight.
///
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

/// Default number of items `chainHead_storage` may return before pagination
/// is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;

/// Default lagging threshold: all subscriptions are stopped once the distance between
/// the leaves and the current finalized block grows beyond this value.
const MAX_LAGGING_DISTANCE: usize = 128;

/// Default number of `chainHead_follow` subscriptions allowed per connection.
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
100
101impl Default for ChainHeadConfig {
102	fn default() -> Self {
103		ChainHeadConfig {
104			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
105			subscription_max_pinned_duration: MAX_PINNED_DURATION,
106			subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
107			max_lagging_distance: MAX_LAGGING_DISTANCE,
108			operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
109			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
110		}
111	}
112}
113
114/// An API for chain head RPC calls.
115pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
116	/// Substrate client.
117	client: Arc<Client>,
118	/// Backend of the chain.
119	backend: Arc<BE>,
120	/// Executor to spawn subscriptions.
121	executor: SubscriptionTaskExecutor,
122	/// Keep track of the pinned blocks for each subscription.
123	subscriptions: SubscriptionManagement<Block, BE>,
124	/// The maximum number of items reported by the `chainHead_storage` before
125	/// pagination is required.
126	operation_max_storage_items: usize,
127	/// Stop all subscriptions if the distance between the leaves and the current finalized
128	/// block is larger than this value.
129	max_lagging_distance: usize,
130	/// Phantom member to pin the block type.
131	_phantom: PhantomData<Block>,
132}
133
134impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
135	/// Create a new [`ChainHead`].
136	pub fn new(
137		client: Arc<Client>,
138		backend: Arc<BE>,
139		executor: SubscriptionTaskExecutor,
140		config: ChainHeadConfig,
141	) -> Self {
142		Self {
143			client,
144			backend: backend.clone(),
145			executor,
146			subscriptions: SubscriptionManagement::new(
147				config.global_max_pinned_blocks,
148				config.subscription_max_pinned_duration,
149				config.subscription_max_ongoing_operations,
150				config.max_follow_subscriptions_per_connection,
151				backend,
152			),
153			operation_max_storage_items: config.operation_max_storage_items,
154			max_lagging_distance: config.max_lagging_distance,
155			_phantom: PhantomData,
156		}
157	}
158}
159
160/// Helper to convert the `subscription ID` to a string.
161pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
162	match sink.subscription_id() {
163		SubscriptionId::Num(n) => n.to_string(),
164		SubscriptionId::Str(s) => s.into_owned().into(),
165	}
166}
167
168/// Parse hex-encoded string parameter as raw bytes.
169///
170/// If the parsing fails, returns an error propagated to the RPC method.
171fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
172	// Methods can accept empty parameters.
173	if param.is_empty() {
174		return Ok(Default::default())
175	}
176
177	match array_bytes::hex2bytes(&param) {
178		Ok(bytes) => Ok(bytes),
179		Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
180	}
181}
182
183#[async_trait]
184impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
185where
186	Block: BlockT + 'static,
187	Block::Header: Unpin,
188	BE: Backend<Block> + 'static,
189	Client: BlockBackend<Block>
190		+ ExecutorProvider<Block>
191		+ HeaderBackend<Block>
192		+ HeaderMetadata<Block, Error = BlockChainError>
193		+ BlockchainEvents<Block>
194		+ CallApiAt<Block>
195		+ StorageProvider<Block, BE>
196		+ 'static,
197{
198	fn chain_head_unstable_follow(&self, pending: PendingSubscriptionSink, with_runtime: bool) {
199		let subscriptions = self.subscriptions.clone();
200		let backend = self.backend.clone();
201		let client = self.client.clone();
202		let max_lagging_distance = self.max_lagging_distance;
203
204		let fut = async move {
205			// Ensure the current connection ID has enough space to accept a new subscription.
206			let connection_id = pending.connection_id();
207			// The RAII `reserved_subscription` will clean up resources on drop:
208			// - free the reserved subscription for the connection ID.
209			// - remove the subscription ID from the subscription management.
210			let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
211			else {
212				pending.reject(ChainHeadRpcError::ReachedLimits).await;
213				return
214			};
215
216			let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
217
218			let sub_id = read_subscription_id_as_string(&sink);
219			// Keep track of the subscription.
220			let Some(sub_data) =
221				reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
222			else {
223				// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
224				// subscription ID.
225				debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
226				let _ = sink.send(&FollowEvent::<String>::Stop).await;
227				return
228			};
229			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
230
231			let mut chain_head_follow = ChainHeadFollower::new(
232				client,
233				backend,
234				subscriptions,
235				with_runtime,
236				sub_id.clone(),
237				max_lagging_distance,
238			);
239			let result = chain_head_follow.generate_events(sink, sub_data).await;
240			if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
241				debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
242				reserved_subscription.stop_all_subscriptions();
243			}
244
245			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
246		};
247
248		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
249	}
250
251	async fn chain_head_unstable_body(
252		&self,
253		ext: &Extensions,
254		follow_subscription: String,
255		hash: Block::Hash,
256	) -> ResponsePayload<'static, MethodResponse> {
257		let conn_id = ext
258			.get::<ConnectionId>()
259			.copied()
260			.expect("ConnectionId is always set by jsonrpsee; qed");
261
262		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
263			// The spec says to return `LimitReached` if the follow subscription is invalid or
264			// stale.
265			return ResponsePayload::success(MethodResponse::LimitReached);
266		}
267
268		let client = self.client.clone();
269		let subscriptions = self.subscriptions.clone();
270		let executor = self.executor.clone();
271
272		let result = spawn_blocking(&self.executor, async move {
273			let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
274				Ok(block) => block,
275				Err(SubscriptionManagementError::SubscriptionAbsent) |
276				Err(SubscriptionManagementError::ExceededLimits) =>
277					return ResponsePayload::success(MethodResponse::LimitReached),
278				Err(SubscriptionManagementError::BlockHashAbsent) => {
279					// Block is not part of the subscription.
280					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
281				},
282				Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
283			};
284
285			let operation_id = block_guard.operation().operation_id();
286
287			let event = match client.block(hash) {
288				Ok(Some(signed_block)) => {
289					let extrinsics = signed_block
290						.block
291						.extrinsics()
292						.iter()
293						.map(|extrinsic| hex_string(&extrinsic.encode()))
294						.collect();
295					FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
296						operation_id: operation_id.clone(),
297						value: extrinsics,
298					})
299				},
300				Ok(None) => {
301					// The block's body was pruned. This subscription ID has become invalid.
302					debug!(
303						target: LOG_TARGET,
304						"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
305						&follow_subscription,
306						hash
307					);
308					subscriptions.remove_subscription(&follow_subscription);
309					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
310				},
311				Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
312					operation_id: operation_id.clone(),
313					error: error.to_string(),
314				}),
315			};
316
317			let (rp, rp_fut) = method_started_response(operation_id, None);
318			let fut = async move {
319				// Wait for the server to send out the response and if it produces an error no event
320				// should be generated.
321				if rp_fut.await.is_err() {
322					return;
323				}
324
325				let _ = block_guard.response_sender().unbounded_send(event);
326			};
327			executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
328
329			rp
330		});
331
332		result
333			.await
334			.unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
335	}
336
337	async fn chain_head_unstable_header(
338		&self,
339		ext: &Extensions,
340		follow_subscription: String,
341		hash: Block::Hash,
342	) -> Result<Option<String>, ChainHeadRpcError> {
343		let conn_id = ext
344			.get::<ConnectionId>()
345			.copied()
346			.expect("ConnectionId is always set by jsonrpsee; qed");
347
348		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
349			return Ok(None);
350		}
351
352		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
353			Ok(block) => block,
354			Err(SubscriptionManagementError::SubscriptionAbsent) |
355			Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
356			Err(SubscriptionManagementError::BlockHashAbsent) => {
357				// Block is not part of the subscription.
358				return Err(ChainHeadRpcError::InvalidBlock.into())
359			},
360			Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
361		};
362
363		let client = self.client.clone();
364		let result = spawn_blocking(&self.executor, async move {
365			let _block_guard = block_guard;
366
367			client
368				.header(hash)
369				.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
370				.map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
371		});
372		result.await.unwrap_or_else(|_| Ok(None))
373	}
374
375	async fn chain_head_unstable_storage(
376		&self,
377		ext: &Extensions,
378		follow_subscription: String,
379		hash: Block::Hash,
380		items: Vec<StorageQuery<String>>,
381		child_trie: Option<String>,
382	) -> ResponsePayload<'static, MethodResponse> {
383		let conn_id = ext
384			.get::<ConnectionId>()
385			.copied()
386			.expect("ConnectionId is always set by jsonrpsee; qed");
387
388		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
389			// The spec says to return `LimitReached` if the follow subscription is invalid or
390			// stale.
391			return ResponsePayload::success(MethodResponse::LimitReached);
392		}
393
394		// Gain control over parameter parsing and returned error.
395		let items = match items
396			.into_iter()
397			.map(|query| {
398				let key = StorageKey(parse_hex_param(query.key)?);
399				Ok(StorageQuery { key, query_type: query.query_type })
400			})
401			.collect::<Result<Vec<_>, ChainHeadRpcError>>()
402		{
403			Ok(items) => items,
404			Err(err) => {
405				return ResponsePayload::error(err);
406			},
407		};
408
409		let child_trie = match child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose()
410		{
411			Ok(c) => c.map(ChildInfo::new_default_from_vec),
412			Err(e) => return ResponsePayload::error(e),
413		};
414
415		let mut block_guard =
416			match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
417				Ok(block) => block,
418				Err(SubscriptionManagementError::SubscriptionAbsent) |
419				Err(SubscriptionManagementError::ExceededLimits) => {
420					return ResponsePayload::success(MethodResponse::LimitReached);
421				},
422				Err(SubscriptionManagementError::BlockHashAbsent) => {
423					// Block is not part of the subscription.
424					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
425				},
426				Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
427			};
428
429		let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
430			self.client.clone(),
431			self.operation_max_storage_items,
432		);
433		let operation = block_guard.operation();
434		let operation_id = operation.operation_id();
435
436		// The number of operations we are allowed to execute.
437		let num_operations = operation.num_reserved();
438		let discarded = items.len().saturating_sub(num_operations);
439		let mut items = items;
440		items.truncate(num_operations);
441
442		let (rp, rp_fut) = method_started_response(operation_id, Some(discarded));
443		let fut = async move {
444			// Wait for the server to send out the response and if it produces an error no event
445			// should be generated.
446			if rp_fut.await.is_err() {
447				return;
448			}
449
450			storage_client.generate_events(block_guard, hash, items, child_trie).await;
451		};
452		self.executor
453			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
454
455		rp
456	}
457
458	async fn chain_head_unstable_call(
459		&self,
460		ext: &Extensions,
461		follow_subscription: String,
462		hash: Block::Hash,
463		function: String,
464		call_parameters: String,
465	) -> ResponsePayload<'static, MethodResponse> {
466		let call_parameters = match parse_hex_param(call_parameters) {
467			Ok(hex) => Bytes::from(hex),
468			Err(err) => return ResponsePayload::error(err),
469		};
470
471		let conn_id = ext
472			.get::<ConnectionId>()
473			.copied()
474			.expect("ConnectionId is always set by jsonrpsee; qed");
475
476		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
477			// The spec says to return `LimitReached` if the follow subscription is invalid or
478			// stale.
479			return ResponsePayload::success(MethodResponse::LimitReached);
480		}
481
482		let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
483			Ok(block) => block,
484			Err(SubscriptionManagementError::SubscriptionAbsent) |
485			Err(SubscriptionManagementError::ExceededLimits) => {
486				// Invalid invalid subscription ID.
487				return ResponsePayload::success(MethodResponse::LimitReached)
488			},
489			Err(SubscriptionManagementError::BlockHashAbsent) => {
490				// Block is not part of the subscription.
491				return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
492			},
493			Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
494		};
495
496		// Reject subscription if with_runtime is false.
497		if !block_guard.has_runtime() {
498			return ResponsePayload::error(ChainHeadRpcError::InvalidRuntimeCall(
499				"The runtime updates flag must be set".to_string(),
500			));
501		}
502
503		let operation_id = block_guard.operation().operation_id();
504		let client = self.client.clone();
505
506		let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
507		let fut = async move {
508			// Wait for the server to send out the response and if it produces an error no event
509			// should be generated.
510			if rp_fut.await.is_err() {
511				return
512			}
513
514			let event = client
515				.executor()
516				.call(hash, &function, &call_parameters, CallContext::Offchain)
517				.map(|result| {
518					FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
519						operation_id: operation_id.clone(),
520						output: hex_string(&result),
521					})
522				})
523				.unwrap_or_else(|error| {
524					FollowEvent::<Block::Hash>::OperationError(OperationError {
525						operation_id: operation_id.clone(),
526						error: error.to_string(),
527					})
528				});
529
530			let _ = block_guard.response_sender().unbounded_send(event);
531		};
532		self.executor
533			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
534
535		rp
536	}
537
538	async fn chain_head_unstable_unpin(
539		&self,
540		ext: &Extensions,
541		follow_subscription: String,
542		hash_or_hashes: ListOrValue<Block::Hash>,
543	) -> Result<(), ChainHeadRpcError> {
544		let conn_id = ext
545			.get::<ConnectionId>()
546			.copied()
547			.expect("ConnectionId is always set by jsonrpsee; qed");
548
549		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
550			return Ok(());
551		}
552
553		let result = match hash_or_hashes {
554			ListOrValue::Value(hash) =>
555				self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
556			ListOrValue::List(hashes) =>
557				self.subscriptions.unpin_blocks(&follow_subscription, hashes),
558		};
559
560		match result {
561			Ok(()) => Ok(()),
562			Err(SubscriptionManagementError::SubscriptionAbsent) => {
563				// Invalid invalid subscription ID.
564				Ok(())
565			},
566			Err(SubscriptionManagementError::BlockHashAbsent) => {
567				// Block is not part of the subscription.
568				Err(ChainHeadRpcError::InvalidBlock)
569			},
570			Err(SubscriptionManagementError::DuplicateHashes) =>
571				Err(ChainHeadRpcError::InvalidDuplicateHashes),
572			Err(_) => Err(ChainHeadRpcError::InvalidBlock),
573		}
574	}
575
576	async fn chain_head_unstable_continue(
577		&self,
578		ext: &Extensions,
579		follow_subscription: String,
580		operation_id: String,
581	) -> Result<(), ChainHeadRpcError> {
582		let conn_id = ext
583			.get::<ConnectionId>()
584			.copied()
585			.expect("ConnectionId is always set by jsonrpsee; qed");
586
587		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
588			return Ok(())
589		}
590
591		let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
592		else {
593			return Ok(())
594		};
595
596		if !operation.submit_continue() {
597			// Continue called without generating a `WaitingForContinue` event.
598			Err(ChainHeadRpcError::InvalidContinue.into())
599		} else {
600			Ok(())
601		}
602	}
603
604	async fn chain_head_unstable_stop_operation(
605		&self,
606		ext: &Extensions,
607		follow_subscription: String,
608		operation_id: String,
609	) -> Result<(), ChainHeadRpcError> {
610		let conn_id = ext
611			.get::<ConnectionId>()
612			.copied()
613			.expect("ConnectionId is always set by jsonrpsee; qed");
614
615		if !self.subscriptions.contains_subscription(conn_id, &follow_subscription) {
616			return Ok(())
617		}
618
619		let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
620		else {
621			return Ok(())
622		};
623
624		operation.stop_operation();
625
626		Ok(())
627	}
628}
629
630fn method_started_response(
631	operation_id: String,
632	discarded_items: Option<usize>,
633) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
634	let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
635	ResponsePayload::success(rp).notify_on_completion()
636}
637
638/// Spawn a blocking future on the provided executor and return the result on a oneshot channel.
639///
640/// This is a wrapper to extract the result of a `executor.spawn_blocking` future.
641fn spawn_blocking<R>(
642	executor: &SubscriptionTaskExecutor,
643	fut: impl std::future::Future<Output = R> + Send + 'static,
644) -> oneshot::Receiver<R>
645where
646	R: Send + 'static,
647{
648	let (tx, rx) = oneshot::channel();
649
650	let blocking_fut = async move {
651		let result = fut.await;
652		// Send the result back on the channel.
653		let _ = tx.send(result);
654	};
655
656	executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
657
658	rx
659}