1#[cfg(test)]
22mod tests;
23
24use self::error::{Error, Result};
25use crate::{
26 utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
27 SubscriptionTaskExecutor,
28};
29use codec::{Decode, Encode};
30use jsonrpsee::{core::async_trait, types::ErrorObject, Extensions, PendingSubscriptionSink};
31use sc_rpc_api::check_if_safe;
32use sc_transaction_pool_api::{
33 error::IntoPoolError, BlockHash, InPoolTransaction, TransactionFor, TransactionPool,
34 TransactionSource, TxHash, TxInvalidityReportMap,
35};
36use sp_api::{ApiExt, ProvideRuntimeApi};
37use sp_blockchain::HeaderBackend;
38use sp_core::Bytes;
39use sp_keystore::{KeystoreExt, KeystorePtr};
40use sp_runtime::traits::Block as BlockT;
41use sp_session::SessionKeys;
42use std::sync::Arc;
43
44pub use sc_rpc_api::author::*;
46
47pub struct Author<P, Client> {
49 client: Arc<Client>,
51 pool: Arc<P>,
53 keystore: KeystorePtr,
55 executor: SubscriptionTaskExecutor,
57}
58
59impl<P, Client> Author<P, Client> {
60 pub fn new(
62 client: Arc<Client>,
63 pool: Arc<P>,
64 keystore: KeystorePtr,
65 executor: SubscriptionTaskExecutor,
66 ) -> Self {
67 Author { client, pool, keystore, executor }
68 }
69}
70
71impl<P, Client> Author<P, Client>
72where
73 P: TransactionPool + Sync + Send + 'static,
74 Client: HeaderBackend<P::Block> + ProvideRuntimeApi<P::Block> + Send + Sync + 'static,
75 Client::Api: SessionKeys<P::Block>,
76 P::Hash: Unpin,
77 <P::Block as BlockT>::Hash: Unpin,
78{
79 fn rotate_keys_impl(&self, owner: Vec<u8>) -> Result<GeneratedSessionKeys> {
80 let best_block_hash = self.client.info().best_hash;
81 let mut runtime_api = self.client.runtime_api();
82
83 runtime_api.register_extension(KeystoreExt::from(self.keystore.clone()));
84
85 let version = runtime_api
86 .api_version::<dyn SessionKeys<P::Block>>(best_block_hash)
87 .map_err(|api_err| Error::Client(Box::new(api_err)))?
88 .ok_or_else(|| Error::MissingSessionKeysApi)?;
89
90 if version < 2 {
91 #[allow(deprecated)]
92 runtime_api
93 .generate_session_keys_before_version_2(best_block_hash, None)
94 .map(|sk| GeneratedSessionKeys { keys: sk.into(), proof: None })
95 .map_err(|api_err| Error::Client(Box::new(api_err)).into())
96 } else {
97 runtime_api
98 .generate_session_keys(best_block_hash, owner, None)
99 .map(|sk| GeneratedSessionKeys {
100 keys: sk.keys.into(),
101 proof: Some(sk.proof.into()),
102 })
103 .map_err(|api_err| Error::Client(Box::new(api_err)).into())
104 }
105 }
106}
107
108const TX_SOURCE: TransactionSource = TransactionSource::External;
114
115#[async_trait]
116impl<P, Client> AuthorApiServer<TxHash<P>, BlockHash<P>> for Author<P, Client>
117where
118 P: TransactionPool + Sync + Send + 'static,
119 Client: HeaderBackend<P::Block> + ProvideRuntimeApi<P::Block> + Send + Sync + 'static,
120 Client::Api: SessionKeys<P::Block>,
121 P::Hash: Unpin,
122 <P::Block as BlockT>::Hash: Unpin,
123{
124 async fn submit_extrinsic(&self, ext: Bytes) -> Result<TxHash<P>> {
125 let xt = match Decode::decode(&mut &ext[..]) {
126 Ok(xt) => xt,
127 Err(err) => return Err(Error::Client(Box::new(err)).into()),
128 };
129 let best_block_hash = self.client.info().best_hash;
130 self.pool.submit_one(best_block_hash, TX_SOURCE, xt).await.map_err(|e| {
131 e.into_pool_error()
132 .map(|e| Error::Pool(e))
133 .unwrap_or_else(|e| Error::Verification(Box::new(e)))
134 .into()
135 })
136 }
137
138 fn insert_key(
139 &self,
140 ext: &Extensions,
141 key_type: String,
142 suri: String,
143 public: Bytes,
144 ) -> Result<()> {
145 check_if_safe(ext)?;
146
147 let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
148 self.keystore
149 .insert(key_type, &suri, &public[..])
150 .map_err(|_| Error::KeystoreUnavailable)?;
151 Ok(())
152 }
153
154 fn rotate_keys(&self, ext: &Extensions) -> Result<Bytes> {
155 check_if_safe(ext)?;
156
157 self.rotate_keys_impl(Vec::new()).map(|k| k.keys)
158 }
159
160 fn rotate_keys_with_owner(
161 &self,
162 ext: &Extensions,
163 owner: Bytes,
164 ) -> Result<GeneratedSessionKeys> {
165 check_if_safe(ext)?;
166
167 self.rotate_keys_impl(owner.0)
168 }
169
170 fn has_session_keys(&self, ext: &Extensions, session_keys: Bytes) -> Result<bool> {
171 check_if_safe(ext)?;
172
173 let best_block_hash = self.client.info().best_hash;
174 let keys = self
175 .client
176 .runtime_api()
177 .decode_session_keys(best_block_hash, session_keys.to_vec())
178 .map_err(|e| Error::Client(Box::new(e)))?
179 .ok_or(Error::InvalidSessionKeys)?;
180
181 Ok(self.keystore.has_keys(&keys))
182 }
183
184 fn has_key(&self, ext: &Extensions, public_key: Bytes, key_type: String) -> Result<bool> {
185 check_if_safe(ext)?;
186
187 let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
188 Ok(self.keystore.has_keys(&[(public_key.to_vec(), key_type)]))
189 }
190
191 fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
192 Ok(self.pool.ready().map(|tx| tx.data().encode().into()).collect())
193 }
194
195 async fn remove_extrinsic(
196 &self,
197 ext: &Extensions,
198 bytes_or_hash: Vec<hash::ExtrinsicOrHash<TxHash<P>>>,
199 ) -> Result<Vec<TxHash<P>>> {
200 check_if_safe(ext)?;
201 let hashes = bytes_or_hash
202 .into_iter()
203 .map(|x| match x {
204 hash::ExtrinsicOrHash::Hash(h) => Ok((h, None)),
205 hash::ExtrinsicOrHash::Extrinsic(bytes) => {
206 let xt = Decode::decode(&mut &bytes[..])?;
207 Ok((self.pool.hash_of(&xt), None))
208 },
209 })
210 .collect::<Result<TxInvalidityReportMap<TxHash<P>>>>()?;
211
212 Ok(self
213 .pool
214 .report_invalid(None, hashes)
215 .await
216 .into_iter()
217 .map(|tx| tx.hash().clone())
218 .collect())
219 }
220
221 fn watch_extrinsic(&self, pending: PendingSubscriptionSink, xt: Bytes) {
222 let best_block_hash = self.client.info().best_hash;
223 let dxt = match TransactionFor::<P>::decode(&mut &xt[..]).map_err(|e| Error::from(e)) {
224 Ok(dxt) => dxt,
225 Err(e) => {
226 spawn_subscription_task(&self.executor, pending.reject(e));
227 return
228 },
229 };
230
231 let pool = self.pool.clone();
232 let fut = async move {
233 let submit =
234 pool.submit_and_watch(best_block_hash, TX_SOURCE, dxt).await.map_err(|e| {
235 e.into_pool_error()
236 .map(error::Error::from)
237 .unwrap_or_else(|e| error::Error::Verification(Box::new(e)))
238 });
239
240 let stream = match submit {
241 Ok(stream) => stream,
242 Err(err) => {
243 let _ = pending.reject(ErrorObject::from(err)).await;
244 return
245 },
246 };
247
248 PendingSubscription::from(pending)
249 .pipe_from_stream(stream, BoundedVecDeque::default())
250 .await;
251 };
252
253 spawn_subscription_task(&self.executor, fut);
254 }
255}