sc_consensus_manual_seal/
rpc.rs1use crate::error::Error;
22use futures::{
23 channel::{mpsc, oneshot},
24 SinkExt,
25};
26use jsonrpsee::{core::async_trait, proc_macros::rpc};
27use sc_consensus::ImportedAux;
28use serde::{Deserialize, Serialize};
29use sp_runtime::EncodedJustification;
30
31pub type Sender<T> = Option<oneshot::Sender<std::result::Result<T, Error>>>;
33
34pub enum EngineCommand<Hash> {
36 SealNewBlock {
43 create_empty: bool,
46 finalize: bool,
48 parent_hash: Option<Hash>,
50 sender: Sender<CreatedBlock<Hash>>,
52 },
53 FinalizeBlock {
55 hash: Hash,
57 sender: Sender<()>,
59 justification: Option<EncodedJustification>,
61 },
62}
63
64#[rpc(client, server)]
66pub trait ManualSealApi<Hash> {
67 #[method(name = "engine_createBlock")]
69 async fn create_block(
70 &self,
71 create_empty: bool,
72 finalize: bool,
73 parent_hash: Option<Hash>,
74 ) -> Result<CreatedBlock<Hash>, Error>;
75
76 #[method(name = "engine_finalizeBlock")]
78 async fn finalize_block(
79 &self,
80 hash: Hash,
81 justification: Option<EncodedJustification>,
82 ) -> Result<bool, Error>;
83}
84
85pub struct ManualSeal<Hash> {
87 import_block_channel: mpsc::Sender<EngineCommand<Hash>>,
88}
89
90#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
92pub struct CreatedBlock<Hash> {
93 pub hash: Hash,
95 pub aux: ImportedAux,
97 pub proof_size: usize,
99}
100
101impl<Hash> ManualSeal<Hash> {
102 pub fn new(import_block_channel: mpsc::Sender<EngineCommand<Hash>>) -> Self {
104 Self { import_block_channel }
105 }
106}
107
108#[async_trait]
109impl<Hash: Send + 'static> ManualSealApiServer<Hash> for ManualSeal<Hash> {
110 async fn create_block(
111 &self,
112 create_empty: bool,
113 finalize: bool,
114 parent_hash: Option<Hash>,
115 ) -> Result<CreatedBlock<Hash>, Error> {
116 let mut sink = self.import_block_channel.clone();
117 let (sender, receiver) = oneshot::channel();
118 let command = EngineCommand::SealNewBlock {
120 create_empty,
121 finalize,
122 parent_hash,
123 sender: Some(sender),
124 };
125
126 sink.send(command).await?;
127
128 match receiver.await {
129 Ok(Ok(rx)) => Ok(rx),
130 Ok(Err(e)) => Err(e.into()),
131 Err(e) => Err(e.into()),
132 }
133 }
134
135 async fn finalize_block(
136 &self,
137 hash: Hash,
138 justification: Option<EncodedJustification>,
139 ) -> Result<bool, Error> {
140 let mut sink = self.import_block_channel.clone();
141 let (sender, receiver) = oneshot::channel();
142 let command = EngineCommand::FinalizeBlock { hash, sender: Some(sender), justification };
143 sink.send(command).await?;
144 receiver.await.map(|_| true).map_err(Into::into)
145 }
146}
147
148pub fn send_result<T: std::fmt::Debug>(
151 sender: &mut Sender<T>,
152 result: std::result::Result<T, crate::Error>,
153) {
154 if let Some(sender) = sender.take() {
155 if let Err(err) = sender.send(result) {
156 match err {
157 Ok(value) => log::warn!("Server is shutting down: {:?}", value),
158 Err(error) => log::warn!("Server is shutting down with error: {}", error),
159 }
160 }
161 } else {
162 match result {
165 Ok(r) => log::info!("Consensus with no RPC sender success: {:?}", r),
166 Err(e) => log::error!("Consensus with no RPC sender encountered an error: {}", e),
167 }
168 }
169}