1#[cfg(test)]
22mod tests;
23
24use self::error::{Error, Result};
25use crate::{
26 utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
27 SubscriptionTaskExecutor,
28};
29use codec::{Decode, Encode};
30use jsonrpsee::{core::async_trait, types::ErrorObject, Extensions, PendingSubscriptionSink};
31use sc_rpc_api::check_if_safe;
32use sc_transaction_pool_api::{
33 error::IntoPoolError, BlockHash, InPoolTransaction, TransactionFor, TransactionPool,
34 TransactionSource, TxHash, TxInvalidityReportMap,
35};
36use sp_api::{ApiExt, ProvideRuntimeApi};
37use sp_blockchain::HeaderBackend;
38use sp_core::Bytes;
39use sp_keystore::{KeystoreExt, KeystorePtr};
40use sp_runtime::traits::Block as BlockT;
41use sp_session::SessionKeys;
42use std::sync::Arc;
43
44pub use sc_rpc_api::author::*;
46
47pub struct Author<P, Client> {
49 client: Arc<Client>,
51 pool: Arc<P>,
53 keystore: KeystorePtr,
55 executor: SubscriptionTaskExecutor,
57}
58
59impl<P, Client> Author<P, Client> {
60 pub fn new(
62 client: Arc<Client>,
63 pool: Arc<P>,
64 keystore: KeystorePtr,
65 executor: SubscriptionTaskExecutor,
66 ) -> Self {
67 Author { client, pool, keystore, executor }
68 }
69}
70
71const TX_SOURCE: TransactionSource = TransactionSource::External;
77
78#[async_trait]
79impl<P, Client> AuthorApiServer<TxHash<P>, BlockHash<P>> for Author<P, Client>
80where
81 P: TransactionPool + Sync + Send + 'static,
82 Client: HeaderBackend<P::Block> + ProvideRuntimeApi<P::Block> + Send + Sync + 'static,
83 Client::Api: SessionKeys<P::Block>,
84 P::Hash: Unpin,
85 <P::Block as BlockT>::Hash: Unpin,
86{
87 async fn submit_extrinsic(&self, ext: Bytes) -> Result<TxHash<P>> {
88 let xt = match Decode::decode(&mut &ext[..]) {
89 Ok(xt) => xt,
90 Err(err) => return Err(Error::Client(Box::new(err)).into()),
91 };
92 let best_block_hash = self.client.info().best_hash;
93 self.pool.submit_one(best_block_hash, TX_SOURCE, xt).await.map_err(|e| {
94 e.into_pool_error()
95 .map(|e| Error::Pool(e))
96 .unwrap_or_else(|e| Error::Verification(Box::new(e)))
97 .into()
98 })
99 }
100
101 fn insert_key(
102 &self,
103 ext: &Extensions,
104 key_type: String,
105 suri: String,
106 public: Bytes,
107 ) -> Result<()> {
108 check_if_safe(ext)?;
109
110 let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
111 self.keystore
112 .insert(key_type, &suri, &public[..])
113 .map_err(|_| Error::KeystoreUnavailable)?;
114 Ok(())
115 }
116
117 fn rotate_keys(&self, ext: &Extensions) -> Result<Bytes> {
118 check_if_safe(ext)?;
119
120 let best_block_hash = self.client.info().best_hash;
121 let mut runtime_api = self.client.runtime_api();
122
123 runtime_api.register_extension(KeystoreExt::from(self.keystore.clone()));
124
125 runtime_api
126 .generate_session_keys(best_block_hash, None)
127 .map(Into::into)
128 .map_err(|api_err| Error::Client(Box::new(api_err)).into())
129 }
130
131 fn has_session_keys(&self, ext: &Extensions, session_keys: Bytes) -> Result<bool> {
132 check_if_safe(ext)?;
133
134 let best_block_hash = self.client.info().best_hash;
135 let keys = self
136 .client
137 .runtime_api()
138 .decode_session_keys(best_block_hash, session_keys.to_vec())
139 .map_err(|e| Error::Client(Box::new(e)))?
140 .ok_or(Error::InvalidSessionKeys)?;
141
142 Ok(self.keystore.has_keys(&keys))
143 }
144
145 fn has_key(&self, ext: &Extensions, public_key: Bytes, key_type: String) -> Result<bool> {
146 check_if_safe(ext)?;
147
148 let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
149 Ok(self.keystore.has_keys(&[(public_key.to_vec(), key_type)]))
150 }
151
152 fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
153 Ok(self.pool.ready().map(|tx| tx.data().encode().into()).collect())
154 }
155
156 async fn remove_extrinsic(
157 &self,
158 ext: &Extensions,
159 bytes_or_hash: Vec<hash::ExtrinsicOrHash<TxHash<P>>>,
160 ) -> Result<Vec<TxHash<P>>> {
161 check_if_safe(ext)?;
162 let hashes = bytes_or_hash
163 .into_iter()
164 .map(|x| match x {
165 hash::ExtrinsicOrHash::Hash(h) => Ok((h, None)),
166 hash::ExtrinsicOrHash::Extrinsic(bytes) => {
167 let xt = Decode::decode(&mut &bytes[..])?;
168 Ok((self.pool.hash_of(&xt), None))
169 },
170 })
171 .collect::<Result<TxInvalidityReportMap<TxHash<P>>>>()?;
172
173 Ok(self
174 .pool
175 .report_invalid(None, hashes)
176 .await
177 .into_iter()
178 .map(|tx| tx.hash().clone())
179 .collect())
180 }
181
182 fn watch_extrinsic(&self, pending: PendingSubscriptionSink, xt: Bytes) {
183 let best_block_hash = self.client.info().best_hash;
184 let dxt = match TransactionFor::<P>::decode(&mut &xt[..]).map_err(|e| Error::from(e)) {
185 Ok(dxt) => dxt,
186 Err(e) => {
187 spawn_subscription_task(&self.executor, pending.reject(e));
188 return
189 },
190 };
191
192 let pool = self.pool.clone();
193 let fut = async move {
194 let submit =
195 pool.submit_and_watch(best_block_hash, TX_SOURCE, dxt).await.map_err(|e| {
196 e.into_pool_error()
197 .map(error::Error::from)
198 .unwrap_or_else(|e| error::Error::Verification(Box::new(e)))
199 });
200
201 let stream = match submit {
202 Ok(stream) => stream,
203 Err(err) => {
204 let _ = pending.reject(ErrorObject::from(err)).await;
205 return
206 },
207 };
208
209 PendingSubscription::from(pending)
210 .pipe_from_stream(stream, BoundedVecDeque::default())
211 .await;
212 };
213
214 spawn_subscription_task(&self.executor, fut);
215 }
216}