cumulus_relay_chain_inprocess_interface/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.

use std::{
	collections::{BTreeMap, VecDeque},
	pin::Pin,
	sync::Arc,
	time::Duration,
};

use async_trait::async_trait;
use cumulus_client_bootnodes::bootnode_request_response_config;
use cumulus_primitives_core::{
	relay_chain::{
		runtime_api::ParachainHost,
		vstaging::{CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreState},
		Block as PBlock, BlockId, BlockNumber, CoreIndex, Hash as PHash, Header as PHeader,
		InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId,
	},
	InboundDownwardMessage, ParaId, PersistedValidationData,
};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use futures::{FutureExt, Stream, StreamExt};
use polkadot_primitives::vstaging::CandidateEvent;
use polkadot_service::{
	builder::PolkadotServiceBuilder, CollatorOverseerGen, CollatorPair, Configuration, FullBackend,
	FullClient, Handle, NewFull, NewFullParams, TaskManager,
};
use sc_cli::{RuntimeVersion, SubstrateCli};
use sc_client_api::{
	blockchain::BlockStatus, Backend, BlockchainEvents, HeaderBackend, ImportNotifications,
	StorageProof, TrieCacheContext,
};
use sc_network::{
	config::NetworkBackendType,
	request_responses::IncomingRequest,
	service::traits::{NetworkBackend, NetworkService},
};
use sc_telemetry::TelemetryWorkerHandle;
use sp_api::{CallApiAt, CallApiAtParams, CallContext, ProvideRuntimeApi};
use sp_consensus::SyncOracle;
use sp_core::Pair;
use sp_state_machine::{Backend as StateBackend, StorageValue};

/// The timeout in seconds after which waiting for a block is aborted.
const TIMEOUT_IN_SECONDS: u64 = 6;

/// Provides an implementation of the [`RelayChainInterface`] using a local in-process relay chain
/// node.
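///
/// A minimal construction sketch (illustrative only; it assumes a relay chain `NewFull` node,
/// here called `full_node`, has already been built by the surrounding service code, as done by
/// [`build_inprocess_relay_chain`] below):
///
/// ```ignore
/// let relay_chain_interface = RelayChainInProcessInterface::new(
/// 	full_node.client.clone(),
/// 	full_node.backend.clone(),
/// 	full_node.sync_service.clone(),
/// 	full_node.overseer_handle.clone().expect("overseer is running"),
/// );
/// ```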
#[derive(Clone)]
pub struct RelayChainInProcessInterface {
	/// Client of the in-process relay chain node.
	full_client: Arc<FullClient>,
	/// Backend of the in-process relay chain node.
	backend: Arc<FullBackend>,
	/// Oracle used to determine whether the relay chain node is still major syncing.
	sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
	/// Handle to the relay chain node's overseer.
	overseer_handle: Handle,
}

impl RelayChainInProcessInterface {
	/// Create a new instance of [`RelayChainInProcessInterface`].
	pub fn new(
		full_client: Arc<FullClient>,
		backend: Arc<FullBackend>,
		sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
		overseer_handle: Handle,
	) -> Self {
		Self { full_client, backend, sync_oracle, overseer_handle }
	}
}

#[async_trait]
impl RelayChainInterface for RelayChainInProcessInterface {
	async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion> {
		Ok(self.full_client.runtime_version_at(relay_parent)?)
	}

	async fn retrieve_dmq_contents(
		&self,
		para_id: ParaId,
		relay_parent: PHash,
	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
		Ok(self.full_client.runtime_api().dmq_contents(relay_parent, para_id)?)
	}

	async fn retrieve_all_inbound_hrmp_channel_contents(
		&self,
		para_id: ParaId,
		relay_parent: PHash,
	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
		Ok(self
			.full_client
			.runtime_api()
			.inbound_hrmp_channels_contents(relay_parent, para_id)?)
	}

	async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
		let hash = match block_id {
			BlockId::Hash(hash) => hash,
			BlockId::Number(num) =>
				if let Some(hash) = self.full_client.hash(num)? {
					hash
				} else {
					return Ok(None)
				},
		};
		let header = self.full_client.header(hash)?;

		Ok(header)
	}

	async fn persisted_validation_data(
		&self,
		hash: PHash,
		para_id: ParaId,
		occupied_core_assumption: OccupiedCoreAssumption,
	) -> RelayChainResult<Option<PersistedValidationData>> {
		Ok(self.full_client.runtime_api().persisted_validation_data(
			hash,
			para_id,
			occupied_core_assumption,
		)?)
	}

	async fn validation_code_hash(
		&self,
		hash: PHash,
		para_id: ParaId,
		occupied_core_assumption: OccupiedCoreAssumption,
	) -> RelayChainResult<Option<ValidationCodeHash>> {
		Ok(self.full_client.runtime_api().validation_code_hash(
			hash,
			para_id,
			occupied_core_assumption,
		)?)
	}

	async fn candidate_pending_availability(
		&self,
		hash: PHash,
		para_id: ParaId,
	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
		Ok(self
			.full_client
			.runtime_api()
			.candidate_pending_availability(hash, para_id)?
			.map(|receipt| receipt.into()))
	}

	async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult<SessionIndex> {
		Ok(self.full_client.runtime_api().session_index_for_child(hash)?)
	}

	async fn validators(&self, hash: PHash) -> RelayChainResult<Vec<ValidatorId>> {
		Ok(self.full_client.runtime_api().validators(hash)?)
	}

	async fn import_notification_stream(
		&self,
	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
		let notification_stream = self
			.full_client
			.import_notification_stream()
			.map(|notification| notification.header);
		Ok(Box::pin(notification_stream))
	}

	async fn finality_notification_stream(
		&self,
	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
		let notification_stream = self
			.full_client
			.finality_notification_stream()
			.map(|notification| notification.header);
		Ok(Box::pin(notification_stream))
	}

	async fn best_block_hash(&self) -> RelayChainResult<PHash> {
		Ok(self.backend.blockchain().info().best_hash)
	}

	async fn finalized_block_hash(&self) -> RelayChainResult<PHash> {
		Ok(self.backend.blockchain().info().finalized_hash)
	}

	async fn call_runtime_api(
		&self,
		method_name: &'static str,
		hash: PHash,
		payload: &[u8],
	) -> RelayChainResult<Vec<u8>> {
		Ok(self.full_client.call_api_at(CallApiAtParams {
			at: hash,
			function: method_name,
			arguments: payload.to_vec(),
			overlayed_changes: &Default::default(),
			call_context: CallContext::Offchain,
			recorder: &None,
			extensions: &Default::default(),
		})?)
	}

	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
		Ok(self.sync_oracle.is_major_syncing())
	}

	fn overseer_handle(&self) -> RelayChainResult<Handle> {
		Ok(self.overseer_handle.clone())
	}

	async fn get_storage_by_key(
		&self,
		relay_parent: PHash,
		key: &[u8],
	) -> RelayChainResult<Option<StorageValue>> {
		let state = self.backend.state_at(relay_parent, TrieCacheContext::Untrusted)?;
		state.storage(key).map_err(RelayChainError::GenericError)
	}

	async fn prove_read(
		&self,
		relay_parent: PHash,
		relevant_keys: &Vec<Vec<u8>>,
	) -> RelayChainResult<StorageProof> {
		let state_backend = self.backend.state_at(relay_parent, TrieCacheContext::Untrusted)?;

		sp_state_machine::prove_read(state_backend, relevant_keys)
			.map_err(RelayChainError::StateMachineError)
	}

	/// Wait for a given relay chain block in an async way.
	///
	/// The caller passes the hash of the block it is waiting for, and the function returns once
	/// the block is available or an error occurs.
	///
	/// Waiting for the block is implemented as follows:
	///
	/// 1. Get a read lock on the import lock from the backend.
	///
	/// 2. Check if the block is already imported. If yes, return from the function.
	///
	/// 3. If the block isn't imported yet, add an import notification listener.
	///
	/// 4. Poll the import notification listener until the block is imported or the timeout
	/// fires.
	///
	/// The timeout is set to 6 seconds. This should be enough time to import the block in the
	/// current round; if not, a new relay chain round has already started anyway.
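	///
	/// A minimal usage sketch (hypothetical caller code, not part of this crate):
	///
	/// ```ignore
	/// match relay_chain_interface.wait_for_block(relay_parent).await {
	/// 	Ok(()) => { /* the block is imported, continue */ },
	/// 	Err(RelayChainError::WaitTimeout(_)) => { /* give up on this relay parent */ },
	/// 	Err(other) => return Err(other),
	/// }
	/// ```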
	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
		let mut listener =
			match check_block_in_chain(self.backend.clone(), self.full_client.clone(), hash)? {
				BlockCheckStatus::InChain => return Ok(()),
				BlockCheckStatus::Unknown(listener) => listener,
			};

		let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();

		loop {
			futures::select! {
				_ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
				evt = listener.next() => match evt {
					Some(evt) if evt.hash == hash => return Ok(()),
					// Not the event we waited on.
					Some(_) => continue,
					None => return Err(RelayChainError::ImportListenerClosed(hash)),
				}
			}
		}
	}

	async fn new_best_notification_stream(
		&self,
	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
		let notifications_stream =
			self.full_client
				.import_notification_stream()
				.filter_map(|notification| async move {
					notification.is_new_best.then_some(notification.header)
				});
		Ok(Box::pin(notifications_stream))
	}

	async fn availability_cores(
		&self,
		relay_parent: PHash,
	) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>> {
		Ok(self
			.full_client
			.runtime_api()
			.availability_cores(relay_parent)?
			.into_iter()
			.map(|core_state| core_state.into())
			.collect::<Vec<_>>())
	}

	async fn candidates_pending_availability(
		&self,
		hash: PHash,
		para_id: ParaId,
	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
		Ok(self
			.full_client
			.runtime_api()
			.candidates_pending_availability(hash, para_id)?
			.into_iter()
			.map(|receipt| receipt.into())
			.collect::<Vec<_>>())
	}

	async fn claim_queue(
		&self,
		hash: PHash,
	) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>> {
		Ok(self.full_client.runtime_api().claim_queue(hash)?)
	}

	async fn scheduling_lookahead(&self, hash: PHash) -> RelayChainResult<u32> {
		Ok(self.full_client.runtime_api().scheduling_lookahead(hash)?)
	}

	async fn candidate_events(&self, hash: PHash) -> RelayChainResult<Vec<CandidateEvent>> {
		Ok(self.full_client.runtime_api().candidate_events(hash)?)
	}
}

/// The result of a [`check_block_in_chain`] call.
pub enum BlockCheckStatus {
	/// Block is in the chain.
	InChain,
	/// Block status is unknown; the listener can be used to wait for an import notification.
	Unknown(ImportNotifications<PBlock>),
}

/// Helper function to check if a block is already in the chain.
///
/// If the block is not yet imported, returns an import notification listener that can be used to
/// wait for it.
pub fn check_block_in_chain(
	backend: Arc<FullBackend>,
	client: Arc<FullClient>,
	hash: PHash,
) -> RelayChainResult<BlockCheckStatus> {
	let _lock = backend.get_import_lock().read();

	if backend.blockchain().status(hash)? == BlockStatus::InChain {
		return Ok(BlockCheckStatus::InChain)
	}

	let listener = client.import_notification_stream();

	Ok(BlockCheckStatus::Unknown(listener))
}

/// Build a Polkadot full node with the parachain bootnode request-response protocol.
fn build_polkadot_with_paranode_protocol<Network>(
	config: Configuration,
	params: NewFullParams<CollatorOverseerGen>,
) -> Result<(NewFull, async_channel::Receiver<IncomingRequest>), polkadot_service::Error>
where
	Network: NetworkBackend<PBlock, PHash>,
{
	let fork_id = config.chain_spec.fork_id().map(ToString::to_string);
	let mut polkadot_builder = PolkadotServiceBuilder::<_, Network>::new(config, params)?;
	let (config, request_receiver) = bootnode_request_response_config::<_, _, Network>(
		polkadot_builder.genesis_hash(),
		fork_id.as_deref(),
	);
	polkadot_builder.add_extra_request_response_protocol(config);

	Ok((polkadot_builder.build()?, request_receiver))
}

/// Build the Polkadot full node using the given `config`.
#[sc_tracing::logging::prefix_logs_with("Relaychain")]
fn build_polkadot_full_node(
	config: Configuration,
	parachain_config: &Configuration,
	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<
	(NewFull, Option<CollatorPair>, async_channel::Receiver<IncomingRequest>),
	polkadot_service::Error,
> {
	let (is_parachain_node, maybe_collator_key) = if parachain_config.role.is_authority() {
		let collator_key = CollatorPair::generate().0;
		(polkadot_service::IsParachainNode::Collator(collator_key.clone()), Some(collator_key))
	} else {
		(polkadot_service::IsParachainNode::FullNode, None)
	};

	let new_full_params = polkadot_service::NewFullParams {
		is_parachain_node,
		// Disable BEEFY. It should not be required by the internal relay chain node.
		enable_beefy: false,
		force_authoring_backoff: false,
		telemetry_worker_handle,

		// Cumulus doesn't spawn PVF workers, so we can disable version checks.
		node_version: None,
		secure_validator_mode: false,
		workers_path: None,
		workers_names: None,

		overseer_gen: CollatorOverseerGen,
		overseer_message_channel_capacity_override: None,
		malus_finality_delay: None,
		hwbench,
		execute_workers_max_num: None,
		prepare_workers_hard_max_num: None,
		prepare_workers_soft_max_num: None,
		keep_finalized_for: None,
	};

	let (relay_chain_full_node, paranode_req_receiver) = match config.network.network_backend {
		NetworkBackendType::Libp2p => build_polkadot_with_paranode_protocol::<
			sc_network::NetworkWorker<_, _>,
		>(config, new_full_params)?,
		NetworkBackendType::Litep2p => build_polkadot_with_paranode_protocol::<
			sc_network::Litep2pNetworkBackend,
		>(config, new_full_params)?,
	};

	Ok((relay_chain_full_node, maybe_collator_key, paranode_req_receiver))
}

/// Builds a relay chain interface by constructing a full relay chain node.
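///
/// A rough usage sketch from a parachain node's service setup (illustrative only; the
/// `polkadot_config`, `parachain_config`, `telemetry_worker_handle`, `task_manager` and
/// `hwbench` values are assumed to be provided by the surrounding node code):
///
/// ```ignore
/// let (relay_chain_interface, collator_key, relay_network, paranode_rx) =
/// 	build_inprocess_relay_chain(
/// 		polkadot_config,
/// 		&parachain_config,
/// 		telemetry_worker_handle,
/// 		&mut task_manager,
/// 		hwbench,
/// 	)?;
/// ```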
pub fn build_inprocess_relay_chain(
	mut polkadot_config: Configuration,
	parachain_config: &Configuration,
	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
	task_manager: &mut TaskManager,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> RelayChainResult<(
	Arc<(dyn RelayChainInterface + 'static)>,
	Option<CollatorPair>,
	Arc<dyn NetworkService>,
	async_channel::Receiver<IncomingRequest>,
)> {
	// This is essentially a hack, but we want to ensure that we send the correct node version
	// to the telemetry.
	polkadot_config.impl_version = polkadot_cli::Cli::impl_version();
	polkadot_config.impl_name = polkadot_cli::Cli::impl_name();

	let (full_node, collator_key, paranode_req_receiver) = build_polkadot_full_node(
		polkadot_config,
		parachain_config,
		telemetry_worker_handle,
		hwbench,
	)
	.map_err(|e| RelayChainError::Application(Box::new(e) as Box<_>))?;

	let relay_chain_interface = Arc::new(RelayChainInProcessInterface::new(
		full_node.client,
		full_node.backend,
		full_node.sync_service,
		full_node.overseer_handle.clone().ok_or(RelayChainError::GenericError(
			"Overseer not running in full node.".to_string(),
		))?,
	));

	task_manager.add_child(full_node.task_manager);

	Ok((relay_chain_interface, collator_key, full_node.network, paranode_req_receiver))
}

#[cfg(test)]
mod tests {
	use super::*;

	use polkadot_primitives::Block as PBlock;
	use polkadot_test_client::{
		construct_transfer_extrinsic, BlockBuilderExt, Client, ClientBlockImportExt,
		DefaultTestClientBuilderExt, InitPolkadotBlockBuilder, TestClientBuilder,
		TestClientBuilderExt,
	};
	use sp_consensus::{BlockOrigin, SyncOracle};
	use sp_runtime::traits::Block as BlockT;
	use std::sync::Arc;

	use futures::{executor::block_on, poll, task::Poll};

	// Sync oracle stub; the tests never query the sync status, so the methods stay unimplemented.
	struct DummyNetwork {}

	impl SyncOracle for DummyNetwork {
		fn is_major_syncing(&self) -> bool {
			unimplemented!("Not needed for test")
		}

		fn is_offline(&self) -> bool {
			unimplemented!("Not needed for test")
		}
	}

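	// Builds a test client, a block that is deliberately *not* yet imported, and a
	// `RelayChainInProcessInterface` wired to a dummy sync oracle and a mock overseer handle.
	// The tests below import the block (or not) to drive `wait_for_block`.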
	fn build_client_backend_and_block() -> (Arc<Client>, PBlock, RelayChainInProcessInterface) {
		let builder = TestClientBuilder::new();
		let backend = builder.backend();
		let client = Arc::new(builder.build());

		let block_builder = client.init_polkadot_block_builder();
		let block = block_builder.build().expect("Finalizes the block").block;
		let dummy_network: Arc<dyn SyncOracle + Sync + Send> = Arc::new(DummyNetwork {});

		let (tx, _rx) = metered::channel(30);
		let mock_handle = Handle::new(tx);
		(
			client.clone(),
			block,
			RelayChainInProcessInterface::new(client, backend, dummy_network, mock_handle),
		)
	}

	#[test]
	fn returns_directly_for_available_block() {
		let (client, block, relay_chain_interface) = build_client_backend_and_block();
		let hash = block.hash();

		block_on(client.import(BlockOrigin::Own, block)).expect("Imports the block");

		block_on(async move {
			// Should be ready on the first poll
			assert!(matches!(
				poll!(relay_chain_interface.wait_for_block(hash)),
				Poll::Ready(Ok(()))
			));
		});
	}

	#[test]
	fn resolve_after_block_import_notification_was_received() {
		let (client, block, relay_chain_interface) = build_client_backend_and_block();
		let hash = block.hash();

		block_on(async move {
			let mut future = relay_chain_interface.wait_for_block(hash);
			// As the block is not yet imported, the first poll should return `Pending`
			assert!(poll!(&mut future).is_pending());

			// Import the block that should fire the notification
			client.import(BlockOrigin::Own, block).await.expect("Imports the block");

			// Now it should have received the notification and report that the block was imported
			assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
		});
	}

	#[test]
	fn wait_for_block_time_out_when_block_is_not_imported() {
		let (_, block, relay_chain_interface) = build_client_backend_and_block();
		let hash = block.hash();

		assert!(matches!(
			block_on(relay_chain_interface.wait_for_block(hash)),
			Err(RelayChainError::WaitTimeout(_))
		));
	}

	#[test]
	fn do_not_resolve_after_different_block_import_notification_was_received() {
		let (client, block, relay_chain_interface) = build_client_backend_and_block();
		let hash = block.hash();

		let ext = construct_transfer_extrinsic(
			&client,
			sp_keyring::Sr25519Keyring::Alice,
			sp_keyring::Sr25519Keyring::Bob,
			1000,
		);
		let mut block_builder = client.init_polkadot_block_builder();
		// Push an extrinsic to get a different block hash.
		block_builder.push_polkadot_extrinsic(ext).expect("Push extrinsic");
		let block2 = block_builder.build().expect("Build second block").block;
		let hash2 = block2.hash();

		block_on(async move {
			let mut future = relay_chain_interface.wait_for_block(hash);
			let mut future2 = relay_chain_interface.wait_for_block(hash2);
			// As the block is not yet imported, the first poll should return `Pending`
			assert!(poll!(&mut future).is_pending());
			assert!(poll!(&mut future2).is_pending());

			// Import the block that should fire the notification
			client.import(BlockOrigin::Own, block2).await.expect("Imports the second block");

			// The import notification of the second block should not make this one finish
			assert!(poll!(&mut future).is_pending());
			// Now it should have received the notification and report that the block was imported
			assert!(matches!(poll!(future2), Poll::Ready(Ok(()))));

			client.import(BlockOrigin::Own, block).await.expect("Imports the first block");

			// Now it should be ready
			assert!(matches!(poll!(future), Poll::Ready(Ok(()))));
		});
	}
}
603}