referrerpolicy=no-referrer-when-downgrade

sc_rpc/statement/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate statement store API.
20
21use codec::Decode;
22use futures::FutureExt;
23use jsonrpsee::{
24	core::{async_trait, RpcResult},
25	Extensions, PendingSubscriptionSink,
26};
27/// Re-export the API for backward compatibility.
28pub use sc_rpc_api::statement::{error::Error, StatementApiServer};
29use sp_core::Bytes;
30use sp_statement_store::{
31	OptimizedTopicFilter, StatementEvent, StatementSource, SubmitResult, TopicFilter,
32};
33use std::sync::Arc;
/// Log target used by the statement store RPC.
const LOG_TARGET: &str = "statement-store-rpc";

/// Upper bound, in bytes, on the estimated JSON size of one chunk of statements.
///
/// Chunking keeps a single RPC response below the maximum JSON size. Each statement is
/// SCALE-encoded and then hex-encoded in the JSON response, so a statement takes roughly twice
/// its SCALE-encoded length once serialized. The limit is large enough to carry a reasonable
/// number of statements per chunk while staying small enough not to hit the JSON size limit.
const MAX_CHUNK_BYTES_LIMIT: usize = 4 * 1024 * 1024;
41
42use crate::{
43	utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
44	SubscriptionTaskExecutor,
45};
46
47#[cfg(test)]
48mod tests;
49
50/// Send existing statements in chunks over the subscription channel.
51///
52/// Splits the statements into chunks that fit within [`MAX_CHUNK_BYTES_LIMIT`] to avoid
53/// exceeding the RPC max response size, then sends each chunk as a
54/// [`StatementEvent::NewStatements`].
55async fn send_in_chunks(
56	existing_statements: Vec<Vec<u8>>,
57	subscription_sender: async_channel::Sender<StatementEvent>,
58) {
59	let mut iter = existing_statements.into_iter().peekable();
60	loop {
61		let mut chunk = Vec::<Bytes>::new();
62		let mut chunk_json_size = 0usize;
63		while let Some(statement) = iter.peek() {
64			// Each SCALE-encoded byte becomes 2 hex chars in the JSON response
65			let json_size_estimate = statement.len() * 2;
66			// If a single statement exceeds the max chunk size, skip it but continue sending the
67			// rest of the statements. This would never happen in practice because the statement
68			// store should reject statements that are too large, but we add this check to be safe.
69			if json_size_estimate > MAX_CHUNK_BYTES_LIMIT {
70				iter.next();
71				continue;
72			}
73			if chunk_json_size + json_size_estimate > MAX_CHUNK_BYTES_LIMIT {
74				break;
75			}
76			let Some(statement) = iter.next() else { break };
77			chunk_json_size += json_size_estimate;
78			chunk.push(statement.into());
79		}
80		if chunk.is_empty() {
81			break;
82		}
83		let remaining = iter.len();
84		if let Err(e) = subscription_sender
85			.send(StatementEvent::NewStatements {
86				statements: chunk,
87				remaining: Some(remaining as u32),
88			})
89			.await
90		{
91			log::warn!(
92				target: LOG_TARGET,
93				"Failed to send existing statement in subscription: {:?}", e
94			);
95			break;
96		}
97	}
98}
99
100/// Trait alias for statement store API required by the RPC.
101pub trait StatementStoreApi:
102	sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi
103{
104}
105impl<T> StatementStoreApi for T where
106	T: sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi
107{
108}
109/// Statement store API
110pub struct StatementStore {
111	store: Arc<dyn StatementStoreApi>,
112	executor: SubscriptionTaskExecutor,
113}
114
115impl StatementStore {
116	/// Create new instance of Offchain API.
117	pub fn new(store: Arc<dyn StatementStoreApi>, executor: SubscriptionTaskExecutor) -> Self {
118		StatementStore { store, executor }
119	}
120}
121
122#[async_trait]
123impl StatementApiServer for StatementStore {
124	fn submit(&self, encoded: Bytes) -> RpcResult<SubmitResult> {
125		let statement = Decode::decode(&mut &*encoded)
126			.map_err(|e| Error::StatementStore(format!("Error decoding statement: {:?}", e)))?;
127		match self.store.submit(statement, StatementSource::Local) {
128			SubmitResult::InternalError(e) => Err(Error::StatementStore(e.to_string()).into()),
129			// We return the result as is but `KnownExpired` should not happen. Expired statements
130			// submitted with `StatementSource::Rpc` should be renewed.
131			result => Ok(result),
132		}
133	}
134
135	fn subscribe_statement(
136		&self,
137		pending: PendingSubscriptionSink,
138		_ext: &Extensions,
139		topic_filter: TopicFilter,
140	) {
141		let optimized_topic_filter: OptimizedTopicFilter = topic_filter.into();
142
143		let (existing_statements, subscription_sender, subscription_stream) =
144			match self.store.subscribe_statement(optimized_topic_filter) {
145				Ok(res) => res,
146				Err(err) => {
147					spawn_subscription_task(
148						&self.executor,
149						pending.reject(Error::StatementStore(format!(
150							"Error collecting existing statements: {:?}",
151							err
152						))),
153					);
154					return;
155				},
156			};
157
158		spawn_subscription_task(
159			&self.executor,
160			PendingSubscription::from(pending)
161				.pipe_from_stream(subscription_stream, BoundedVecDeque::new(128)),
162		);
163
164		self.executor.spawn(
165			"statement-store-rpc-send",
166			Some("rpc"),
167			send_in_chunks(existing_statements, subscription_sender).boxed(),
168		)
169	}
170}