1#![warn(missing_docs)]
37
38use std::{fmt, sync::Arc};
39
40use futures::{
41 future::{ready, Future},
42 prelude::*,
43};
44use parking_lot::Mutex;
45use sc_client_api::BlockchainEvents;
46use sc_network::{NetworkPeers, NetworkStateInfo};
47use sc_transaction_pool_api::OffchainTransactionPoolFactory;
48use sp_api::{ApiExt, ProvideRuntimeApi};
49use sp_core::{offchain, traits::SpawnNamed};
50use sp_externalities::Extension;
51use sp_keystore::{KeystoreExt, KeystorePtr};
52use sp_runtime::traits::{self, Header};
53use threadpool::ThreadPool;
54
55mod api;
56
57pub use sp_core::offchain::storage::OffchainDb;
58pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
59
60const LOG_TARGET: &str = "offchain-worker";
61
62pub trait NetworkProvider: NetworkStateInfo + NetworkPeers {}
65
66impl<T> NetworkProvider for T where T: NetworkStateInfo + NetworkPeers {}
67
68#[derive(Clone)]
73pub enum NoOffchainStorage {}
74
75impl offchain::OffchainStorage for NoOffchainStorage {
76 fn set(&mut self, _: &[u8], _: &[u8], _: &[u8]) {
77 unimplemented!("`NoOffchainStorage` can not be constructed!")
78 }
79
80 fn remove(&mut self, _: &[u8], _: &[u8]) {
81 unimplemented!("`NoOffchainStorage` can not be constructed!")
82 }
83
84 fn get(&self, _: &[u8], _: &[u8]) -> Option<Vec<u8>> {
85 unimplemented!("`NoOffchainStorage` can not be constructed!")
86 }
87
88 fn compare_and_set(&mut self, _: &[u8], _: &[u8], _: Option<&[u8]>, _: &[u8]) -> bool {
89 unimplemented!("`NoOffchainStorage` can not be constructed!")
90 }
91}
92
93pub struct OffchainWorkerOptions<RA, Block: traits::Block, Storage, CE> {
95 pub runtime_api_provider: Arc<RA>,
97 pub keystore: Option<KeystorePtr>,
99 pub offchain_db: Option<Storage>,
103 pub transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
105 pub network_provider: Arc<dyn NetworkProvider + Send + Sync>,
107 pub is_validator: bool,
109 pub enable_http_requests: bool,
113 pub custom_extensions: CE,
127}
128
129pub struct OffchainWorkers<RA, Block: traits::Block, Storage> {
131 runtime_api_provider: Arc<RA>,
132 thread_pool: Mutex<ThreadPool>,
133 shared_http_client: api::SharedClient,
134 enable_http_requests: bool,
135 keystore: Option<KeystorePtr>,
136 offchain_db: Option<OffchainDb<Storage>>,
137 transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
138 network_provider: Arc<dyn NetworkProvider + Send + Sync>,
139 is_validator: bool,
140 custom_extensions: Box<dyn Fn(Block::Hash) -> Vec<Box<dyn Extension>> + Send>,
141}
142
143impl<RA, Block: traits::Block, Storage> OffchainWorkers<RA, Block, Storage> {
144 pub fn new<CE: Fn(Block::Hash) -> Vec<Box<dyn Extension>> + Send + 'static>(
146 OffchainWorkerOptions {
147 runtime_api_provider,
148 keystore,
149 offchain_db,
150 transaction_pool,
151 network_provider,
152 is_validator,
153 enable_http_requests,
154 custom_extensions,
155 }: OffchainWorkerOptions<RA, Block, Storage, CE>,
156 ) -> std::io::Result<Self> {
157 Ok(Self {
158 runtime_api_provider,
159 thread_pool: Mutex::new(ThreadPool::with_name(
160 "offchain-worker".into(),
161 num_cpus::get(),
162 )),
163 shared_http_client: api::SharedClient::new()?,
164 enable_http_requests,
165 keystore,
166 offchain_db: offchain_db.map(OffchainDb::new),
167 transaction_pool,
168 is_validator,
169 network_provider,
170 custom_extensions: Box::new(custom_extensions),
171 })
172 }
173}
174
175impl<RA, Block: traits::Block, Storage: offchain::OffchainStorage> fmt::Debug
176 for OffchainWorkers<RA, Block, Storage>
177{
178 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
179 f.debug_tuple("OffchainWorkers").finish()
180 }
181}
182
183impl<RA, Block, Storage> OffchainWorkers<RA, Block, Storage>
184where
185 Block: traits::Block,
186 RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
187 RA::Api: OffchainWorkerApi<Block>,
188 Storage: offchain::OffchainStorage + 'static,
189{
190 pub async fn run<BE: BlockchainEvents<Block>>(
192 self,
193 import_events: Arc<BE>,
194 spawner: impl SpawnNamed,
195 ) {
196 import_events
197 .import_notification_stream()
198 .for_each(move |n| {
199 if n.is_new_best {
200 spawner.spawn(
201 "offchain-on-block",
202 Some("offchain-worker"),
203 self.on_block_imported(&n.header).boxed(),
204 );
205 } else {
206 tracing::debug!(
207 target: LOG_TARGET,
208 "Skipping offchain workers for non-canon block: {:?}",
209 n.header,
210 )
211 }
212
213 ready(())
214 })
215 .await;
216 }
217
218 #[must_use]
220 fn on_block_imported(&self, header: &Block::Header) -> impl Future<Output = ()> {
221 let runtime = self.runtime_api_provider.runtime_api();
222 let hash = header.hash();
223 let has_api_v1 = runtime.has_api_with::<dyn OffchainWorkerApi<Block>, _>(hash, |v| v == 1);
224 let has_api_v2 = runtime.has_api_with::<dyn OffchainWorkerApi<Block>, _>(hash, |v| v == 2);
225 let version = match (has_api_v1, has_api_v2) {
226 (_, Ok(true)) => 2,
227 (Ok(true), _) => 1,
228 err => {
229 let help =
230 "Consider turning off offchain workers if they are not part of your runtime.";
231 tracing::error!(
232 target: LOG_TARGET,
233 "Unsupported Offchain Worker API version: {:?}. {}.",
234 err,
235 help
236 );
237 0
238 },
239 };
240 tracing::debug!(
241 target: LOG_TARGET,
242 "Checking offchain workers at {hash:?}: version: {version}",
243 );
244
245 let process = (version > 0).then(|| {
246 let (api, runner) = api::AsyncApi::new(
247 self.network_provider.clone(),
248 self.is_validator,
249 self.shared_http_client.clone(),
250 );
251 tracing::debug!(target: LOG_TARGET, "Spawning offchain workers at {hash:?}");
252 let header = header.clone();
253 let client = self.runtime_api_provider.clone();
254
255 let mut capabilities = offchain::Capabilities::all();
256 capabilities.set(offchain::Capabilities::HTTP, self.enable_http_requests);
257
258 let keystore = self.keystore.clone();
259 let db = self.offchain_db.clone();
260 let tx_pool = self.transaction_pool.clone();
261 let custom_extensions = (*self.custom_extensions)(hash);
262
263 self.spawn_worker(move || {
264 let mut runtime = client.runtime_api();
265 let api = Box::new(api);
266 tracing::debug!(target: LOG_TARGET, "Running offchain workers at {hash:?}");
267
268 if let Some(keystore) = keystore {
269 runtime.register_extension(KeystoreExt(keystore.clone()));
270 }
271
272 if let Some(pool) = tx_pool {
273 runtime.register_extension(pool.offchain_transaction_pool(hash));
274 }
275
276 if let Some(offchain_db) = db {
277 runtime.register_extension(offchain::OffchainDbExt::new(
278 offchain::LimitedExternalities::new(capabilities, offchain_db.clone()),
279 ));
280 }
281
282 runtime.register_extension(offchain::OffchainWorkerExt::new(
283 offchain::LimitedExternalities::new(capabilities, api),
284 ));
285
286 custom_extensions.into_iter().for_each(|ext| runtime.register_extension(ext));
287
288 let run = if version == 2 {
289 runtime.offchain_worker(hash, &header)
290 } else {
291 #[allow(deprecated)]
292 runtime.offchain_worker_before_version_2(hash, *header.number())
293 };
294
295 if let Err(e) = run {
296 tracing::error!(
297 target: LOG_TARGET,
298 "Error running offchain workers at {:?}: {}",
299 hash,
300 e
301 );
302 }
303 });
304
305 runner.process()
306 });
307
308 async move {
309 futures::future::OptionFuture::from(process).await;
310 }
311 }
312
313 fn spawn_worker(&self, f: impl FnOnce() -> () + Send + 'static) {
322 self.thread_pool.lock().execute(f);
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329 use futures::executor::block_on;
330 use sc_block_builder::BlockBuilderBuilder;
331 use sc_client_api::Backend as _;
332 use sc_network::{
333 config::MultiaddrWithPeerId, types::ProtocolName, Multiaddr, ObservedRole, ReputationChange,
334 };
335 use sc_network_types::PeerId;
336 use sc_transaction_pool::BasicPool;
337 use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
338 use sp_consensus::BlockOrigin;
339 use sp_runtime::traits::Block as BlockT;
340 use std::{collections::HashSet, sync::Arc};
341 use substrate_test_runtime_client::{
342 runtime::{
343 substrate_test_pallet::pallet::Call as PalletCall, ExtrinsicBuilder, RuntimeCall,
344 },
345 ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilderExt,
346 };
347
348 struct TestNetwork();
349
350 impl NetworkStateInfo for TestNetwork {
351 fn external_addresses(&self) -> Vec<Multiaddr> {
352 Vec::new()
353 }
354
355 fn local_peer_id(&self) -> PeerId {
356 PeerId::random()
357 }
358
359 fn listen_addresses(&self) -> Vec<Multiaddr> {
360 Vec::new()
361 }
362 }
363
364 #[async_trait::async_trait]
365 impl NetworkPeers for TestNetwork {
366 fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
367 unimplemented!();
368 }
369
370 fn set_authorized_only(&self, _reserved_only: bool) {
371 unimplemented!();
372 }
373
374 fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
375 unimplemented!();
376 }
377
378 fn report_peer(&self, _peer_id: PeerId, _cost_benefit: ReputationChange) {
379 unimplemented!();
380 }
381
382 fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
383 unimplemented!()
384 }
385
386 fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
387 unimplemented!();
388 }
389
390 fn accept_unreserved_peers(&self) {
391 unimplemented!();
392 }
393
394 fn deny_unreserved_peers(&self) {
395 unimplemented!();
396 }
397
398 fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
399 unimplemented!();
400 }
401
402 fn remove_reserved_peer(&self, _peer_id: PeerId) {
403 unimplemented!();
404 }
405
406 fn set_reserved_peers(
407 &self,
408 _protocol: ProtocolName,
409 _peers: HashSet<Multiaddr>,
410 ) -> Result<(), String> {
411 unimplemented!();
412 }
413
414 fn add_peers_to_reserved_set(
415 &self,
416 _protocol: ProtocolName,
417 _peers: HashSet<Multiaddr>,
418 ) -> Result<(), String> {
419 unimplemented!();
420 }
421
422 fn remove_peers_from_reserved_set(
423 &self,
424 _protocol: ProtocolName,
425 _peers: Vec<PeerId>,
426 ) -> Result<(), String> {
427 unimplemented!();
428 }
429
430 fn sync_num_connected(&self) -> usize {
431 unimplemented!();
432 }
433
434 fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
435 None
436 }
437
438 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
439 unimplemented!();
440 }
441 }
442
443 #[test]
444 fn should_call_into_runtime_and_produce_extrinsic() {
445 sp_tracing::try_init_simple();
446
447 let client = Arc::new(substrate_test_runtime_client::new());
448 let spawner = sp_core::testing::TaskExecutor::new();
449 let pool = Arc::from(BasicPool::new_full(
450 Default::default(),
451 true.into(),
452 None,
453 spawner,
454 client.clone(),
455 ));
456 let network = Arc::new(TestNetwork());
457 let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();
458
459 let offchain = OffchainWorkers::new(OffchainWorkerOptions {
461 runtime_api_provider: client,
462 keystore: None,
463 offchain_db: None::<NoOffchainStorage>,
464 transaction_pool: Some(OffchainTransactionPoolFactory::new(pool.clone())),
465 network_provider: network,
466 is_validator: false,
467 enable_http_requests: false,
468 custom_extensions: |_| Vec::new(),
469 })
470 .unwrap();
471 futures::executor::block_on(offchain.on_block_imported(&header));
472
473 assert_eq!(pool.status().ready, 1);
475 assert!(matches!(
476 pool.ready().next().unwrap().data().function,
477 RuntimeCall::SubstrateTest(PalletCall::storage_change { .. })
478 ));
479 }
480
481 #[test]
482 fn offchain_index_set_and_clear_works() {
483 use sp_core::offchain::OffchainStorage;
484
485 sp_tracing::try_init_simple();
486
487 let (client, backend) = substrate_test_runtime_client::TestClientBuilder::new()
488 .enable_offchain_indexing_api()
489 .build_with_backend();
490 let client = Arc::new(client);
491 let offchain_db = backend.offchain_storage().unwrap();
492
493 let key = &b"hello"[..];
494 let value = &b"world"[..];
495 let mut block_builder = BlockBuilderBuilder::new(&*client)
496 .on_parent_block(client.chain_info().genesis_hash)
497 .with_parent_block_number(0)
498 .build()
499 .unwrap();
500 let ext = ExtrinsicBuilder::new_offchain_index_set(key.to_vec(), value.to_vec()).build();
501 block_builder.push(ext).unwrap();
502
503 let block = block_builder.build().unwrap().block;
504 block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
505
506 assert_eq!(value, &offchain_db.get(sp_offchain::STORAGE_PREFIX, &key).unwrap());
507
508 let mut block_builder = BlockBuilderBuilder::new(&*client)
509 .on_parent_block(block.hash())
510 .with_parent_block_number(1)
511 .build()
512 .unwrap();
513 let ext = ExtrinsicBuilder::new_offchain_index_clear(key.to_vec()).nonce(1).build();
514 block_builder.push(ext).unwrap();
515
516 let block = block_builder.build().unwrap().block;
517 block_on(client.import(BlockOrigin::Own, block)).unwrap();
518
519 assert!(offchain_db.get(sp_offchain::STORAGE_PREFIX, &key).is_none());
520 }
521}