// cumulus_relay_chain_interface/lib.rs
use std::{
19 collections::{BTreeMap, VecDeque},
20 pin::Pin,
21 sync::Arc,
22};
23
24use futures::Stream;
25use polkadot_overseer::prometheus::PrometheusError;
26use sc_client_api::StorageProof;
27use sp_version::RuntimeVersion;
28
29use async_trait::async_trait;
30use codec::{Decode, Encode, Error as CodecError};
31use jsonrpsee_core::ClientError as JsonRpcError;
32use sp_api::ApiError;
33
34use cumulus_primitives_core::relay_chain::{BlockId, CandidateEvent, Hash as RelayHash};
35pub use cumulus_primitives_core::{
36 relay_chain::{
37 BlockNumber, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex,
38 CoreState, Hash as PHash, Header as PHeader, InboundHrmpMessage, OccupiedCoreAssumption,
39 SessionIndex, ValidationCodeHash, ValidatorId,
40 },
41 InboundDownwardMessage, ParaId, PersistedValidationData,
42};
43pub use polkadot_overseer::Handle as OverseerHandle;
44pub use sp_state_machine::StorageValue;
45
46pub type RelayChainResult<T> = Result<T, RelayChainError>;
47
48#[derive(thiserror::Error, Debug)]
49pub enum RelayChainError {
50 #[error("Error occurred while calling relay chain runtime: {0}")]
51 ApiError(#[from] ApiError),
52 #[error("Timeout while waiting for relay-chain block `{0}` to be imported.")]
53 WaitTimeout(PHash),
54 #[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")]
55 ImportListenerClosed(PHash),
56 #[error(
57 "Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1}"
58 )]
59 WaitBlockchainError(PHash, sp_blockchain::Error),
60 #[error("Blockchain returned an error: {0}")]
61 BlockchainError(#[from] sp_blockchain::Error),
62 #[error("State machine error occurred: {0}")]
63 StateMachineError(Box<dyn sp_state_machine::Error>),
64 #[error("Unable to call RPC method '{0}'")]
65 RpcCallError(String),
66 #[error("RPC Error: '{0}'")]
67 JsonRpcError(#[from] JsonRpcError),
68 #[error("Unable to communicate with RPC worker: {0}")]
69 WorkerCommunicationError(String),
70 #[error("Scale codec deserialization error: {0}")]
71 DeserializationError(CodecError),
72 #[error(transparent)]
73 Application(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
74 #[error("Prometheus error: {0}")]
75 PrometheusError(#[from] PrometheusError),
76 #[error("Unspecified error occurred: {0}")]
77 GenericError(String),
78}
79
80impl From<RelayChainError> for ApiError {
81 fn from(r: RelayChainError) -> Self {
82 sp_api::ApiError::Application(Box::new(r))
83 }
84}
85
86impl From<CodecError> for RelayChainError {
87 fn from(e: CodecError) -> Self {
88 RelayChainError::DeserializationError(e)
89 }
90}
91
92impl From<RelayChainError> for sp_blockchain::Error {
93 fn from(r: RelayChainError) -> Self {
94 sp_blockchain::Error::Application(Box::new(r))
95 }
96}
97
98impl<T: std::error::Error + Send + Sync + 'static> From<Box<T>> for RelayChainError {
99 fn from(r: Box<T>) -> Self {
100 RelayChainError::Application(r)
101 }
102}
103
104#[async_trait]
106pub trait RelayChainInterface: Send + Sync {
107 async fn get_storage_by_key(
109 &self,
110 relay_parent: PHash,
111 key: &[u8],
112 ) -> RelayChainResult<Option<StorageValue>>;
113
114 async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>>;
116
117 async fn best_block_hash(&self) -> RelayChainResult<PHash>;
119
120 async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>>;
122
123 async fn finalized_block_hash(&self) -> RelayChainResult<PHash>;
125
126 async fn call_runtime_api(
128 &self,
129 method_name: &'static str,
130 hash: RelayHash,
131 payload: &[u8],
132 ) -> RelayChainResult<Vec<u8>>;
133
134 async fn retrieve_dmq_contents(
139 &self,
140 para_id: ParaId,
141 relay_parent: PHash,
142 ) -> RelayChainResult<Vec<InboundDownwardMessage>>;
143
144 async fn retrieve_all_inbound_hrmp_channel_contents(
149 &self,
150 para_id: ParaId,
151 relay_parent: PHash,
152 ) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
153
154 async fn persisted_validation_data(
160 &self,
161 block_id: PHash,
162 para_id: ParaId,
163 _: OccupiedCoreAssumption,
164 ) -> RelayChainResult<Option<PersistedValidationData>>;
165
166 #[deprecated(
170 note = "`candidate_pending_availability` only returns one candidate and is deprecated. Use `candidates_pending_availability` instead."
171 )]
172 async fn candidate_pending_availability(
173 &self,
174 block_id: PHash,
175 para_id: ParaId,
176 ) -> RelayChainResult<Option<CommittedCandidateReceipt>>;
177
178 async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult<SessionIndex>;
180
181 async fn import_notification_stream(
183 &self,
184 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
185
186 async fn new_best_notification_stream(
188 &self,
189 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
190
191 async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()>;
196
197 async fn finality_notification_stream(
199 &self,
200 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
201
202 async fn is_major_syncing(&self) -> RelayChainResult<bool>;
205
206 fn overseer_handle(&self) -> RelayChainResult<OverseerHandle>;
208
209 async fn prove_read(
211 &self,
212 relay_parent: PHash,
213 relevant_keys: &Vec<Vec<u8>>,
214 ) -> RelayChainResult<StorageProof>;
215
216 async fn validation_code_hash(
219 &self,
220 relay_parent: PHash,
221 para_id: ParaId,
222 occupied_core_assumption: OccupiedCoreAssumption,
223 ) -> RelayChainResult<Option<ValidationCodeHash>>;
224
225 async fn candidates_pending_availability(
227 &self,
228 block_id: PHash,
229 para_id: ParaId,
230 ) -> RelayChainResult<Vec<CommittedCandidateReceipt>>;
231
232 async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion>;
234
235 async fn availability_cores(
239 &self,
240 relay_parent: PHash,
241 ) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>>;
242
243 async fn claim_queue(
245 &self,
246 relay_parent: PHash,
247 ) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>>;
248
249 async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult<u32>;
251
252 async fn candidate_events(&self, at: RelayHash) -> RelayChainResult<Vec<CandidateEvent>>;
253}
254
255#[async_trait]
256impl<T> RelayChainInterface for Arc<T>
257where
258 T: RelayChainInterface + ?Sized,
259{
260 async fn retrieve_dmq_contents(
261 &self,
262 para_id: ParaId,
263 relay_parent: PHash,
264 ) -> RelayChainResult<Vec<InboundDownwardMessage>> {
265 (**self).retrieve_dmq_contents(para_id, relay_parent).await
266 }
267
268 async fn retrieve_all_inbound_hrmp_channel_contents(
269 &self,
270 para_id: ParaId,
271 relay_parent: PHash,
272 ) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
273 (**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await
274 }
275
276 async fn persisted_validation_data(
277 &self,
278 block_id: PHash,
279 para_id: ParaId,
280 occupied_core_assumption: OccupiedCoreAssumption,
281 ) -> RelayChainResult<Option<PersistedValidationData>> {
282 (**self)
283 .persisted_validation_data(block_id, para_id, occupied_core_assumption)
284 .await
285 }
286
287 #[allow(deprecated)]
288 async fn candidate_pending_availability(
289 &self,
290 block_id: PHash,
291 para_id: ParaId,
292 ) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
293 (**self).candidate_pending_availability(block_id, para_id).await
294 }
295
296 async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult<SessionIndex> {
297 (**self).session_index_for_child(block_id).await
298 }
299
300 async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>> {
301 (**self).validators(block_id).await
302 }
303
304 async fn import_notification_stream(
305 &self,
306 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
307 (**self).import_notification_stream().await
308 }
309
310 async fn finality_notification_stream(
311 &self,
312 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
313 (**self).finality_notification_stream().await
314 }
315
316 async fn best_block_hash(&self) -> RelayChainResult<PHash> {
317 (**self).best_block_hash().await
318 }
319
320 async fn finalized_block_hash(&self) -> RelayChainResult<PHash> {
321 (**self).finalized_block_hash().await
322 }
323
324 async fn call_runtime_api(
325 &self,
326 method_name: &'static str,
327 hash: RelayHash,
328 payload: &[u8],
329 ) -> RelayChainResult<Vec<u8>> {
330 (**self).call_runtime_api(method_name, hash, payload).await
331 }
332
333 async fn is_major_syncing(&self) -> RelayChainResult<bool> {
334 (**self).is_major_syncing().await
335 }
336
337 fn overseer_handle(&self) -> RelayChainResult<OverseerHandle> {
338 (**self).overseer_handle()
339 }
340
341 async fn get_storage_by_key(
342 &self,
343 relay_parent: PHash,
344 key: &[u8],
345 ) -> RelayChainResult<Option<StorageValue>> {
346 (**self).get_storage_by_key(relay_parent, key).await
347 }
348
349 async fn prove_read(
350 &self,
351 relay_parent: PHash,
352 relevant_keys: &Vec<Vec<u8>>,
353 ) -> RelayChainResult<StorageProof> {
354 (**self).prove_read(relay_parent, relevant_keys).await
355 }
356
357 async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
358 (**self).wait_for_block(hash).await
359 }
360
361 async fn new_best_notification_stream(
362 &self,
363 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
364 (**self).new_best_notification_stream().await
365 }
366
367 async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
368 (**self).header(block_id).await
369 }
370
371 async fn validation_code_hash(
372 &self,
373 relay_parent: PHash,
374 para_id: ParaId,
375 occupied_core_assumption: OccupiedCoreAssumption,
376 ) -> RelayChainResult<Option<ValidationCodeHash>> {
377 (**self)
378 .validation_code_hash(relay_parent, para_id, occupied_core_assumption)
379 .await
380 }
381
382 async fn availability_cores(
383 &self,
384 relay_parent: PHash,
385 ) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>> {
386 (**self).availability_cores(relay_parent).await
387 }
388
389 async fn candidates_pending_availability(
390 &self,
391 block_id: PHash,
392 para_id: ParaId,
393 ) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
394 (**self).candidates_pending_availability(block_id, para_id).await
395 }
396
397 async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion> {
398 (**self).version(relay_parent).await
399 }
400
401 async fn claim_queue(
402 &self,
403 relay_parent: PHash,
404 ) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>> {
405 (**self).claim_queue(relay_parent).await
406 }
407
408 async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult<u32> {
409 (**self).scheduling_lookahead(relay_parent).await
410 }
411
412 async fn candidate_events(&self, at: RelayHash) -> RelayChainResult<Vec<CandidateEvent>> {
413 (**self).candidate_events(at).await
414 }
415}
416
417pub async fn call_runtime_api<R>(
421 client: &(impl RelayChainInterface + ?Sized),
422 method_name: &'static str,
423 hash: RelayHash,
424 payload: impl Encode,
425) -> RelayChainResult<R>
426where
427 R: Decode,
428{
429 let res = client.call_runtime_api(method_name, hash, &payload.encode()).await?;
430 Decode::decode(&mut &*res).map_err(Into::into)
431}