1use codec::Decode;
22use futures::FutureExt;
23use jsonrpsee::{
24 core::{async_trait, RpcResult},
25 Extensions, PendingSubscriptionSink,
26};
27pub use sc_rpc_api::statement::{error::Error, StatementApiServer};
29use sp_core::Bytes;
30use sp_statement_store::{
31 OptimizedTopicFilter, StatementEvent, StatementSource, SubmitResult, TopicFilter,
32};
33use std::sync::Arc;
34const LOG_TARGET: &str = "statement-store-rpc";
35const MAX_CHUNK_BYTES_LIMIT: usize = 4 * 1024 * 1024;
41
42use crate::{
43 utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
44 SubscriptionTaskExecutor,
45};
46
47#[cfg(test)]
48mod tests;
49
50async fn send_in_chunks(
56 existing_statements: Vec<Vec<u8>>,
57 subscription_sender: async_channel::Sender<StatementEvent>,
58) {
59 let mut iter = existing_statements.into_iter().peekable();
60 loop {
61 let mut chunk = Vec::<Bytes>::new();
62 let mut chunk_json_size = 0usize;
63 while let Some(statement) = iter.peek() {
64 let json_size_estimate = statement.len() * 2;
66 if json_size_estimate > MAX_CHUNK_BYTES_LIMIT {
70 iter.next();
71 continue;
72 }
73 if chunk_json_size + json_size_estimate > MAX_CHUNK_BYTES_LIMIT {
74 break;
75 }
76 let Some(statement) = iter.next() else { break };
77 chunk_json_size += json_size_estimate;
78 chunk.push(statement.into());
79 }
80 if chunk.is_empty() {
81 break;
82 }
83 let remaining = iter.len();
84 if let Err(e) = subscription_sender
85 .send(StatementEvent::NewStatements {
86 statements: chunk,
87 remaining: Some(remaining as u32),
88 })
89 .await
90 {
91 log::warn!(
92 target: LOG_TARGET,
93 "Failed to send existing statement in subscription: {:?}", e
94 );
95 break;
96 }
97 }
98}
99
100pub trait StatementStoreApi:
102 sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi
103{
104}
105impl<T> StatementStoreApi for T where
106 T: sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi
107{
108}
109pub struct StatementStore {
111 store: Arc<dyn StatementStoreApi>,
112 executor: SubscriptionTaskExecutor,
113}
114
115impl StatementStore {
116 pub fn new(store: Arc<dyn StatementStoreApi>, executor: SubscriptionTaskExecutor) -> Self {
118 StatementStore { store, executor }
119 }
120}
121
122#[async_trait]
123impl StatementApiServer for StatementStore {
124 fn submit(&self, encoded: Bytes) -> RpcResult<SubmitResult> {
125 let statement = Decode::decode(&mut &*encoded)
126 .map_err(|e| Error::StatementStore(format!("Error decoding statement: {:?}", e)))?;
127 match self.store.submit(statement, StatementSource::Local) {
128 SubmitResult::InternalError(e) => Err(Error::StatementStore(e.to_string()).into()),
129 result => Ok(result),
132 }
133 }
134
135 fn subscribe_statement(
136 &self,
137 pending: PendingSubscriptionSink,
138 _ext: &Extensions,
139 topic_filter: TopicFilter,
140 ) {
141 let optimized_topic_filter: OptimizedTopicFilter = topic_filter.into();
142
143 let (existing_statements, subscription_sender, subscription_stream) =
144 match self.store.subscribe_statement(optimized_topic_filter) {
145 Ok(res) => res,
146 Err(err) => {
147 spawn_subscription_task(
148 &self.executor,
149 pending.reject(Error::StatementStore(format!(
150 "Error collecting existing statements: {:?}",
151 err
152 ))),
153 );
154 return;
155 },
156 };
157
158 spawn_subscription_task(
159 &self.executor,
160 PendingSubscription::from(pending)
161 .pipe_from_stream(subscription_stream, BoundedVecDeque::new(128)),
162 );
163
164 self.executor.spawn(
165 "statement-store-rpc-send",
166 Some("rpc"),
167 send_in_chunks(existing_statements, subscription_sender).boxed(),
168 )
169 }
170}