referrerpolicy=no-referrer-when-downgrade

sc_consensus_beefy_rpc/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! RPC API for BEEFY.
20
21#![warn(missing_docs)]
22
23use parking_lot::RwLock;
24use sp_consensus_beefy::AuthorityIdBound;
25use std::sync::Arc;
26
27use sc_rpc::{
28	utils::{BoundedVecDeque, PendingSubscription},
29	SubscriptionTaskExecutor,
30};
31use sp_application_crypto::RuntimeAppPublic;
32use sp_runtime::traits::Block as BlockT;
33
34use futures::{task::SpawnError, FutureExt, StreamExt};
35use jsonrpsee::{
36	core::async_trait,
37	proc_macros::rpc,
38	types::{ErrorObject, ErrorObjectOwned},
39	PendingSubscriptionSink,
40};
41use log::warn;
42
43use sc_consensus_beefy::communication::notification::{
44	BeefyBestBlockStream, BeefyVersionedFinalityProofStream,
45};
46
47mod notification;
48
49#[derive(Debug, thiserror::Error)]
50/// Top-level error type for the RPC handler
51pub enum Error {
52	/// The BEEFY RPC endpoint is not ready.
53	#[error("BEEFY RPC endpoint not ready")]
54	EndpointNotReady,
55	/// The BEEFY RPC background task failed to spawn.
56	#[error("BEEFY RPC background task failed to spawn")]
57	RpcTaskFailure(#[from] SpawnError),
58}
59
/// The error codes returned by jsonrpc.
///
/// The explicit discriminants are the integer values placed in the `code` field of the
/// JSON-RPC error object (the conversion casts the variant with `as i32`), so they must
/// stay stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCode {
	/// Returned when BEEFY RPC endpoint is not ready.
	NotReady = 1,
	/// Returned on BEEFY RPC background task failure.
	TaskFailure = 2,
}
67
68impl From<Error> for ErrorCode {
69	fn from(error: Error) -> Self {
70		match error {
71			Error::EndpointNotReady => ErrorCode::NotReady,
72			Error::RpcTaskFailure(_) => ErrorCode::TaskFailure,
73		}
74	}
75}
76
77impl From<Error> for ErrorObjectOwned {
78	fn from(error: Error) -> Self {
79		let message = error.to_string();
80		let code = ErrorCode::from(error);
81		ErrorObject::owned(code as i32, message, None::<()>)
82	}
83}
84
85// Provides RPC methods for interacting with BEEFY.
86#[rpc(client, server)]
87pub trait BeefyApi<Notification, Hash> {
88	/// Returns the block most recently finalized by BEEFY, alongside its justification.
89	#[subscription(
90		name = "beefy_subscribeJustifications" => "beefy_justifications",
91		unsubscribe = "beefy_unsubscribeJustifications",
92		item = Notification,
93	)]
94	fn subscribe_justifications(&self);
95
96	/// Returns hash of the latest BEEFY finalized block as seen by this client.
97	///
98	/// The latest BEEFY block might not be available if the BEEFY gadget is not running
99	/// in the network or if the client is still initializing or syncing with the network.
100	/// In such case an error would be returned.
101	#[method(name = "beefy_getFinalizedHead")]
102	async fn latest_finalized(&self) -> Result<Hash, Error>;
103}
104
105/// Implements the BeefyApi RPC trait for interacting with BEEFY.
106pub struct Beefy<Block: BlockT, AuthorityId: AuthorityIdBound> {
107	finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
108	beefy_best_block: Arc<RwLock<Option<Block::Hash>>>,
109	executor: SubscriptionTaskExecutor,
110}
111
112impl<Block, AuthorityId> Beefy<Block, AuthorityId>
113where
114	Block: BlockT,
115	AuthorityId: AuthorityIdBound,
116{
117	/// Creates a new Beefy Rpc handler instance.
118	pub fn new(
119		finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
120		best_block_stream: BeefyBestBlockStream<Block>,
121		executor: SubscriptionTaskExecutor,
122	) -> Result<Self, Error> {
123		let beefy_best_block = Arc::new(RwLock::new(None));
124
125		let stream = best_block_stream.subscribe(100_000);
126		let closure_clone = beefy_best_block.clone();
127		let future = stream.for_each(move |best_beefy| {
128			let async_clone = closure_clone.clone();
129			async move { *async_clone.write() = Some(best_beefy) }
130		});
131
132		executor.spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed());
133		Ok(Self { finality_proof_stream, beefy_best_block, executor })
134	}
135}
136
137#[async_trait]
138impl<Block, AuthorityId> BeefyApiServer<notification::EncodedVersionedFinalityProof, Block::Hash>
139	for Beefy<Block, AuthorityId>
140where
141	Block: BlockT,
142	AuthorityId: AuthorityIdBound,
143	<AuthorityId as RuntimeAppPublic>::Signature: Send + Sync,
144{
145	fn subscribe_justifications(&self, pending: PendingSubscriptionSink) {
146		let stream = self
147			.finality_proof_stream
148			.subscribe(100_000)
149			.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block, AuthorityId>(vfp));
150
151		sc_rpc::utils::spawn_subscription_task(
152			&self.executor,
153			PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
154		);
155	}
156
157	async fn latest_finalized(&self) -> Result<Block::Hash, Error> {
158		self.beefy_best_block.read().as_ref().cloned().ok_or(Error::EndpointNotReady)
159	}
160}
161
#[cfg(test)]
mod tests {
	use super::*;

	use codec::{Decode, Encode};
	use jsonrpsee::{core::EmptyServerParams as EmptyParams, RpcModule};
	use sc_consensus_beefy::{
		communication::notification::BeefyVersionedFinalityProofSender,
		justification::BeefyVersionedFinalityProof,
	};
	use sp_consensus_beefy::{ecdsa_crypto, known_payloads, Payload, SignedCommitment};
	use sp_runtime::traits::{BlakeTwo256, Hash};
	use std::time::{Duration, Instant};
	use substrate_test_runtime_client::runtime::Block;

	// Concrete instantiations shared by every test below.
	type TestAuthorityId = ecdsa_crypto::AuthorityId;
	type TestRpcModule = RpcModule<Beefy<Block, TestAuthorityId>>;
	type TestProofSender = BeefyVersionedFinalityProofSender<Block, TestAuthorityId>;
	type TestFinalityProof = BeefyVersionedFinalityProof<Block, TestAuthorityId>;

	/// Raw JSON-RPC request asking for the latest BEEFY finalized head.
	const GET_FINALIZED_HEAD_REQUEST: &str =
		r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
	/// Raw JSON-RPC response produced while no best BEEFY block is known.
	const NOT_READY_RESPONSE: &str = r#"{"jsonrpc":"2.0","id":1,"error":{"code":1,"message":"BEEFY RPC endpoint not ready"}}"#;

	/// Builds an RPC module whose best-block stream never delivers anything (its sender
	/// is dropped right away).
	fn setup_io_handler() -> (TestRpcModule, TestProofSender) {
		let (_, best_block_stream) = BeefyBestBlockStream::<Block>::channel();
		setup_io_handler_with_best_block_stream(best_block_stream)
	}

	/// Builds an RPC module fed by the given best-block stream and returns it together
	/// with the sender side of its finality-proof channel.
	fn setup_io_handler_with_best_block_stream(
		best_block_stream: BeefyBestBlockStream<Block>,
	) -> (TestRpcModule, TestProofSender) {
		let (proof_sender, proof_stream) =
			BeefyVersionedFinalityProofStream::<Block, TestAuthorityId>::channel();

		let executor = sc_rpc::testing::test_executor();
		let handler = Beefy::new(proof_stream, best_block_stream, executor)
			.expect("Setting up the BEEFY RPC handler works");

		(handler.into_rpc(), proof_sender)
	}

	#[tokio::test]
	async fn uninitialized_rpc_handler() {
		let (rpc, _) = setup_io_handler();
		let (response, _) = rpc.raw_json_request(GET_FINALIZED_HEAD_REQUEST, 1).await.unwrap();

		assert_eq!(NOT_READY_RESPONSE, response);
	}

	#[tokio::test]
	async fn latest_finalized_rpc() {
		let (best_block_sender, best_block_stream) = BeefyBestBlockStream::<Block>::channel();
		let (io, _) = setup_io_handler_with_best_block_stream(best_block_stream);

		let hash = BlakeTwo256::hash(b"42");
		best_block_sender.notify(|| Ok::<_, ()>(hash)).unwrap();

		// Verify RPC `beefy_getFinalizedHead` returns expected hash.
		let expected = r#"{"jsonrpc":"2.0","id":1,"result":"0x2f0039e93a27221fcf657fb877a1d4f60307106113e885096cb44a461cd0afbf"}"#;

		// The best block is written by a background task, so poll until it shows up.
		let timeout = Duration::from_secs(2);
		let poll_interval = Duration::from_millis(50);
		let started = Instant::now();
		while started.elapsed() < timeout {
			let (response, _) = io
				.raw_json_request(GET_FINALIZED_HEAD_REQUEST, 1)
				.await
				.expect("RPC requests work");
			if response == NOT_READY_RESPONSE {
				tokio::time::sleep(poll_interval).await;
				continue
			}
			// Anything other than "not ready" must be the expected hash.
			assert_eq!(response, expected);
			return
		}

		panic!(
			"Deadline reached while waiting for best BEEFY block to update. Perhaps the background task is broken?"
		);
	}

	#[tokio::test]
	async fn subscribe_and_unsubscribe_with_wrong_id() {
		let (rpc, _) = setup_io_handler();
		// Open a real subscription and keep it alive for the rest of the test.
		let _sub = rpc
			.subscribe_unbounded("beefy_subscribeJustifications", EmptyParams::new())
			.await
			.unwrap();

		// Unsubscribing with an id that was never handed out must report `false`.
		let unsubscribe_request = r#"{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":["FOO"],"id":1}"#;
		let (response, _) = rpc.raw_json_request(unsubscribe_request, 1).await.unwrap();

		assert_eq!(response, r#"{"jsonrpc":"2.0","id":1,"result":false}"#);
	}

	/// Creates a V1 finality proof for block 5 with a single MMR-root payload entry and
	/// no signatures.
	fn create_finality_proof() -> TestFinalityProof {
		let commitment = sp_consensus_beefy::Commitment {
			payload: Payload::from_single_entry(
				known_payloads::MMR_ROOT_ID,
				"Hello World!".encode(),
			),
			block_number: 5,
			validator_set_id: 0,
		};
		TestFinalityProof::V1(SignedCommitment { commitment, signatures: vec![] })
	}

	#[tokio::test]
	async fn subscribe_and_listen_to_one_justification() {
		let (rpc, finality_proof_sender) = setup_io_handler();

		// Subscribe
		let mut sub = rpc
			.subscribe_unbounded("beefy_subscribeJustifications", EmptyParams::new())
			.await
			.unwrap();

		// Push one finality proof through the notification channel.
		let finality_proof = create_finality_proof();
		finality_proof_sender.notify(|| Ok::<_, ()>(finality_proof.clone())).unwrap();

		// The subscriber must get the same proof back, tagged with its subscription id.
		let (bytes, recv_sub_id) = sub.next::<sp_core::Bytes>().await.unwrap().unwrap();
		let recv_finality_proof = TestFinalityProof::decode(&mut &bytes[..]).unwrap();
		assert_eq!(&recv_sub_id, sub.subscription_id());
		assert_eq!(recv_finality_proof, finality_proof);
	}
}