referrerpolicy=no-referrer-when-downgrade

cumulus_relay_chain_rpc_interface/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use async_trait::async_trait;
19use core::time::Duration;
20use cumulus_primitives_core::{
21	relay_chain::{
22		vstaging::{CandidateEvent, CommittedCandidateReceiptV2 as CommittedCandidateReceipt},
23		Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption,
24		SessionIndex, ValidationCodeHash, ValidatorId,
25	},
26	InboundDownwardMessage, ParaId, PersistedValidationData,
27};
28use cumulus_relay_chain_interface::{
29	BlockNumber, CoreState, PHeader, RelayChainError, RelayChainInterface, RelayChainResult,
30};
31use futures::{FutureExt, Stream, StreamExt};
32use polkadot_overseer::Handle;
33
34use sc_client_api::StorageProof;
35use sp_state_machine::StorageValue;
36use sp_storage::StorageKey;
37use sp_version::RuntimeVersion;
38use std::{collections::btree_map::BTreeMap, pin::Pin};
39
40use cumulus_primitives_core::relay_chain::BlockId;
41pub use url::Url;
42
43mod light_client_worker;
44mod metrics;
45mod reconnecting_ws_client;
46mod rpc_client;
47mod tokio_platform;
48
49pub use rpc_client::{
50	create_client_and_start_light_client_worker, create_client_and_start_worker,
51	RelayChainRpcClient,
52};
53
/// Number of seconds `wait_for_block` keeps listening for the requested block before it gives up
/// with `RelayChainError::WaitTimeout`.
const TIMEOUT_IN_SECONDS: u64 = 6;
55
56/// RelayChainRpcInterface is used to interact with a full node that is running locally
57/// in the same process.
58#[derive(Clone)]
59pub struct RelayChainRpcInterface {
60	rpc_client: RelayChainRpcClient,
61	overseer_handle: Handle,
62}
63
64impl RelayChainRpcInterface {
65	pub fn new(rpc_client: RelayChainRpcClient, overseer_handle: Handle) -> Self {
66		Self { rpc_client, overseer_handle }
67	}
68}
69
70#[async_trait]
71impl RelayChainInterface for RelayChainRpcInterface {
72	async fn retrieve_dmq_contents(
73		&self,
74		para_id: ParaId,
75		relay_parent: RelayHash,
76	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
77		self.rpc_client.parachain_host_dmq_contents(para_id, relay_parent).await
78	}
79
80	async fn retrieve_all_inbound_hrmp_channel_contents(
81		&self,
82		para_id: ParaId,
83		relay_parent: RelayHash,
84	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
85		self.rpc_client
86			.parachain_host_inbound_hrmp_channels_contents(para_id, relay_parent)
87			.await
88	}
89
90	async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
91		let hash = match block_id {
92			BlockId::Hash(hash) => hash,
93			BlockId::Number(num) => {
94				if let Some(hash) = self.rpc_client.chain_get_block_hash(Some(num)).await? {
95					hash
96				} else {
97					return Ok(None)
98				}
99			},
100		};
101		let header = self.rpc_client.chain_get_header(Some(hash)).await?;
102
103		Ok(header)
104	}
105
106	async fn persisted_validation_data(
107		&self,
108		hash: RelayHash,
109		para_id: ParaId,
110		occupied_core_assumption: OccupiedCoreAssumption,
111	) -> RelayChainResult<Option<PersistedValidationData>> {
112		self.rpc_client
113			.parachain_host_persisted_validation_data(hash, para_id, occupied_core_assumption)
114			.await
115	}
116
117	async fn validation_code_hash(
118		&self,
119		hash: RelayHash,
120		para_id: ParaId,
121		occupied_core_assumption: OccupiedCoreAssumption,
122	) -> RelayChainResult<Option<ValidationCodeHash>> {
123		self.rpc_client
124			.validation_code_hash(hash, para_id, occupied_core_assumption)
125			.await
126	}
127
128	async fn candidate_pending_availability(
129		&self,
130		hash: RelayHash,
131		para_id: ParaId,
132	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
133		self.rpc_client
134			.parachain_host_candidate_pending_availability(hash, para_id)
135			.await
136	}
137
138	async fn session_index_for_child(&self, hash: RelayHash) -> RelayChainResult<SessionIndex> {
139		self.rpc_client.parachain_host_session_index_for_child(hash).await
140	}
141
142	async fn validators(&self, block_id: RelayHash) -> RelayChainResult<Vec<ValidatorId>> {
143		self.rpc_client.parachain_host_validators(block_id).await
144	}
145
146	async fn import_notification_stream(
147		&self,
148	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
149		let imported_headers_stream = self.rpc_client.get_imported_heads_stream()?;
150
151		Ok(imported_headers_stream.boxed())
152	}
153
154	async fn finality_notification_stream(
155		&self,
156	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
157		let imported_headers_stream = self.rpc_client.get_finalized_heads_stream()?;
158
159		Ok(imported_headers_stream.boxed())
160	}
161
162	async fn best_block_hash(&self) -> RelayChainResult<RelayHash> {
163		self.rpc_client.chain_get_head(None).await
164	}
165
166	async fn finalized_block_hash(&self) -> RelayChainResult<RelayHash> {
167		self.rpc_client.chain_get_finalized_head().await
168	}
169
170	async fn call_runtime_api(
171		&self,
172		method_name: &'static str,
173		hash: RelayHash,
174		payload: &[u8],
175	) -> RelayChainResult<Vec<u8>> {
176		self.rpc_client
177			.call_remote_runtime_function_encoded(method_name, hash, payload)
178			.await
179			.map(|bytes| bytes.to_vec())
180	}
181
182	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
183		self.rpc_client.system_health().await.map(|h| h.is_syncing)
184	}
185
186	fn overseer_handle(&self) -> RelayChainResult<Handle> {
187		Ok(self.overseer_handle.clone())
188	}
189
190	async fn get_storage_by_key(
191		&self,
192		relay_parent: RelayHash,
193		key: &[u8],
194	) -> RelayChainResult<Option<StorageValue>> {
195		let storage_key = StorageKey(key.to_vec());
196		self.rpc_client
197			.state_get_storage(storage_key, Some(relay_parent))
198			.await
199			.map(|storage_data| storage_data.map(|sv| sv.0))
200	}
201
202	async fn prove_read(
203		&self,
204		relay_parent: RelayHash,
205		relevant_keys: &Vec<Vec<u8>>,
206	) -> RelayChainResult<StorageProof> {
207		let cloned = relevant_keys.clone();
208		let storage_keys: Vec<StorageKey> = cloned.into_iter().map(StorageKey).collect();
209
210		self.rpc_client
211			.state_get_read_proof(storage_keys, Some(relay_parent))
212			.await
213			.map(|read_proof| {
214				StorageProof::new(read_proof.proof.into_iter().map(|bytes| bytes.to_vec()))
215			})
216	}
217
218	/// Wait for a given relay chain block
219	///
220	/// The hash of the block to wait for is passed. We wait for the block to arrive or return after
221	/// a timeout.
222	///
223	/// Implementation:
224	/// 1. Register a listener to all new blocks.
225	/// 2. Check if the block is already in chain. If yes, succeed early.
226	/// 3. Wait for the block to be imported via subscription.
227	/// 4. If timeout is reached, we return an error.
228	async fn wait_for_block(&self, wait_for_hash: RelayHash) -> RelayChainResult<()> {
229		let mut head_stream = self.rpc_client.get_imported_heads_stream()?;
230
231		if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() {
232			return Ok(())
233		}
234
235		let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
236
237		loop {
238			futures::select! {
239				_ = timeout => return Err(RelayChainError::WaitTimeout(wait_for_hash)),
240				evt = head_stream.next().fuse() => match evt {
241					Some(evt) if evt.hash() == wait_for_hash => return Ok(()),
242					// Not the event we waited on.
243					Some(_) => continue,
244					None => return Err(RelayChainError::ImportListenerClosed(wait_for_hash)),
245				}
246			}
247		}
248	}
249
250	async fn new_best_notification_stream(
251		&self,
252	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
253		let imported_headers_stream = self.rpc_client.get_best_heads_stream()?;
254		Ok(imported_headers_stream.boxed())
255	}
256
257	async fn candidates_pending_availability(
258		&self,
259		hash: RelayHash,
260		para_id: ParaId,
261	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
262		self.rpc_client
263			.parachain_host_candidates_pending_availability(hash, para_id)
264			.await
265	}
266
267	async fn version(&self, relay_parent: RelayHash) -> RelayChainResult<RuntimeVersion> {
268		self.rpc_client.runtime_version(relay_parent).await
269	}
270
271	async fn availability_cores(
272		&self,
273		relay_parent: RelayHash,
274	) -> RelayChainResult<Vec<CoreState<RelayHash, BlockNumber>>> {
275		self.rpc_client.parachain_host_availability_cores(relay_parent).await
276	}
277
278	async fn claim_queue(
279		&self,
280		relay_parent: RelayHash,
281	) -> RelayChainResult<
282		BTreeMap<cumulus_relay_chain_interface::CoreIndex, std::collections::VecDeque<ParaId>>,
283	> {
284		self.rpc_client.parachain_host_claim_queue(relay_parent).await
285	}
286
287	async fn scheduling_lookahead(&self, relay_parent: RelayHash) -> RelayChainResult<u32> {
288		self.rpc_client.parachain_host_scheduling_lookahead(relay_parent).await
289	}
290
291	async fn candidate_events(
292		&self,
293		relay_parent: RelayHash,
294	) -> RelayChainResult<Vec<CandidateEvent>> {
295		self.rpc_client.parachain_host_candidate_events(relay_parent).await
296	}
297}