referrerpolicy=no-referrer-when-downgrade

sc_offchain/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate offchain workers.
20//!
21//! The offchain workers is a special function of the runtime that
22//! gets executed after block is imported. During execution
23//! it's able to asynchronously submit extrinsics that will either
24//! be propagated to other nodes or added to the next block
25//! produced by the node as unsigned transactions.
26//!
27//! Offchain workers can be used for computation-heavy tasks
28//! that are not feasible for execution during regular block processing.
29//! It can either be tasks that no consensus is required for,
30//! or some form of consensus over the data can be built on-chain
31//! for instance via:
32//! 1. Challenge period for incorrect computations
33//! 2. Majority voting for results
34//! 3. etc
35
36#![warn(missing_docs)]
37
38use std::{fmt, sync::Arc};
39
40use futures::{
41	future::{ready, Future},
42	prelude::*,
43};
44use parking_lot::Mutex;
45use sc_client_api::BlockchainEvents;
46use sc_network::{NetworkPeers, NetworkStateInfo};
47use sc_transaction_pool_api::OffchainTransactionPoolFactory;
48use sp_api::{ApiExt, ProvideRuntimeApi};
49use sp_core::{offchain, traits::SpawnNamed};
50use sp_externalities::Extension;
51use sp_keystore::{KeystoreExt, KeystorePtr};
52use sp_runtime::traits::{self, Header};
53use threadpool::ThreadPool;
54
55mod api;
56
57pub use sp_core::offchain::storage::OffchainDb;
58pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
59
60const LOG_TARGET: &str = "offchain-worker";
61
62/// NetworkProvider provides [`OffchainWorkers`] with all necessary hooks into the
63/// underlying Substrate networking.
64pub trait NetworkProvider: NetworkStateInfo + NetworkPeers {}
65
66impl<T> NetworkProvider for T where T: NetworkStateInfo + NetworkPeers {}
67
68/// Special type that implements [`OffchainStorage`](offchain::OffchainStorage).
69///
70/// This type can not be constructed and should only be used when passing `None` as `offchain_db` to
71/// [`OffchainWorkerOptions`] to make the compiler happy.
72#[derive(Clone)]
73pub enum NoOffchainStorage {}
74
75impl offchain::OffchainStorage for NoOffchainStorage {
76	fn set(&mut self, _: &[u8], _: &[u8], _: &[u8]) {
77		unimplemented!("`NoOffchainStorage` can not be constructed!")
78	}
79
80	fn remove(&mut self, _: &[u8], _: &[u8]) {
81		unimplemented!("`NoOffchainStorage` can not be constructed!")
82	}
83
84	fn get(&self, _: &[u8], _: &[u8]) -> Option<Vec<u8>> {
85		unimplemented!("`NoOffchainStorage` can not be constructed!")
86	}
87
88	fn compare_and_set(&mut self, _: &[u8], _: &[u8], _: Option<&[u8]>, _: &[u8]) -> bool {
89		unimplemented!("`NoOffchainStorage` can not be constructed!")
90	}
91}
92
93/// Options for [`OffchainWorkers`]
94pub struct OffchainWorkerOptions<RA, Block: traits::Block, Storage, CE> {
95	/// Provides access to the runtime api.
96	pub runtime_api_provider: Arc<RA>,
97	/// Provides access to the keystore.
98	pub keystore: Option<KeystorePtr>,
99	/// Provides access to the offchain database.
100	///
101	/// Use [`NoOffchainStorage`] as type when passing `None` to have some type that works.
102	pub offchain_db: Option<Storage>,
103	/// Provides access to the transaction pool.
104	pub transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
105	/// Provides access to network information.
106	pub network_provider: Arc<dyn NetworkProvider + Send + Sync>,
107	/// Is the node running as validator?
108	pub is_validator: bool,
109	/// Enable http requests from offchain workers?
110	///
111	/// If not enabled, any http request will panic.
112	pub enable_http_requests: bool,
113	/// Callback to create custom [`Extension`]s that should be registered for the
114	/// `offchain_worker` runtime call.
115	///
116	/// These [`Extension`]s are registered along-side the default extensions and are accessible in
117	/// the host functions.
118	///
119	/// # Example:
120	///
121	/// ```nocompile
122	/// custom_extensions: |block_hash| {
123	///     vec![MyCustomExtension::new()]
124	/// }
125	/// ```
126	pub custom_extensions: CE,
127}
128
129/// An offchain workers manager.
130pub struct OffchainWorkers<RA, Block: traits::Block, Storage> {
131	runtime_api_provider: Arc<RA>,
132	thread_pool: Mutex<ThreadPool>,
133	shared_http_client: api::SharedClient,
134	enable_http_requests: bool,
135	keystore: Option<KeystorePtr>,
136	offchain_db: Option<OffchainDb<Storage>>,
137	transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
138	network_provider: Arc<dyn NetworkProvider + Send + Sync>,
139	is_validator: bool,
140	custom_extensions: Box<dyn Fn(Block::Hash) -> Vec<Box<dyn Extension>> + Send>,
141}
142
143impl<RA, Block: traits::Block, Storage> OffchainWorkers<RA, Block, Storage> {
144	/// Creates new [`OffchainWorkers`].
145	pub fn new<CE: Fn(Block::Hash) -> Vec<Box<dyn Extension>> + Send + 'static>(
146		OffchainWorkerOptions {
147			runtime_api_provider,
148			keystore,
149			offchain_db,
150			transaction_pool,
151			network_provider,
152			is_validator,
153			enable_http_requests,
154			custom_extensions,
155		}: OffchainWorkerOptions<RA, Block, Storage, CE>,
156	) -> std::io::Result<Self> {
157		Ok(Self {
158			runtime_api_provider,
159			thread_pool: Mutex::new(ThreadPool::with_name(
160				"offchain-worker".into(),
161				num_cpus::get(),
162			)),
163			shared_http_client: api::SharedClient::new()?,
164			enable_http_requests,
165			keystore,
166			offchain_db: offchain_db.map(OffchainDb::new),
167			transaction_pool,
168			is_validator,
169			network_provider,
170			custom_extensions: Box::new(custom_extensions),
171		})
172	}
173}
174
175impl<RA, Block: traits::Block, Storage: offchain::OffchainStorage> fmt::Debug
176	for OffchainWorkers<RA, Block, Storage>
177{
178	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
179		f.debug_tuple("OffchainWorkers").finish()
180	}
181}
182
183impl<RA, Block, Storage> OffchainWorkers<RA, Block, Storage>
184where
185	Block: traits::Block,
186	RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
187	RA::Api: OffchainWorkerApi<Block>,
188	Storage: offchain::OffchainStorage + 'static,
189{
190	/// Run the offchain workers on every block import.
191	pub async fn run<BE: BlockchainEvents<Block>>(
192		self,
193		import_events: Arc<BE>,
194		spawner: impl SpawnNamed,
195	) {
196		import_events
197			.import_notification_stream()
198			.for_each(move |n| {
199				if n.is_new_best {
200					spawner.spawn(
201						"offchain-on-block",
202						Some("offchain-worker"),
203						self.on_block_imported(&n.header).boxed(),
204					);
205				} else {
206					tracing::debug!(
207						target: LOG_TARGET,
208						"Skipping offchain workers for non-canon block: {:?}",
209						n.header,
210					)
211				}
212
213				ready(())
214			})
215			.await;
216	}
217
218	/// Start the offchain workers after given block.
219	#[must_use]
220	fn on_block_imported(&self, header: &Block::Header) -> impl Future<Output = ()> {
221		let runtime = self.runtime_api_provider.runtime_api();
222		let hash = header.hash();
223		let has_api_v1 = runtime.has_api_with::<dyn OffchainWorkerApi<Block>, _>(hash, |v| v == 1);
224		let has_api_v2 = runtime.has_api_with::<dyn OffchainWorkerApi<Block>, _>(hash, |v| v == 2);
225		let version = match (has_api_v1, has_api_v2) {
226			(_, Ok(true)) => 2,
227			(Ok(true), _) => 1,
228			err => {
229				let help =
230					"Consider turning off offchain workers if they are not part of your runtime.";
231				tracing::error!(
232					target: LOG_TARGET,
233					"Unsupported Offchain Worker API version: {:?}. {}.",
234					err,
235					help
236				);
237				0
238			},
239		};
240		tracing::debug!(
241			target: LOG_TARGET,
242			"Checking offchain workers at {hash:?}: version: {version}",
243		);
244
245		let process = (version > 0).then(|| {
246			let (api, runner) = api::AsyncApi::new(
247				self.network_provider.clone(),
248				self.is_validator,
249				self.shared_http_client.clone(),
250			);
251			tracing::debug!(target: LOG_TARGET, "Spawning offchain workers at {hash:?}");
252			let header = header.clone();
253			let client = self.runtime_api_provider.clone();
254
255			let mut capabilities = offchain::Capabilities::all();
256			capabilities.set(offchain::Capabilities::HTTP, self.enable_http_requests);
257
258			let keystore = self.keystore.clone();
259			let db = self.offchain_db.clone();
260			let tx_pool = self.transaction_pool.clone();
261			let custom_extensions = (*self.custom_extensions)(hash);
262
263			self.spawn_worker(move || {
264				let mut runtime = client.runtime_api();
265				let api = Box::new(api);
266				tracing::debug!(target: LOG_TARGET, "Running offchain workers at {hash:?}");
267
268				if let Some(keystore) = keystore {
269					runtime.register_extension(KeystoreExt(keystore.clone()));
270				}
271
272				if let Some(pool) = tx_pool {
273					runtime.register_extension(pool.offchain_transaction_pool(hash));
274				}
275
276				if let Some(offchain_db) = db {
277					runtime.register_extension(offchain::OffchainDbExt::new(
278						offchain::LimitedExternalities::new(capabilities, offchain_db.clone()),
279					));
280				}
281
282				runtime.register_extension(offchain::OffchainWorkerExt::new(
283					offchain::LimitedExternalities::new(capabilities, api),
284				));
285
286				custom_extensions.into_iter().for_each(|ext| runtime.register_extension(ext));
287
288				let run = if version == 2 {
289					runtime.offchain_worker(hash, &header)
290				} else {
291					#[allow(deprecated)]
292					runtime.offchain_worker_before_version_2(hash, *header.number())
293				};
294
295				if let Err(e) = run {
296					tracing::error!(
297						target: LOG_TARGET,
298						"Error running offchain workers at {:?}: {}",
299						hash,
300						e
301					);
302				}
303			});
304
305			runner.process()
306		});
307
308		async move {
309			futures::future::OptionFuture::from(process).await;
310		}
311	}
312
313	/// Spawns a new offchain worker.
314	///
315	/// We spawn offchain workers for each block in a separate thread,
316	/// since they can run for a significant amount of time
317	/// in a blocking fashion and we don't want to block the runtime.
318	///
319	/// Note that we should avoid that if we switch to future-based runtime in the future,
320	/// alternatively:
321	fn spawn_worker(&self, f: impl FnOnce() -> () + Send + 'static) {
322		self.thread_pool.lock().execute(f);
323	}
324}
325
326#[cfg(test)]
327mod tests {
328	use super::*;
329	use futures::executor::block_on;
330	use sc_block_builder::BlockBuilderBuilder;
331	use sc_client_api::Backend as _;
332	use sc_network::{
333		config::MultiaddrWithPeerId, types::ProtocolName, Multiaddr, ObservedRole, ReputationChange,
334	};
335	use sc_network_types::PeerId;
336	use sc_transaction_pool::BasicPool;
337	use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
338	use sp_consensus::BlockOrigin;
339	use sp_runtime::traits::Block as BlockT;
340	use std::{collections::HashSet, sync::Arc};
341	use substrate_test_runtime_client::{
342		runtime::{
343			substrate_test_pallet::pallet::Call as PalletCall, ExtrinsicBuilder, RuntimeCall,
344		},
345		ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilderExt,
346	};
347
348	struct TestNetwork();
349
350	impl NetworkStateInfo for TestNetwork {
351		fn external_addresses(&self) -> Vec<Multiaddr> {
352			Vec::new()
353		}
354
355		fn local_peer_id(&self) -> PeerId {
356			PeerId::random()
357		}
358
359		fn listen_addresses(&self) -> Vec<Multiaddr> {
360			Vec::new()
361		}
362	}
363
364	#[async_trait::async_trait]
365	impl NetworkPeers for TestNetwork {
366		fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
367			unimplemented!();
368		}
369
370		fn set_authorized_only(&self, _reserved_only: bool) {
371			unimplemented!();
372		}
373
374		fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
375			unimplemented!();
376		}
377
378		fn report_peer(&self, _peer_id: PeerId, _cost_benefit: ReputationChange) {
379			unimplemented!();
380		}
381
382		fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
383			unimplemented!()
384		}
385
386		fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
387			unimplemented!();
388		}
389
390		fn accept_unreserved_peers(&self) {
391			unimplemented!();
392		}
393
394		fn deny_unreserved_peers(&self) {
395			unimplemented!();
396		}
397
398		fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
399			unimplemented!();
400		}
401
402		fn remove_reserved_peer(&self, _peer_id: PeerId) {
403			unimplemented!();
404		}
405
406		fn set_reserved_peers(
407			&self,
408			_protocol: ProtocolName,
409			_peers: HashSet<Multiaddr>,
410		) -> Result<(), String> {
411			unimplemented!();
412		}
413
414		fn add_peers_to_reserved_set(
415			&self,
416			_protocol: ProtocolName,
417			_peers: HashSet<Multiaddr>,
418		) -> Result<(), String> {
419			unimplemented!();
420		}
421
422		fn remove_peers_from_reserved_set(
423			&self,
424			_protocol: ProtocolName,
425			_peers: Vec<PeerId>,
426		) -> Result<(), String> {
427			unimplemented!();
428		}
429
430		fn sync_num_connected(&self) -> usize {
431			unimplemented!();
432		}
433
434		fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
435			None
436		}
437
438		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
439			unimplemented!();
440		}
441	}
442
443	#[test]
444	fn should_call_into_runtime_and_produce_extrinsic() {
445		sp_tracing::try_init_simple();
446
447		let client = Arc::new(substrate_test_runtime_client::new());
448		let spawner = sp_core::testing::TaskExecutor::new();
449		let pool = Arc::from(BasicPool::new_full(
450			Default::default(),
451			true.into(),
452			None,
453			spawner,
454			client.clone(),
455		));
456		let network = Arc::new(TestNetwork());
457		let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();
458
459		// when
460		let offchain = OffchainWorkers::new(OffchainWorkerOptions {
461			runtime_api_provider: client,
462			keystore: None,
463			offchain_db: None::<NoOffchainStorage>,
464			transaction_pool: Some(OffchainTransactionPoolFactory::new(pool.clone())),
465			network_provider: network,
466			is_validator: false,
467			enable_http_requests: false,
468			custom_extensions: |_| Vec::new(),
469		})
470		.unwrap();
471		futures::executor::block_on(offchain.on_block_imported(&header));
472
473		// then
474		assert_eq!(pool.status().ready, 1);
475		assert!(matches!(
476			pool.ready().next().unwrap().data().function,
477			RuntimeCall::SubstrateTest(PalletCall::storage_change { .. })
478		));
479	}
480
481	#[test]
482	fn offchain_index_set_and_clear_works() {
483		use sp_core::offchain::OffchainStorage;
484
485		sp_tracing::try_init_simple();
486
487		let (client, backend) = substrate_test_runtime_client::TestClientBuilder::new()
488			.enable_offchain_indexing_api()
489			.build_with_backend();
490		let client = Arc::new(client);
491		let offchain_db = backend.offchain_storage().unwrap();
492
493		let key = &b"hello"[..];
494		let value = &b"world"[..];
495		let mut block_builder = BlockBuilderBuilder::new(&*client)
496			.on_parent_block(client.chain_info().genesis_hash)
497			.with_parent_block_number(0)
498			.build()
499			.unwrap();
500		let ext = ExtrinsicBuilder::new_offchain_index_set(key.to_vec(), value.to_vec()).build();
501		block_builder.push(ext).unwrap();
502
503		let block = block_builder.build().unwrap().block;
504		block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
505
506		assert_eq!(value, &offchain_db.get(sp_offchain::STORAGE_PREFIX, &key).unwrap());
507
508		let mut block_builder = BlockBuilderBuilder::new(&*client)
509			.on_parent_block(block.hash())
510			.with_parent_block_number(1)
511			.build()
512			.unwrap();
513		let ext = ExtrinsicBuilder::new_offchain_index_clear(key.to_vec()).nonce(1).build();
514		block_builder.push(ext).unwrap();
515
516		let block = block_builder.build().unwrap().block;
517		block_on(client.import(BlockOrigin::Own, block)).unwrap();
518
519		assert!(offchain_db.get(sp_offchain::STORAGE_PREFIX, &key).is_none());
520	}
521}