// sc_consensus_pow/worker.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use futures::{
20	prelude::*,
21	task::{Context, Poll},
22};
23use futures_timer::Delay;
24use log::*;
25use parking_lot::Mutex;
26use sc_client_api::ImportNotifications;
27use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges};
28use sp_consensus::{BlockOrigin, Proposal};
29use sp_runtime::{
30	generic::BlockId,
31	traits::{Block as BlockT, Header as HeaderT},
32	DigestItem,
33};
34use std::{
35	pin::Pin,
36	sync::{
37		atomic::{AtomicUsize, Ordering},
38		Arc,
39	},
40	time::Duration,
41};
42
43use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, LOG_TARGET, POW_ENGINE_ID};
44
/// Mining metadata: everything a miner needs in order to start an actual mining loop.
///
/// `H` is the hash type and `D` the difficulty type. The derived `Clone`, `Debug`, `Eq` and
/// `PartialEq` implementations are available whenever `H` and `D` implement the respective trait.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MiningMetadata<H, D> {
	/// Currently known best hash which the pre-hash is built on.
	pub best_hash: H,
	/// Mining pre-hash, i.e. the hash the seal is searched for.
	pub pre_hash: H,
	/// Pre-runtime digest item, if there is one.
	pub pre_runtime: Option<Vec<u8>>,
	/// Mining target difficulty.
	pub difficulty: D,
}
57
58/// A build of mining, containing the metadata and the block proposal.
59pub struct MiningBuild<Block: BlockT, Algorithm: PowAlgorithm<Block>, Proof> {
60	/// Mining metadata.
61	pub metadata: MiningMetadata<Block::Hash, Algorithm::Difficulty>,
62	/// Mining proposal.
63	pub proposal: Proposal<Block, Proof>,
64}
65
/// Version of the mining worker.
///
/// The wrapped counter is private: outside this module a `Version` can only be copied, compared
/// for equality and printed for diagnostics.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Version(usize);
69
70/// Mining worker that exposes structs to query the current mining build and submit mined blocks.
71pub struct MiningHandle<
72	Block: BlockT,
73	Algorithm: PowAlgorithm<Block>,
74	L: sc_consensus::JustificationSyncLink<Block>,
75	Proof,
76> {
77	version: Arc<AtomicUsize>,
78	algorithm: Arc<Algorithm>,
79	justification_sync_link: Arc<L>,
80	build: Arc<Mutex<Option<MiningBuild<Block, Algorithm, Proof>>>>,
81	block_import: Arc<Mutex<BoxBlockImport<Block>>>,
82}
83
84impl<Block, Algorithm, L, Proof> MiningHandle<Block, Algorithm, L, Proof>
85where
86	Block: BlockT,
87	Algorithm: PowAlgorithm<Block>,
88	Algorithm::Difficulty: 'static + Send,
89	L: sc_consensus::JustificationSyncLink<Block>,
90{
91	fn increment_version(&self) {
92		self.version.fetch_add(1, Ordering::SeqCst);
93	}
94
95	pub(crate) fn new(
96		algorithm: Algorithm,
97		block_import: BoxBlockImport<Block>,
98		justification_sync_link: L,
99	) -> Self {
100		Self {
101			version: Arc::new(AtomicUsize::new(0)),
102			algorithm: Arc::new(algorithm),
103			justification_sync_link: Arc::new(justification_sync_link),
104			build: Arc::new(Mutex::new(None)),
105			block_import: Arc::new(Mutex::new(block_import)),
106		}
107	}
108
109	pub(crate) fn on_major_syncing(&self) {
110		let mut build = self.build.lock();
111		*build = None;
112		self.increment_version();
113	}
114
115	pub(crate) fn on_build(&self, value: MiningBuild<Block, Algorithm, Proof>) {
116		let mut build = self.build.lock();
117		*build = Some(value);
118		self.increment_version();
119	}
120
121	/// Get the version of the mining worker.
122	///
123	/// This returns type `Version` which can only compare equality. If `Version` is unchanged, then
124	/// it can be certain that `best_hash` and `metadata` were not changed.
125	pub fn version(&self) -> Version {
126		Version(self.version.load(Ordering::SeqCst))
127	}
128
129	/// Get the current best hash. `None` if the worker has just started or the client is doing
130	/// major syncing.
131	pub fn best_hash(&self) -> Option<Block::Hash> {
132		self.build.lock().as_ref().map(|b| b.metadata.best_hash)
133	}
134
135	/// Get a copy of the current mining metadata, if available.
136	pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> {
137		self.build.lock().as_ref().map(|b| b.metadata.clone())
138	}
139
140	/// Submit a mined seal. The seal will be validated again. Returns true if the submission is
141	/// successful.
142	pub async fn submit(&self, seal: Seal) -> bool {
143		if let Some(metadata) = self.metadata() {
144			match self.algorithm.verify(
145				&BlockId::Hash(metadata.best_hash),
146				&metadata.pre_hash,
147				metadata.pre_runtime.as_ref().map(|v| &v[..]),
148				&seal,
149				metadata.difficulty,
150			) {
151				Ok(true) => (),
152				Ok(false) => {
153					warn!(target: LOG_TARGET, "Unable to import mined block: seal is invalid",);
154					return false
155				},
156				Err(err) => {
157					warn!(target: LOG_TARGET, "Unable to import mined block: {}", err,);
158					return false
159				},
160			}
161		} else {
162			warn!(target: LOG_TARGET, "Unable to import mined block: metadata does not exist",);
163			return false
164		}
165
166		let build = if let Some(build) = {
167			let mut build = self.build.lock();
168			let value = build.take();
169			if value.is_some() {
170				self.increment_version();
171			}
172			value
173		} {
174			build
175		} else {
176			warn!(target: LOG_TARGET, "Unable to import mined block: build does not exist",);
177			return false
178		};
179
180		let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
181		let (header, body) = build.proposal.block.deconstruct();
182
183		let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
184		import_block.post_digests.push(seal);
185		import_block.body = Some(body);
186		import_block.state_action =
187			StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
188
189		let intermediate = PowIntermediate::<Algorithm::Difficulty> {
190			difficulty: Some(build.metadata.difficulty),
191		};
192		import_block.insert_intermediate(INTERMEDIATE_KEY, intermediate);
193
194		let header = import_block.post_header();
195		let block_import = self.block_import.lock();
196
197		match block_import.import_block(import_block).await {
198			Ok(res) => {
199				res.handle_justification(
200					&header.hash(),
201					*header.number(),
202					&self.justification_sync_link,
203				);
204
205				info!(
206					target: LOG_TARGET,
207					"✅ Successfully mined block on top of: {}", build.metadata.best_hash
208				);
209				true
210			},
211			Err(err) => {
212				warn!(target: LOG_TARGET, "Unable to import mined block: {}", err,);
213				false
214			},
215		}
216	}
217}
218
219impl<Block, Algorithm, L, Proof> Clone for MiningHandle<Block, Algorithm, L, Proof>
220where
221	Block: BlockT,
222	Algorithm: PowAlgorithm<Block>,
223	L: sc_consensus::JustificationSyncLink<Block>,
224{
225	fn clone(&self) -> Self {
226		Self {
227			version: self.version.clone(),
228			algorithm: self.algorithm.clone(),
229			justification_sync_link: self.justification_sync_link.clone(),
230			build: self.build.clone(),
231			block_import: self.block_import.clone(),
232		}
233	}
234}
235
236/// A stream that waits for a block import or timeout.
237pub struct UntilImportedOrTimeout<Block: BlockT> {
238	import_notifications: ImportNotifications<Block>,
239	timeout: Duration,
240	inner_delay: Option<Delay>,
241}
242
243impl<Block: BlockT> UntilImportedOrTimeout<Block> {
244	/// Create a new stream using the given import notification and timeout duration.
245	pub fn new(import_notifications: ImportNotifications<Block>, timeout: Duration) -> Self {
246		Self { import_notifications, timeout, inner_delay: None }
247	}
248}
249
250impl<Block: BlockT> Stream for UntilImportedOrTimeout<Block> {
251	type Item = ();
252
253	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
254		let mut fire = false;
255
256		loop {
257			match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
258				Poll::Pending => break,
259				Poll::Ready(Some(_)) => {
260					fire = true;
261				},
262				Poll::Ready(None) => return Poll::Ready(None),
263			}
264		}
265
266		let timeout = self.timeout;
267		let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout));
268
269		match Future::poll(Pin::new(inner_delay), cx) {
270			Poll::Pending => (),
271			Poll::Ready(()) => {
272				fire = true;
273			},
274		}
275
276		if fire {
277			self.inner_delay = None;
278			Poll::Ready(Some(()))
279		} else {
280			Poll::Pending
281		}
282	}
283}