1#[cfg(test)]
22mod tests;
23
24use std::sync::Arc;
25
26use crate::{
27 utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
28 SubscriptionTaskExecutor,
29};
30
31use codec::{Decode, Encode};
32use futures::TryFutureExt;
33use jsonrpsee::{core::async_trait, types::ErrorObject, Extensions, PendingSubscriptionSink};
34use sc_rpc_api::check_if_safe;
35use sc_transaction_pool_api::{
36 error::IntoPoolError, BlockHash, InPoolTransaction, TransactionFor, TransactionPool,
37 TransactionSource, TxHash,
38};
39use sp_api::{ApiExt, ProvideRuntimeApi};
40use sp_blockchain::HeaderBackend;
41use sp_core::Bytes;
42use sp_keystore::{KeystoreExt, KeystorePtr};
43use sp_runtime::traits::Block as BlockT;
44use sp_session::SessionKeys;
45
46use self::error::{Error, Result};
47pub use sc_rpc_api::author::*;
49
50pub struct Author<P, Client> {
52 client: Arc<Client>,
54 pool: Arc<P>,
56 keystore: KeystorePtr,
58 executor: SubscriptionTaskExecutor,
60}
61
62impl<P, Client> Author<P, Client> {
63 pub fn new(
65 client: Arc<Client>,
66 pool: Arc<P>,
67 keystore: KeystorePtr,
68 executor: SubscriptionTaskExecutor,
69 ) -> Self {
70 Author { client, pool, keystore, executor }
71 }
72}
73
74const TX_SOURCE: TransactionSource = TransactionSource::External;
80
81#[async_trait]
82impl<P, Client> AuthorApiServer<TxHash<P>, BlockHash<P>> for Author<P, Client>
83where
84 P: TransactionPool + Sync + Send + 'static,
85 Client: HeaderBackend<P::Block> + ProvideRuntimeApi<P::Block> + Send + Sync + 'static,
86 Client::Api: SessionKeys<P::Block>,
87 P::Hash: Unpin,
88 <P::Block as BlockT>::Hash: Unpin,
89{
90 async fn submit_extrinsic(&self, ext: Bytes) -> Result<TxHash<P>> {
91 let xt = match Decode::decode(&mut &ext[..]) {
92 Ok(xt) => xt,
93 Err(err) => return Err(Error::Client(Box::new(err)).into()),
94 };
95 let best_block_hash = self.client.info().best_hash;
96 self.pool.submit_one(best_block_hash, TX_SOURCE, xt).await.map_err(|e| {
97 e.into_pool_error()
98 .map(|e| Error::Pool(e))
99 .unwrap_or_else(|e| Error::Verification(Box::new(e)))
100 .into()
101 })
102 }
103
104 fn insert_key(
105 &self,
106 ext: &Extensions,
107 key_type: String,
108 suri: String,
109 public: Bytes,
110 ) -> Result<()> {
111 check_if_safe(ext)?;
112
113 let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
114 self.keystore
115 .insert(key_type, &suri, &public[..])
116 .map_err(|_| Error::KeystoreUnavailable)?;
117 Ok(())
118 }
119
120 fn rotate_keys(&self, ext: &Extensions) -> Result<Bytes> {
121 check_if_safe(ext)?;
122
123 let best_block_hash = self.client.info().best_hash;
124 let mut runtime_api = self.client.runtime_api();
125
126 runtime_api.register_extension(KeystoreExt::from(self.keystore.clone()));
127
128 runtime_api
129 .generate_session_keys(best_block_hash, None)
130 .map(Into::into)
131 .map_err(|api_err| Error::Client(Box::new(api_err)).into())
132 }
133
134 fn has_session_keys(&self, ext: &Extensions, session_keys: Bytes) -> Result<bool> {
135 check_if_safe(ext)?;
136
137 let best_block_hash = self.client.info().best_hash;
138 let keys = self
139 .client
140 .runtime_api()
141 .decode_session_keys(best_block_hash, session_keys.to_vec())
142 .map_err(|e| Error::Client(Box::new(e)))?
143 .ok_or(Error::InvalidSessionKeys)?;
144
145 Ok(self.keystore.has_keys(&keys))
146 }
147
148 fn has_key(&self, ext: &Extensions, public_key: Bytes, key_type: String) -> Result<bool> {
149 check_if_safe(ext)?;
150
151 let key_type = key_type.as_str().try_into().map_err(|_| Error::BadKeyType)?;
152 Ok(self.keystore.has_keys(&[(public_key.to_vec(), key_type)]))
153 }
154
155 fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
156 Ok(self.pool.ready().map(|tx| tx.data().encode().into()).collect())
157 }
158
159 fn remove_extrinsic(
160 &self,
161 ext: &Extensions,
162 bytes_or_hash: Vec<hash::ExtrinsicOrHash<TxHash<P>>>,
163 ) -> Result<Vec<TxHash<P>>> {
164 check_if_safe(ext)?;
165 let hashes = bytes_or_hash
166 .into_iter()
167 .map(|x| match x {
168 hash::ExtrinsicOrHash::Hash(h) => Ok(h),
169 hash::ExtrinsicOrHash::Extrinsic(bytes) => {
170 let xt = Decode::decode(&mut &bytes[..])?;
171 Ok(self.pool.hash_of(&xt))
172 },
173 })
174 .collect::<Result<Vec<_>>>()?;
175
176 Ok(self
177 .pool
178 .remove_invalid(&hashes)
179 .into_iter()
180 .map(|tx| tx.hash().clone())
181 .collect())
182 }
183
184 fn watch_extrinsic(&self, pending: PendingSubscriptionSink, xt: Bytes) {
185 let best_block_hash = self.client.info().best_hash;
186 let dxt = match TransactionFor::<P>::decode(&mut &xt[..]).map_err(|e| Error::from(e)) {
187 Ok(dxt) => dxt,
188 Err(e) => {
189 spawn_subscription_task(&self.executor, pending.reject(e));
190 return
191 },
192 };
193
194 let submit = self.pool.submit_and_watch(best_block_hash, TX_SOURCE, dxt).map_err(|e| {
195 e.into_pool_error()
196 .map(error::Error::from)
197 .unwrap_or_else(|e| error::Error::Verification(Box::new(e)))
198 });
199
200 let fut = async move {
201 let stream = match submit.await {
202 Ok(stream) => stream,
203 Err(err) => {
204 let _ = pending.reject(ErrorObject::from(err)).await;
205 return
206 },
207 };
208
209 PendingSubscription::from(pending)
210 .pipe_from_stream(stream, BoundedVecDeque::default())
211 .await;
212 };
213
214 spawn_subscription_task(&self.executor, fut);
215 }
216}