#![warn(missing_docs)]
use parking_lot::RwLock;
use sp_consensus_beefy::AuthorityIdBound;
use std::sync::Arc;
use sc_rpc::{
utils::{BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};
use sp_application_crypto::RuntimeAppPublic;
use sp_runtime::traits::Block as BlockT;
use futures::{task::SpawnError, FutureExt, StreamExt};
use jsonrpsee::{
core::async_trait,
proc_macros::rpc,
types::{ErrorObject, ErrorObjectOwned},
PendingSubscriptionSink,
};
use log::warn;
use sc_consensus_beefy::communication::notification::{
BeefyBestBlockStream, BeefyVersionedFinalityProofStream,
};
mod notification;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("BEEFY RPC endpoint not ready")]
EndpointNotReady,
#[error("BEEFY RPC background task failed to spawn")]
RpcTaskFailure(#[from] SpawnError),
}
pub enum ErrorCode {
NotReady = 1,
TaskFailure = 2,
}
impl From<Error> for ErrorCode {
fn from(error: Error) -> Self {
match error {
Error::EndpointNotReady => ErrorCode::NotReady,
Error::RpcTaskFailure(_) => ErrorCode::TaskFailure,
}
}
}
impl From<Error> for ErrorObjectOwned {
fn from(error: Error) -> Self {
let message = error.to_string();
let code = ErrorCode::from(error);
ErrorObject::owned(code as i32, message, None::<()>)
}
}
#[rpc(client, server)]
pub trait BeefyApi<Notification, Hash> {
#[subscription(
name = "beefy_subscribeJustifications" => "beefy_justifications",
unsubscribe = "beefy_unsubscribeJustifications",
item = Notification,
)]
fn subscribe_justifications(&self);
#[method(name = "beefy_getFinalizedHead")]
async fn latest_finalized(&self) -> Result<Hash, Error>;
}
pub struct Beefy<Block: BlockT, AuthorityId: AuthorityIdBound> {
finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
beefy_best_block: Arc<RwLock<Option<Block::Hash>>>,
executor: SubscriptionTaskExecutor,
}
impl<Block, AuthorityId> Beefy<Block, AuthorityId>
where
Block: BlockT,
AuthorityId: AuthorityIdBound,
{
pub fn new(
finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
best_block_stream: BeefyBestBlockStream<Block>,
executor: SubscriptionTaskExecutor,
) -> Result<Self, Error> {
let beefy_best_block = Arc::new(RwLock::new(None));
let stream = best_block_stream.subscribe(100_000);
let closure_clone = beefy_best_block.clone();
let future = stream.for_each(move |best_beefy| {
let async_clone = closure_clone.clone();
async move { *async_clone.write() = Some(best_beefy) }
});
executor.spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed());
Ok(Self { finality_proof_stream, beefy_best_block, executor })
}
}
#[async_trait]
impl<Block, AuthorityId> BeefyApiServer<notification::EncodedVersionedFinalityProof, Block::Hash>
for Beefy<Block, AuthorityId>
where
Block: BlockT,
AuthorityId: AuthorityIdBound,
<AuthorityId as RuntimeAppPublic>::Signature: Send + Sync,
{
fn subscribe_justifications(&self, pending: PendingSubscriptionSink) {
let stream = self
.finality_proof_stream
.subscribe(100_000)
.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block, AuthorityId>(vfp));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
);
}
async fn latest_finalized(&self) -> Result<Block::Hash, Error> {
self.beefy_best_block.read().as_ref().cloned().ok_or(Error::EndpointNotReady)
}
}
#[cfg(test)]
mod tests {
use super::*;
use codec::{Decode, Encode};
use jsonrpsee::{core::EmptyServerParams as EmptyParams, RpcModule};
use sc_consensus_beefy::{
communication::notification::BeefyVersionedFinalityProofSender,
justification::BeefyVersionedFinalityProof,
};
use sp_consensus_beefy::{ecdsa_crypto, known_payloads, Payload, SignedCommitment};
use sp_runtime::traits::{BlakeTwo256, Hash};
use substrate_test_runtime_client::runtime::Block;
fn setup_io_handler() -> (
RpcModule<Beefy<Block, ecdsa_crypto::AuthorityId>>,
BeefyVersionedFinalityProofSender<Block, ecdsa_crypto::AuthorityId>,
) {
let (_, stream) = BeefyBestBlockStream::<Block>::channel();
setup_io_handler_with_best_block_stream(stream)
}
fn setup_io_handler_with_best_block_stream(
best_block_stream: BeefyBestBlockStream<Block>,
) -> (
RpcModule<Beefy<Block, ecdsa_crypto::AuthorityId>>,
BeefyVersionedFinalityProofSender<Block, ecdsa_crypto::AuthorityId>,
) {
let (finality_proof_sender, finality_proof_stream) =
BeefyVersionedFinalityProofStream::<Block, ecdsa_crypto::AuthorityId>::channel();
let handler =
Beefy::new(finality_proof_stream, best_block_stream, sc_rpc::testing::test_executor())
.expect("Setting up the BEEFY RPC handler works");
(handler.into_rpc(), finality_proof_sender)
}
#[tokio::test]
async fn uninitialized_rpc_handler() {
let (rpc, _) = setup_io_handler();
let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
let expected_response = r#"{"jsonrpc":"2.0","id":1,"error":{"code":1,"message":"BEEFY RPC endpoint not ready"}}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();
assert_eq!(expected_response, response);
}
#[tokio::test]
async fn latest_finalized_rpc() {
let (sender, stream) = BeefyBestBlockStream::<Block>::channel();
let (io, _) = setup_io_handler_with_best_block_stream(stream);
let hash = BlakeTwo256::hash(b"42");
let r: Result<(), ()> = sender.notify(|| Ok(hash));
r.unwrap();
let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
let expected = "{\
\"jsonrpc\":\"2.0\",\
\"id\":1,\
\"result\":\"0x2f0039e93a27221fcf657fb877a1d4f60307106113e885096cb44a461cd0afbf\"\
}";
let not_ready: &str = "{\
\"jsonrpc\":\"2.0\",\
\"id\":1,\
\"error\":{\"code\":1,\"message\":\"BEEFY RPC endpoint not ready\"}\
}";
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
while std::time::Instant::now() < deadline {
let (response, _) = io.raw_json_request(request, 1).await.expect("RPC requests work");
if response != not_ready {
assert_eq!(response, expected);
return
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
panic!(
"Deadline reached while waiting for best BEEFY block to update. Perhaps the background task is broken?"
);
}
#[tokio::test]
async fn subscribe_and_unsubscribe_with_wrong_id() {
let (rpc, _) = setup_io_handler();
let _sub = rpc
.subscribe_unbounded("beefy_subscribeJustifications", EmptyParams::new())
.await
.unwrap();
let (response, _) = rpc
.raw_json_request(
r#"{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":["FOO"],"id":1}"#,
1,
)
.await
.unwrap();
let expected = r#"{"jsonrpc":"2.0","id":1,"result":false}"#;
assert_eq!(response, expected);
}
fn create_finality_proof() -> BeefyVersionedFinalityProof<Block, ecdsa_crypto::AuthorityId> {
let payload =
Payload::from_single_entry(known_payloads::MMR_ROOT_ID, "Hello World!".encode());
BeefyVersionedFinalityProof::<Block, ecdsa_crypto::AuthorityId>::V1(SignedCommitment {
commitment: sp_consensus_beefy::Commitment {
payload,
block_number: 5,
validator_set_id: 0,
},
signatures: vec![],
})
}
#[tokio::test]
async fn subscribe_and_listen_to_one_justification() {
let (rpc, finality_proof_sender) = setup_io_handler();
let mut sub = rpc
.subscribe_unbounded("beefy_subscribeJustifications", EmptyParams::new())
.await
.unwrap();
let finality_proof = create_finality_proof();
let r: Result<(), ()> = finality_proof_sender.notify(|| Ok(finality_proof.clone()));
r.unwrap();
let (bytes, recv_sub_id) = sub.next::<sp_core::Bytes>().await.unwrap().unwrap();
let recv_finality_proof: BeefyVersionedFinalityProof<Block, ecdsa_crypto::AuthorityId> =
Decode::decode(&mut &bytes[..]).unwrap();
assert_eq!(&recv_sub_id, sub.subscription_id());
assert_eq!(recv_finality_proof, finality_proof);
}
}