1#![warn(missing_docs)]
22
23use parking_lot::RwLock;
24use sp_consensus_beefy::AuthorityIdBound;
25use std::sync::Arc;
26
27use sc_rpc::{
28 utils::{BoundedVecDeque, PendingSubscription},
29 SubscriptionTaskExecutor,
30};
31use sp_application_crypto::RuntimeAppPublic;
32use sp_runtime::traits::Block as BlockT;
33
34use futures::{task::SpawnError, FutureExt, StreamExt};
35use jsonrpsee::{
36 core::async_trait,
37 proc_macros::rpc,
38 types::{ErrorObject, ErrorObjectOwned},
39 PendingSubscriptionSink,
40};
41use log::warn;
42
43use sc_consensus_beefy::communication::notification::{
44 BeefyBestBlockStream, BeefyVersionedFinalityProofStream,
45};
46
47mod notification;
48
49#[derive(Debug, thiserror::Error)]
50pub enum Error {
52 #[error("BEEFY RPC endpoint not ready")]
54 EndpointNotReady,
55 #[error("BEEFY RPC background task failed to spawn")]
57 RpcTaskFailure(#[from] SpawnError),
58}
59
/// Numeric codes placed in the `code` field of the JSON-RPC error objects built from `Error`.
///
/// The discriminants are part of the wire format and must stay stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCode {
    /// Reported when the endpoint is not ready to serve requests (`Error::EndpointNotReady`).
    NotReady = 1,
    /// Reported when a background task could not be spawned (`Error::RpcTaskFailure`).
    TaskFailure = 2,
}
67
68impl From<Error> for ErrorCode {
69 fn from(error: Error) -> Self {
70 match error {
71 Error::EndpointNotReady => ErrorCode::NotReady,
72 Error::RpcTaskFailure(_) => ErrorCode::TaskFailure,
73 }
74 }
75}
76
77impl From<Error> for ErrorObjectOwned {
78 fn from(error: Error) -> Self {
79 let message = error.to_string();
80 let code = ErrorCode::from(error);
81 ErrorObject::owned(code as i32, message, None::<()>)
82 }
83}
84
85#[rpc(client, server)]
87pub trait BeefyApi<Notification, Hash> {
88 #[subscription(
90 name = "beefy_subscribeJustifications" => "beefy_justifications",
91 unsubscribe = "beefy_unsubscribeJustifications",
92 item = Notification,
93 )]
94 fn subscribe_justifications(&self);
95
96 #[method(name = "beefy_getFinalizedHead")]
102 async fn latest_finalized(&self) -> Result<Hash, Error>;
103}
104
105pub struct Beefy<Block: BlockT, AuthorityId: AuthorityIdBound> {
107 finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
108 beefy_best_block: Arc<RwLock<Option<Block::Hash>>>,
109 executor: SubscriptionTaskExecutor,
110}
111
112impl<Block, AuthorityId> Beefy<Block, AuthorityId>
113where
114 Block: BlockT,
115 AuthorityId: AuthorityIdBound,
116{
117 pub fn new(
119 finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
120 best_block_stream: BeefyBestBlockStream<Block>,
121 executor: SubscriptionTaskExecutor,
122 ) -> Result<Self, Error> {
123 let beefy_best_block = Arc::new(RwLock::new(None));
124
125 let stream = best_block_stream.subscribe(100_000);
126 let closure_clone = beefy_best_block.clone();
127 let future = stream.for_each(move |best_beefy| {
128 let async_clone = closure_clone.clone();
129 async move { *async_clone.write() = Some(best_beefy) }
130 });
131
132 executor.spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed());
133 Ok(Self { finality_proof_stream, beefy_best_block, executor })
134 }
135}
136
137#[async_trait]
138impl<Block, AuthorityId> BeefyApiServer<notification::EncodedVersionedFinalityProof, Block::Hash>
139 for Beefy<Block, AuthorityId>
140where
141 Block: BlockT,
142 AuthorityId: AuthorityIdBound,
143 <AuthorityId as RuntimeAppPublic>::Signature: Send + Sync,
144{
145 fn subscribe_justifications(&self, pending: PendingSubscriptionSink) {
146 let stream = self
147 .finality_proof_stream
148 .subscribe(100_000)
149 .map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block, AuthorityId>(vfp));
150
151 sc_rpc::utils::spawn_subscription_task(
152 &self.executor,
153 PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
154 );
155 }
156
157 async fn latest_finalized(&self) -> Result<Block::Hash, Error> {
158 self.beefy_best_block.read().as_ref().cloned().ok_or(Error::EndpointNotReady)
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    use codec::{Decode, Encode};
    use jsonrpsee::{core::EmptyServerParams as EmptyParams, RpcModule};
    use sc_consensus_beefy::{
        communication::notification::BeefyVersionedFinalityProofSender,
        justification::BeefyVersionedFinalityProof,
    };
    use sp_consensus_beefy::{ecdsa_crypto, known_payloads, Payload, SignedCommitment};
    use sp_runtime::traits::{BlakeTwo256, Hash};
    use std::time::{Duration, Instant};
    use substrate_test_runtime_client::runtime::Block;

    // Shorthands for the concrete types every test in this module uses.
    type TestAuthorityId = ecdsa_crypto::AuthorityId;
    type TestRpcModule = RpcModule<Beefy<Block, TestAuthorityId>>;
    type TestProofSender = BeefyVersionedFinalityProofSender<Block, TestAuthorityId>;
    type TestProof = BeefyVersionedFinalityProof<Block, TestAuthorityId>;

    /// Raw JSON-RPC request for `beefy_getFinalizedHead`.
    const GET_FINALIZED_HEAD: &str =
        r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
    /// Response produced while no best BEEFY block has been recorded.
    const NOT_READY_RESPONSE: &str =
        r#"{"jsonrpc":"2.0","id":1,"error":{"code":1,"message":"BEEFY RPC endpoint not ready"}}"#;

    /// Builds an RPC module whose best-block sender has already been dropped, so the handler
    /// never learns about a best block.
    fn setup_io_handler() -> (TestRpcModule, TestProofSender) {
        let (_, best_block_stream) = BeefyBestBlockStream::<Block>::channel();
        setup_io_handler_with_best_block_stream(best_block_stream)
    }

    /// Builds an RPC module fed by `best_block_stream`, returning it together with the sender
    /// used to inject finality proofs.
    fn setup_io_handler_with_best_block_stream(
        best_block_stream: BeefyBestBlockStream<Block>,
    ) -> (TestRpcModule, TestProofSender) {
        let (proof_sender, proof_stream) =
            BeefyVersionedFinalityProofStream::<Block, TestAuthorityId>::channel();

        let module = Beefy::new(proof_stream, best_block_stream, sc_rpc::testing::test_executor())
            .expect("Setting up the BEEFY RPC handler works")
            .into_rpc();

        (module, proof_sender)
    }

    #[tokio::test]
    async fn uninitialized_rpc_handler() {
        let (rpc, _) = setup_io_handler();

        let (response, _) = rpc.raw_json_request(GET_FINALIZED_HEAD, 1).await.unwrap();

        assert_eq!(NOT_READY_RESPONSE, response);
    }

    #[tokio::test]
    async fn latest_finalized_rpc() {
        let (sender, stream) = BeefyBestBlockStream::<Block>::channel();
        let (io, _) = setup_io_handler_with_best_block_stream(stream);

        let hash = BlakeTwo256::hash(b"42");
        sender.notify(|| Ok::<_, ()>(hash)).unwrap();

        let expected = r#"{"jsonrpc":"2.0","id":1,"result":"0x2f0039e93a27221fcf657fb877a1d4f60307106113e885096cb44a461cd0afbf"}"#;

        // The best block is recorded by a background task, so poll until the endpoint stops
        // answering "not ready" or the time budget runs out.
        let started = Instant::now();
        let budget = Duration::from_secs(2);
        while started.elapsed() < budget {
            let (response, _) =
                io.raw_json_request(GET_FINALIZED_HEAD, 1).await.expect("RPC requests work");
            if response == NOT_READY_RESPONSE {
                tokio::time::sleep(Duration::from_millis(50)).await;
                continue;
            }
            assert_eq!(response, expected);
            return;
        }

        panic!(
            "Deadline reached while waiting for best BEEFY block to update. Perhaps the background task is broken?"
        );
    }

    #[tokio::test]
    async fn subscribe_and_unsubscribe_with_wrong_id() {
        let (rpc, _) = setup_io_handler();
        // Keep the subscription alive for the duration of the test.
        let _subscription = rpc
            .subscribe_unbounded("beefy_subscribeJustifications", EmptyParams::new())
            .await
            .unwrap();

        let unsubscribe_request = r#"{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":["FOO"],"id":1}"#;
        let (response, _) = rpc.raw_json_request(unsubscribe_request, 1).await.unwrap();

        // Unsubscribing with an unknown id is answered with `false`.
        assert_eq!(response, r#"{"jsonrpc":"2.0","id":1,"result":false}"#);
    }

    /// Creates a V1 finality proof with a single MMR-root payload entry and no signatures.
    fn create_finality_proof() -> TestProof {
        let commitment = sp_consensus_beefy::Commitment {
            payload: Payload::from_single_entry(
                known_payloads::MMR_ROOT_ID,
                "Hello World!".encode(),
            ),
            block_number: 5,
            validator_set_id: 0,
        };
        TestProof::V1(SignedCommitment { commitment, signatures: vec![] })
    }

    #[tokio::test]
    async fn subscribe_and_listen_to_one_justification() {
        let (rpc, finality_proof_sender) = setup_io_handler();

        let mut sub = rpc
            .subscribe_unbounded("beefy_subscribeJustifications", EmptyParams::new())
            .await
            .unwrap();

        let finality_proof = create_finality_proof();
        finality_proof_sender.notify(|| Ok::<_, ()>(finality_proof.clone())).unwrap();

        let (bytes, recv_sub_id) = sub.next::<sp_core::Bytes>().await.unwrap().unwrap();
        let received = <TestProof as Decode>::decode(&mut &bytes[..]).unwrap();

        assert_eq!(&recv_sub_id, sub.subscription_id());
        assert_eq!(received, finality_proof);
    }
}