referrerpolicy=no-referrer-when-downgrade

cumulus_relay_chain_interface/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use std::{
19	collections::{BTreeMap, VecDeque},
20	pin::Pin,
21	sync::Arc,
22};
23
24use futures::Stream;
25use polkadot_overseer::prometheus::PrometheusError;
26use sc_client_api::StorageProof;
27use sp_version::RuntimeVersion;
28
29use async_trait::async_trait;
30use codec::{Decode, Encode, Error as CodecError};
31use jsonrpsee_core::ClientError as JsonRpcError;
32use sp_api::ApiError;
33
34use cumulus_primitives_core::relay_chain::{BlockId, CandidateEvent, Hash as RelayHash};
35pub use cumulus_primitives_core::{
36	relay_chain::{
37		BlockNumber, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex,
38		CoreState, Hash as PHash, Header as PHeader, InboundHrmpMessage, OccupiedCoreAssumption,
39		SessionIndex, ValidationCodeHash, ValidatorId,
40	},
41	InboundDownwardMessage, ParaId, PersistedValidationData,
42};
43pub use polkadot_overseer::Handle as OverseerHandle;
44pub use sp_state_machine::StorageValue;
45
46pub type RelayChainResult<T> = Result<T, RelayChainError>;
47
48#[derive(thiserror::Error, Debug)]
49pub enum RelayChainError {
50	#[error("Error occurred while calling relay chain runtime: {0}")]
51	ApiError(#[from] ApiError),
52	#[error("Timeout while waiting for relay-chain block `{0}` to be imported.")]
53	WaitTimeout(PHash),
54	#[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")]
55	ImportListenerClosed(PHash),
56	#[error(
57		"Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1}"
58	)]
59	WaitBlockchainError(PHash, sp_blockchain::Error),
60	#[error("Blockchain returned an error: {0}")]
61	BlockchainError(#[from] sp_blockchain::Error),
62	#[error("State machine error occurred: {0}")]
63	StateMachineError(Box<dyn sp_state_machine::Error>),
64	#[error("Unable to call RPC method '{0}'")]
65	RpcCallError(String),
66	#[error("RPC Error: '{0}'")]
67	JsonRpcError(#[from] JsonRpcError),
68	#[error("Unable to communicate with RPC worker: {0}")]
69	WorkerCommunicationError(String),
70	#[error("Scale codec deserialization error: {0}")]
71	DeserializationError(CodecError),
72	#[error(transparent)]
73	Application(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
74	#[error("Prometheus error: {0}")]
75	PrometheusError(#[from] PrometheusError),
76	#[error("Unspecified error occurred: {0}")]
77	GenericError(String),
78}
79
80impl From<RelayChainError> for ApiError {
81	fn from(r: RelayChainError) -> Self {
82		sp_api::ApiError::Application(Box::new(r))
83	}
84}
85
86impl From<CodecError> for RelayChainError {
87	fn from(e: CodecError) -> Self {
88		RelayChainError::DeserializationError(e)
89	}
90}
91
92impl From<RelayChainError> for sp_blockchain::Error {
93	fn from(r: RelayChainError) -> Self {
94		sp_blockchain::Error::Application(Box::new(r))
95	}
96}
97
98impl<T: std::error::Error + Send + Sync + 'static> From<Box<T>> for RelayChainError {
99	fn from(r: Box<T>) -> Self {
100		RelayChainError::Application(r)
101	}
102}
103
104/// Trait that provides all necessary methods for interaction between collator and relay chain.
105#[async_trait]
106pub trait RelayChainInterface: Send + Sync {
107	/// Fetch a storage item by key.
108	async fn get_storage_by_key(
109		&self,
110		relay_parent: PHash,
111		key: &[u8],
112	) -> RelayChainResult<Option<StorageValue>>;
113
114	/// Fetch a vector of current validators.
115	async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>>;
116
117	/// Get the hash of the current best block.
118	async fn best_block_hash(&self) -> RelayChainResult<PHash>;
119
120	/// Fetch the block header of a given hash or height, if it exists.
121	async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>>;
122
123	/// Get the hash of the finalized block.
124	async fn finalized_block_hash(&self) -> RelayChainResult<PHash>;
125
126	/// Call an arbitrary runtime api. The input and output are SCALE-encoded.
127	async fn call_runtime_api(
128		&self,
129		method_name: &'static str,
130		hash: RelayHash,
131		payload: &[u8],
132	) -> RelayChainResult<Vec<u8>>;
133
134	/// Returns the whole contents of the downward message queue for the parachain we are collating
135	/// for.
136	///
137	/// Returns `None` in case of an error.
138	async fn retrieve_dmq_contents(
139		&self,
140		para_id: ParaId,
141		relay_parent: PHash,
142	) -> RelayChainResult<Vec<InboundDownwardMessage>>;
143
144	/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
145	/// collating for.
146	///
147	/// Empty channels are also included.
148	async fn retrieve_all_inbound_hrmp_channel_contents(
149		&self,
150		para_id: ParaId,
151		relay_parent: PHash,
152	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
153
154	/// Yields the persisted validation data for the given `ParaId` along with an assumption that
155	/// should be used if the para currently occupies a core.
156	///
157	/// Returns `None` if either the para is not registered or the assumption is `Freed`
158	/// and the para already occupies a core.
159	async fn persisted_validation_data(
160		&self,
161		block_id: PHash,
162		para_id: ParaId,
163		_: OccupiedCoreAssumption,
164	) -> RelayChainResult<Option<PersistedValidationData>>;
165
166	/// Get the receipt of the first candidate pending availability of this para_id. This returns
167	/// `Some` for any paras assigned to occupied cores in `availability_cores` and `None`
168	/// otherwise.
169	#[deprecated(
170		note = "`candidate_pending_availability` only returns one candidate and is deprecated. Use `candidates_pending_availability` instead."
171	)]
172	async fn candidate_pending_availability(
173		&self,
174		block_id: PHash,
175		para_id: ParaId,
176	) -> RelayChainResult<Option<CommittedCandidateReceipt>>;
177
178	/// Returns the session index expected at a child of the block.
179	async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult<SessionIndex>;
180
181	/// Get a stream of import block notifications.
182	async fn import_notification_stream(
183		&self,
184	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
185
186	/// Get a stream of new best block notifications.
187	async fn new_best_notification_stream(
188		&self,
189	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
190
191	/// Wait for a block with a given hash in the relay chain.
192	///
193	/// This method returns immediately on error or if the block is already
194	/// reported to be in chain. Otherwise, it waits for the block to arrive.
195	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()>;
196
197	/// Get a stream of finality notifications.
198	async fn finality_notification_stream(
199		&self,
200	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
201
202	/// Whether the synchronization service is undergoing major sync.
203	/// Returns true if so.
204	async fn is_major_syncing(&self) -> RelayChainResult<bool>;
205
206	/// Get a handle to the overseer.
207	fn overseer_handle(&self) -> RelayChainResult<OverseerHandle>;
208
209	/// Generate a storage read proof.
210	async fn prove_read(
211		&self,
212		relay_parent: PHash,
213		relevant_keys: &Vec<Vec<u8>>,
214	) -> RelayChainResult<StorageProof>;
215
216	/// Returns the validation code hash for the given `para_id` using the given
217	/// `occupied_core_assumption`.
218	async fn validation_code_hash(
219		&self,
220		relay_parent: PHash,
221		para_id: ParaId,
222		occupied_core_assumption: OccupiedCoreAssumption,
223	) -> RelayChainResult<Option<ValidationCodeHash>>;
224
225	/// Get the receipts of all candidates pending availability for this para_id.
226	async fn candidates_pending_availability(
227		&self,
228		block_id: PHash,
229		para_id: ParaId,
230	) -> RelayChainResult<Vec<CommittedCandidateReceipt>>;
231
232	/// Get the runtime version of the relay chain.
233	async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion>;
234
235	/// Yields information on all availability cores as relevant to the child block.
236	///
237	/// Cores are either free, scheduled or occupied. Free cores can have paras assigned to them.
238	async fn availability_cores(
239		&self,
240		relay_parent: PHash,
241	) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>>;
242
243	/// Fetch the claim queue.
244	async fn claim_queue(
245		&self,
246		relay_parent: PHash,
247	) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>>;
248
249	/// Fetch the scheduling lookahead value.
250	async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult<u32>;
251
252	async fn candidate_events(&self, at: RelayHash) -> RelayChainResult<Vec<CandidateEvent>>;
253}
254
255#[async_trait]
256impl<T> RelayChainInterface for Arc<T>
257where
258	T: RelayChainInterface + ?Sized,
259{
260	async fn retrieve_dmq_contents(
261		&self,
262		para_id: ParaId,
263		relay_parent: PHash,
264	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
265		(**self).retrieve_dmq_contents(para_id, relay_parent).await
266	}
267
268	async fn retrieve_all_inbound_hrmp_channel_contents(
269		&self,
270		para_id: ParaId,
271		relay_parent: PHash,
272	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
273		(**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await
274	}
275
276	async fn persisted_validation_data(
277		&self,
278		block_id: PHash,
279		para_id: ParaId,
280		occupied_core_assumption: OccupiedCoreAssumption,
281	) -> RelayChainResult<Option<PersistedValidationData>> {
282		(**self)
283			.persisted_validation_data(block_id, para_id, occupied_core_assumption)
284			.await
285	}
286
287	#[allow(deprecated)]
288	async fn candidate_pending_availability(
289		&self,
290		block_id: PHash,
291		para_id: ParaId,
292	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
293		(**self).candidate_pending_availability(block_id, para_id).await
294	}
295
296	async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult<SessionIndex> {
297		(**self).session_index_for_child(block_id).await
298	}
299
300	async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>> {
301		(**self).validators(block_id).await
302	}
303
304	async fn import_notification_stream(
305		&self,
306	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
307		(**self).import_notification_stream().await
308	}
309
310	async fn finality_notification_stream(
311		&self,
312	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
313		(**self).finality_notification_stream().await
314	}
315
316	async fn best_block_hash(&self) -> RelayChainResult<PHash> {
317		(**self).best_block_hash().await
318	}
319
320	async fn finalized_block_hash(&self) -> RelayChainResult<PHash> {
321		(**self).finalized_block_hash().await
322	}
323
324	async fn call_runtime_api(
325		&self,
326		method_name: &'static str,
327		hash: RelayHash,
328		payload: &[u8],
329	) -> RelayChainResult<Vec<u8>> {
330		(**self).call_runtime_api(method_name, hash, payload).await
331	}
332
333	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
334		(**self).is_major_syncing().await
335	}
336
337	fn overseer_handle(&self) -> RelayChainResult<OverseerHandle> {
338		(**self).overseer_handle()
339	}
340
341	async fn get_storage_by_key(
342		&self,
343		relay_parent: PHash,
344		key: &[u8],
345	) -> RelayChainResult<Option<StorageValue>> {
346		(**self).get_storage_by_key(relay_parent, key).await
347	}
348
349	async fn prove_read(
350		&self,
351		relay_parent: PHash,
352		relevant_keys: &Vec<Vec<u8>>,
353	) -> RelayChainResult<StorageProof> {
354		(**self).prove_read(relay_parent, relevant_keys).await
355	}
356
357	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
358		(**self).wait_for_block(hash).await
359	}
360
361	async fn new_best_notification_stream(
362		&self,
363	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
364		(**self).new_best_notification_stream().await
365	}
366
367	async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
368		(**self).header(block_id).await
369	}
370
371	async fn validation_code_hash(
372		&self,
373		relay_parent: PHash,
374		para_id: ParaId,
375		occupied_core_assumption: OccupiedCoreAssumption,
376	) -> RelayChainResult<Option<ValidationCodeHash>> {
377		(**self)
378			.validation_code_hash(relay_parent, para_id, occupied_core_assumption)
379			.await
380	}
381
382	async fn availability_cores(
383		&self,
384		relay_parent: PHash,
385	) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>> {
386		(**self).availability_cores(relay_parent).await
387	}
388
389	async fn candidates_pending_availability(
390		&self,
391		block_id: PHash,
392		para_id: ParaId,
393	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
394		(**self).candidates_pending_availability(block_id, para_id).await
395	}
396
397	async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion> {
398		(**self).version(relay_parent).await
399	}
400
401	async fn claim_queue(
402		&self,
403		relay_parent: PHash,
404	) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>> {
405		(**self).claim_queue(relay_parent).await
406	}
407
408	async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult<u32> {
409		(**self).scheduling_lookahead(relay_parent).await
410	}
411
412	async fn candidate_events(&self, at: RelayHash) -> RelayChainResult<Vec<CandidateEvent>> {
413		(**self).candidate_events(at).await
414	}
415}
416
417/// Helper function to call an arbitrary runtime API using a `RelayChainInterface` client.
418/// Unlike the trait method, this function can be generic, so it handles the encoding of input and
419/// output params.
420pub async fn call_runtime_api<R>(
421	client: &(impl RelayChainInterface + ?Sized),
422	method_name: &'static str,
423	hash: RelayHash,
424	payload: impl Encode,
425) -> RelayChainResult<R>
426where
427	R: Decode,
428{
429	let res = client.call_runtime_api(method_name, hash, &payload.encode()).await?;
430	Decode::decode(&mut &*res).map_err(Into::into)
431}