referrerpolicy=no-referrer-when-downgrade

cumulus_relay_chain_rpc_interface/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use async_trait::async_trait;
19use core::time::Duration;
20use cumulus_primitives_core::{
21	relay_chain::{
22		CandidateEvent, CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
23		Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption,
24		SessionIndex, ValidationCodeHash, ValidatorId,
25	},
26	InboundDownwardMessage, ParaId, PersistedValidationData,
27};
28use cumulus_relay_chain_interface::{
29	BlockNumber, CoreState, PHeader, RelayChainError, RelayChainInterface, RelayChainResult,
30};
31use futures::{FutureExt, Stream, StreamExt};
32use polkadot_overseer::Handle;
33
34use sc_client_api::StorageProof;
35use sp_state_machine::StorageValue;
36use sp_storage::StorageKey;
37use sp_version::RuntimeVersion;
38use std::{collections::btree_map::BTreeMap, pin::Pin};
39
40use cumulus_primitives_core::relay_chain::BlockId;
41pub use url::Url;
42
43mod metrics;
44mod reconnecting_ws_client;
45mod rpc_client;
46
47pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient};
48
/// Number of seconds `wait_for_block` waits on the import stream before it gives up with
/// `RelayChainError::WaitTimeout`.
const TIMEOUT_IN_SECONDS: u64 = 6;
50
/// `RelayChainRpcInterface` implements [`RelayChainInterface`] on top of a
/// [`RelayChainRpcClient`]: relay chain queries are forwarded to the node behind that client.
///
/// NOTE(review): because the node is reached through an RPC client, it is presumably not
/// required to run inside this process — confirm against the RPC client setup.
#[derive(Clone)]
pub struct RelayChainRpcInterface {
	/// Client that the relay chain queries of this type are delegated to.
	rpc_client: RelayChainRpcClient,
	/// Overseer handle; a clone of it is returned by `overseer_handle()`.
	overseer_handle: Handle,
}
58
59impl RelayChainRpcInterface {
60	pub fn new(rpc_client: RelayChainRpcClient, overseer_handle: Handle) -> Self {
61		Self { rpc_client, overseer_handle }
62	}
63}
64
65#[async_trait]
66impl RelayChainInterface for RelayChainRpcInterface {
67	async fn retrieve_dmq_contents(
68		&self,
69		para_id: ParaId,
70		relay_parent: RelayHash,
71	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
72		self.rpc_client.parachain_host_dmq_contents(para_id, relay_parent).await
73	}
74
75	async fn retrieve_all_inbound_hrmp_channel_contents(
76		&self,
77		para_id: ParaId,
78		relay_parent: RelayHash,
79	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
80		self.rpc_client
81			.parachain_host_inbound_hrmp_channels_contents(para_id, relay_parent)
82			.await
83	}
84
85	async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
86		let hash = match block_id {
87			BlockId::Hash(hash) => hash,
88			BlockId::Number(num) => {
89				if let Some(hash) = self.rpc_client.chain_get_block_hash(Some(num)).await? {
90					hash
91				} else {
92					return Ok(None)
93				}
94			},
95		};
96		let header = self.rpc_client.chain_get_header(Some(hash)).await?;
97
98		Ok(header)
99	}
100
101	async fn persisted_validation_data(
102		&self,
103		hash: RelayHash,
104		para_id: ParaId,
105		occupied_core_assumption: OccupiedCoreAssumption,
106	) -> RelayChainResult<Option<PersistedValidationData>> {
107		self.rpc_client
108			.parachain_host_persisted_validation_data(hash, para_id, occupied_core_assumption)
109			.await
110	}
111
112	async fn validation_code_hash(
113		&self,
114		hash: RelayHash,
115		para_id: ParaId,
116		occupied_core_assumption: OccupiedCoreAssumption,
117	) -> RelayChainResult<Option<ValidationCodeHash>> {
118		self.rpc_client
119			.validation_code_hash(hash, para_id, occupied_core_assumption)
120			.await
121	}
122
123	async fn candidate_pending_availability(
124		&self,
125		hash: RelayHash,
126		para_id: ParaId,
127	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
128		self.rpc_client
129			.parachain_host_candidate_pending_availability(hash, para_id)
130			.await
131	}
132
133	async fn session_index_for_child(&self, hash: RelayHash) -> RelayChainResult<SessionIndex> {
134		self.rpc_client.parachain_host_session_index_for_child(hash).await
135	}
136
137	async fn validators(&self, block_id: RelayHash) -> RelayChainResult<Vec<ValidatorId>> {
138		self.rpc_client.parachain_host_validators(block_id).await
139	}
140
141	async fn import_notification_stream(
142		&self,
143	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
144		let imported_headers_stream = self.rpc_client.get_imported_heads_stream()?;
145
146		Ok(imported_headers_stream.boxed())
147	}
148
149	async fn finality_notification_stream(
150		&self,
151	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
152		let imported_headers_stream = self.rpc_client.get_finalized_heads_stream()?;
153
154		Ok(imported_headers_stream.boxed())
155	}
156
157	async fn best_block_hash(&self) -> RelayChainResult<RelayHash> {
158		self.rpc_client.chain_get_head(None).await
159	}
160
161	async fn finalized_block_hash(&self) -> RelayChainResult<RelayHash> {
162		self.rpc_client.chain_get_finalized_head().await
163	}
164
165	async fn call_runtime_api(
166		&self,
167		method_name: &'static str,
168		hash: RelayHash,
169		payload: &[u8],
170	) -> RelayChainResult<Vec<u8>> {
171		self.rpc_client
172			.call_remote_runtime_function_encoded(method_name, hash, payload)
173			.await
174			.map(|bytes| bytes.to_vec())
175	}
176
177	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
178		self.rpc_client.system_health().await.map(|h| h.is_syncing)
179	}
180
181	fn overseer_handle(&self) -> RelayChainResult<Handle> {
182		Ok(self.overseer_handle.clone())
183	}
184
185	async fn get_storage_by_key(
186		&self,
187		relay_parent: RelayHash,
188		key: &[u8],
189	) -> RelayChainResult<Option<StorageValue>> {
190		let storage_key = StorageKey(key.to_vec());
191		self.rpc_client
192			.state_get_storage(storage_key, Some(relay_parent))
193			.await
194			.map(|storage_data| storage_data.map(|sv| sv.0))
195	}
196
197	async fn prove_read(
198		&self,
199		relay_parent: RelayHash,
200		relevant_keys: &Vec<Vec<u8>>,
201	) -> RelayChainResult<StorageProof> {
202		let cloned = relevant_keys.clone();
203		let storage_keys: Vec<StorageKey> = cloned.into_iter().map(StorageKey).collect();
204
205		self.rpc_client
206			.state_get_read_proof(storage_keys, Some(relay_parent))
207			.await
208			.map(|read_proof| {
209				StorageProof::new(read_proof.proof.into_iter().map(|bytes| bytes.to_vec()))
210			})
211	}
212
213	/// Wait for a given relay chain block
214	///
215	/// The hash of the block to wait for is passed. We wait for the block to arrive or return after
216	/// a timeout.
217	///
218	/// Implementation:
219	/// 1. Register a listener to all new blocks.
220	/// 2. Check if the block is already in chain. If yes, succeed early.
221	/// 3. Wait for the block to be imported via subscription.
222	/// 4. If timeout is reached, we return an error.
223	async fn wait_for_block(&self, wait_for_hash: RelayHash) -> RelayChainResult<()> {
224		let mut head_stream = self.rpc_client.get_imported_heads_stream()?;
225
226		if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() {
227			return Ok(())
228		}
229
230		let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
231
232		loop {
233			futures::select! {
234				_ = timeout => return Err(RelayChainError::WaitTimeout(wait_for_hash)),
235				evt = head_stream.next().fuse() => match evt {
236					Some(evt) if evt.hash() == wait_for_hash => return Ok(()),
237					// Not the event we waited on.
238					Some(_) => continue,
239					None => return Err(RelayChainError::ImportListenerClosed(wait_for_hash)),
240				}
241			}
242		}
243	}
244
245	async fn new_best_notification_stream(
246		&self,
247	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
248		let imported_headers_stream = self.rpc_client.get_best_heads_stream()?;
249		Ok(imported_headers_stream.boxed())
250	}
251
252	async fn candidates_pending_availability(
253		&self,
254		hash: RelayHash,
255		para_id: ParaId,
256	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
257		self.rpc_client
258			.parachain_host_candidates_pending_availability(hash, para_id)
259			.await
260	}
261
262	async fn version(&self, relay_parent: RelayHash) -> RelayChainResult<RuntimeVersion> {
263		self.rpc_client.runtime_version(relay_parent).await
264	}
265
266	async fn availability_cores(
267		&self,
268		relay_parent: RelayHash,
269	) -> RelayChainResult<Vec<CoreState<RelayHash, BlockNumber>>> {
270		self.rpc_client.parachain_host_availability_cores(relay_parent).await
271	}
272
273	async fn claim_queue(
274		&self,
275		relay_parent: RelayHash,
276	) -> RelayChainResult<
277		BTreeMap<cumulus_relay_chain_interface::CoreIndex, std::collections::VecDeque<ParaId>>,
278	> {
279		self.rpc_client.parachain_host_claim_queue(relay_parent).await
280	}
281
282	async fn scheduling_lookahead(&self, relay_parent: RelayHash) -> RelayChainResult<u32> {
283		self.rpc_client.parachain_host_scheduling_lookahead(relay_parent).await
284	}
285
286	async fn candidate_events(
287		&self,
288		relay_parent: RelayHash,
289	) -> RelayChainResult<Vec<CandidateEvent>> {
290		self.rpc_client.parachain_host_candidate_events(relay_parent).await
291	}
292}