1use futures::{
20 prelude::*,
21 task::{Context, Poll},
22};
23use futures_timer::Delay;
24use log::*;
25use parking_lot::Mutex;
26use sc_client_api::ImportNotifications;
27use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges};
28use sp_consensus::{BlockOrigin, Proposal};
29use sp_runtime::{
30 generic::BlockId,
31 traits::{Block as BlockT, Header as HeaderT},
32 DigestItem,
33};
34use std::{
35 pin::Pin,
36 sync::{
37 atomic::{AtomicUsize, Ordering},
38 Arc,
39 },
40 time::Duration,
41};
42
43use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, LOG_TARGET, POW_ENGINE_ID};
44
/// Parameters describing the build a miner is currently working on.
///
/// `MiningHandle::submit` forwards these values to `PowAlgorithm::verify`
/// when it checks a submitted seal.
#[derive(Clone, PartialEq, Eq)]
pub struct MiningMetadata<H, D> {
    /// Hash of the block the current build is mined on top of.
    pub best_hash: H,
    /// Pre-hash handed to seal verification (presumably the header hash
    /// before the seal is attached -- confirm against the proposer).
    pub pre_hash: H,
    /// Optional raw bytes forwarded to seal verification as the
    /// pre-runtime argument.
    pub pre_runtime: Option<Vec<u8>>,
    /// Difficulty the seal is verified against.
    pub difficulty: D,
}
57
58pub struct MiningBuild<Block: BlockT, Algorithm: PowAlgorithm<Block>, Proof> {
60 pub metadata: MiningMetadata<Block::Hash, Algorithm::Difficulty>,
62 pub proposal: Proposal<Block, Proof>,
64}
65
/// Opaque reading of a `MiningHandle`'s build counter.
///
/// The wrapped value is private; the derived traits only allow copying a
/// reading and comparing two readings for equality.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct Version(usize);
69
70pub struct MiningHandle<
72 Block: BlockT,
73 Algorithm: PowAlgorithm<Block>,
74 L: sc_consensus::JustificationSyncLink<Block>,
75 Proof,
76> {
77 version: Arc<AtomicUsize>,
78 algorithm: Arc<Algorithm>,
79 justification_sync_link: Arc<L>,
80 build: Arc<Mutex<Option<MiningBuild<Block, Algorithm, Proof>>>>,
81 block_import: Arc<Mutex<BoxBlockImport<Block>>>,
82}
83
84impl<Block, Algorithm, L, Proof> MiningHandle<Block, Algorithm, L, Proof>
85where
86 Block: BlockT,
87 Algorithm: PowAlgorithm<Block>,
88 Algorithm::Difficulty: 'static + Send,
89 L: sc_consensus::JustificationSyncLink<Block>,
90{
91 fn increment_version(&self) {
92 self.version.fetch_add(1, Ordering::SeqCst);
93 }
94
95 pub(crate) fn new(
96 algorithm: Algorithm,
97 block_import: BoxBlockImport<Block>,
98 justification_sync_link: L,
99 ) -> Self {
100 Self {
101 version: Arc::new(AtomicUsize::new(0)),
102 algorithm: Arc::new(algorithm),
103 justification_sync_link: Arc::new(justification_sync_link),
104 build: Arc::new(Mutex::new(None)),
105 block_import: Arc::new(Mutex::new(block_import)),
106 }
107 }
108
109 pub(crate) fn on_major_syncing(&self) {
110 let mut build = self.build.lock();
111 *build = None;
112 self.increment_version();
113 }
114
115 pub(crate) fn on_build(&self, value: MiningBuild<Block, Algorithm, Proof>) {
116 let mut build = self.build.lock();
117 *build = Some(value);
118 self.increment_version();
119 }
120
121 pub fn version(&self) -> Version {
126 Version(self.version.load(Ordering::SeqCst))
127 }
128
129 pub fn best_hash(&self) -> Option<Block::Hash> {
132 self.build.lock().as_ref().map(|b| b.metadata.best_hash)
133 }
134
135 pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> {
137 self.build.lock().as_ref().map(|b| b.metadata.clone())
138 }
139
140 pub async fn submit(&self, seal: Seal) -> bool {
143 if let Some(metadata) = self.metadata() {
144 match self.algorithm.verify(
145 &BlockId::Hash(metadata.best_hash),
146 &metadata.pre_hash,
147 metadata.pre_runtime.as_ref().map(|v| &v[..]),
148 &seal,
149 metadata.difficulty,
150 ) {
151 Ok(true) => (),
152 Ok(false) => {
153 warn!(target: LOG_TARGET, "Unable to import mined block: seal is invalid",);
154 return false
155 },
156 Err(err) => {
157 warn!(target: LOG_TARGET, "Unable to import mined block: {}", err,);
158 return false
159 },
160 }
161 } else {
162 warn!(target: LOG_TARGET, "Unable to import mined block: metadata does not exist",);
163 return false
164 }
165
166 let build = if let Some(build) = {
167 let mut build = self.build.lock();
168 let value = build.take();
169 if value.is_some() {
170 self.increment_version();
171 }
172 value
173 } {
174 build
175 } else {
176 warn!(target: LOG_TARGET, "Unable to import mined block: build does not exist",);
177 return false
178 };
179
180 let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
181 let (header, body) = build.proposal.block.deconstruct();
182
183 let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
184 import_block.post_digests.push(seal);
185 import_block.body = Some(body);
186 import_block.state_action =
187 StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
188
189 let intermediate = PowIntermediate::<Algorithm::Difficulty> {
190 difficulty: Some(build.metadata.difficulty),
191 };
192 import_block.insert_intermediate(INTERMEDIATE_KEY, intermediate);
193
194 let header = import_block.post_header();
195 let block_import = self.block_import.lock();
196
197 match block_import.import_block(import_block).await {
198 Ok(res) => {
199 res.handle_justification(
200 &header.hash(),
201 *header.number(),
202 &self.justification_sync_link,
203 );
204
205 info!(
206 target: LOG_TARGET,
207 "✅ Successfully mined block on top of: {}", build.metadata.best_hash
208 );
209 true
210 },
211 Err(err) => {
212 warn!(target: LOG_TARGET, "Unable to import mined block: {}", err,);
213 false
214 },
215 }
216 }
217}
218
219impl<Block, Algorithm, L, Proof> Clone for MiningHandle<Block, Algorithm, L, Proof>
220where
221 Block: BlockT,
222 Algorithm: PowAlgorithm<Block>,
223 L: sc_consensus::JustificationSyncLink<Block>,
224{
225 fn clone(&self) -> Self {
226 Self {
227 version: self.version.clone(),
228 algorithm: self.algorithm.clone(),
229 justification_sync_link: self.justification_sync_link.clone(),
230 build: self.build.clone(),
231 block_import: self.block_import.clone(),
232 }
233 }
234}
235
236pub struct UntilImportedOrTimeout<Block: BlockT> {
238 import_notifications: ImportNotifications<Block>,
239 timeout: Duration,
240 inner_delay: Option<Delay>,
241}
242
243impl<Block: BlockT> UntilImportedOrTimeout<Block> {
244 pub fn new(import_notifications: ImportNotifications<Block>, timeout: Duration) -> Self {
246 Self { import_notifications, timeout, inner_delay: None }
247 }
248}
249
250impl<Block: BlockT> Stream for UntilImportedOrTimeout<Block> {
251 type Item = ();
252
253 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
254 let mut fire = false;
255
256 loop {
257 match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
258 Poll::Pending => break,
259 Poll::Ready(Some(_)) => {
260 fire = true;
261 },
262 Poll::Ready(None) => return Poll::Ready(None),
263 }
264 }
265
266 let timeout = self.timeout;
267 let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout));
268
269 match Future::poll(Pin::new(inner_delay), cx) {
270 Poll::Pending => (),
271 Poll::Ready(()) => {
272 fire = true;
273 },
274 }
275
276 if fire {
277 self.inner_delay = None;
278 Poll::Ready(Some(()))
279 } else {
280 Poll::Pending
281 }
282 }
283}