referrerpolicy=no-referrer-when-downgrade

cumulus_relay_chain_rpc_interface/
rpc_client.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use futures::channel::{
19	mpsc::{Receiver, Sender},
20	oneshot::Sender as OneshotSender,
21};
22use jsonrpsee::{
23	core::{params::ArrayParams, ClientError as JsonRpseeError},
24	rpc_params,
25};
26use prometheus::Registry;
27use serde::{de::DeserializeOwned, Serialize};
28use serde_json::Value as JsonValue;
29use std::collections::{btree_map::BTreeMap, VecDeque};
30use tokio::sync::mpsc::Sender as TokioSender;
31
32use codec::{Decode, Encode};
33
34use cumulus_primitives_core::{
35	relay_chain::{
36		async_backing::{AsyncBackingParams, BackingState, Constraints},
37		slashing, ApprovalVotingParams, BlockNumber, CandidateCommitments, CandidateEvent,
38		CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex,
39		CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash as RelayHash,
40		Header as RelayHeader, InboundHrmpMessage, NodeFeatures, OccupiedCoreAssumption,
41		PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
42		ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
43	},
44	InboundDownwardMessage, ParaId, PersistedValidationData,
45};
46use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
47
48use sc_client_api::StorageData;
49use sc_rpc_api::{state::ReadProof, system::Health};
50use sc_service::TaskManager;
51use sp_consensus_babe::Epoch;
52use sp_storage::StorageKey;
53use sp_version::RuntimeVersion;
54
55use crate::{metrics::RelaychainRpcMetrics, reconnecting_ws_client::ReconnectingWebsocketWorker};
56pub use url::Url;
57
58const LOG_TARGET: &str = "relay-chain-rpc-client";
59const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20;
60
61/// Messages for communication between [`RelayChainRpcClient`] and the RPC workers.
62#[derive(Debug)]
63pub enum RpcDispatcherMessage {
64	/// Register new listener for the best headers stream. Contains a sender which will be used
65	/// to send incoming headers.
66	RegisterBestHeadListener(Sender<RelayHeader>),
67
68	/// Register new listener for the import headers stream. Contains a sender which will be used
69	/// to send incoming headers.
70	RegisterImportListener(Sender<RelayHeader>),
71
72	/// Register new listener for the finalized headers stream. Contains a sender which will be
73	/// used to send incoming headers.
74	RegisterFinalizationListener(Sender<RelayHeader>),
75
76	/// Register new listener for the finalized headers stream.
77	/// Contains the following:
78	/// - [`String`] representing the RPC method to be called
79	/// - [`ArrayParams`] for the parameters to the RPC call
80	/// - [`OneshotSender`] for the return value of the request
81	Request(String, ArrayParams, OneshotSender<Result<JsonValue, JsonRpseeError>>),
82}
83
84/// Entry point to create [`RelayChainRpcClient`] and start a worker that communicates
85/// to JsonRPC servers over the network.
86pub async fn create_client_and_start_worker(
87	urls: Vec<Url>,
88	task_manager: &mut TaskManager,
89	prometheus_registry: Option<&Registry>,
90) -> RelayChainResult<RelayChainRpcClient> {
91	let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await;
92
93	task_manager
94		.spawn_essential_handle()
95		.spawn("relay-chain-rpc-worker", None, worker.run());
96
97	let client = RelayChainRpcClient::new(sender, prometheus_registry);
98
99	Ok(client)
100}
101
102#[derive(Serialize)]
103struct PayloadToHex<'a>(#[serde(with = "sp_core::bytes")] &'a [u8]);
104
105/// Client that maps RPC methods and deserializes results
106#[derive(Clone)]
107pub struct RelayChainRpcClient {
108	/// Sender to send messages to the worker.
109	worker_channel: TokioSender<RpcDispatcherMessage>,
110	metrics: Option<RelaychainRpcMetrics>,
111}
112
113impl RelayChainRpcClient {
114	/// Initialize new RPC Client.
115	///
116	/// This client expects a channel connected to a worker that processes
117	/// requests sent via this channel.
118	pub(crate) fn new(
119		worker_channel: TokioSender<RpcDispatcherMessage>,
120		prometheus_registry: Option<&Registry>,
121	) -> Self {
122		RelayChainRpcClient {
123			worker_channel,
124			metrics: prometheus_registry
125				.and_then(|inner| RelaychainRpcMetrics::register(inner).map_err(|err| {
126					tracing::warn!(target: LOG_TARGET, error = %err, "Unable to instantiate the RPC client metrics, continuing w/o metrics setup.");
127				}).ok()),
128		}
129	}
130
131	/// Same as `call_remote_runtime_function` but work on encoded data
132	pub async fn call_remote_runtime_function_encoded(
133		&self,
134		method_name: &str,
135		hash: RelayHash,
136		payload: &[u8],
137	) -> RelayChainResult<sp_core::Bytes> {
138		let payload = PayloadToHex(payload);
139
140		let params = rpc_params! {
141			method_name,
142			payload,
143			hash
144		};
145
146		self.request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
147			tracing::trace!(
148				target: LOG_TARGET,
149				%method_name,
150				%hash,
151				error = %err,
152				"Error during call to 'state_call'.",
153			);
154		})
155		.await
156	}
157
158	/// Call a call to `state_call` rpc method.
159	pub async fn call_remote_runtime_function<R: Decode>(
160		&self,
161		method_name: &str,
162		hash: RelayHash,
163		payload: Option<impl Encode>,
164	) -> RelayChainResult<R> {
165		let payload_bytes =
166			payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode()));
167		let res = self
168			.call_remote_runtime_function_encoded(method_name, hash, &payload_bytes)
169			.await?;
170		Decode::decode(&mut &*res.0).map_err(Into::into)
171	}
172
173	/// Perform RPC request
174	async fn request<'a, R>(
175		&self,
176		method: &'a str,
177		params: ArrayParams,
178	) -> Result<R, RelayChainError>
179	where
180		R: DeserializeOwned + std::fmt::Debug,
181	{
182		self.request_tracing(
183			method,
184			params,
185			|e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"),
186		)
187		.await
188	}
189
190	/// Perform RPC request
191	async fn request_tracing<'a, R, OR>(
192		&self,
193		method: &'a str,
194		params: ArrayParams,
195		trace_error: OR,
196	) -> Result<R, RelayChainError>
197	where
198		R: DeserializeOwned + std::fmt::Debug,
199		OR: Fn(&RelayChainError),
200	{
201		let _timer = self.metrics.as_ref().map(|inner| inner.start_request_timer(method));
202
203		let (tx, rx) = futures::channel::oneshot::channel();
204
205		let message = RpcDispatcherMessage::Request(method.into(), params, tx);
206		self.worker_channel.send(message).await.map_err(|err| {
207			RelayChainError::WorkerCommunicationError(format!(
208				"Unable to send message to RPC worker: {}",
209				err
210			))
211		})?;
212
213		let value = rx.await.map_err(|err| {
214			RelayChainError::WorkerCommunicationError(format!(
215				"RPC worker channel closed. This can hint and connectivity issues with the supplied RPC endpoints. Message: {}",
216				err
217			))
218		})??;
219
220		serde_json::from_value(value).map_err(|_| {
221			trace_error(&RelayChainError::GenericError("Unable to deserialize value".to_string()));
222			RelayChainError::RpcCallError(method.to_string())
223		})
224	}
225
226	/// Returns information regarding the current epoch.
227	pub async fn babe_api_current_epoch(&self, at: RelayHash) -> Result<Epoch, RelayChainError> {
228		self.call_remote_runtime_function("BabeApi_current_epoch", at, None::<()>).await
229	}
230
231	/// Scrape dispute relevant from on-chain, backing votes and resolved disputes.
232	pub async fn parachain_host_on_chain_votes(
233		&self,
234		at: RelayHash,
235	) -> Result<Option<ScrapedOnChainVotes<RelayHash>>, RelayChainError> {
236		self.call_remote_runtime_function("ParachainHost_on_chain_votes", at, None::<()>)
237			.await
238	}
239
240	/// Returns code hashes of PVFs that require pre-checking by validators in the active set.
241	pub async fn parachain_host_pvfs_require_precheck(
242		&self,
243		at: RelayHash,
244	) -> Result<Vec<ValidationCodeHash>, RelayChainError> {
245		self.call_remote_runtime_function("ParachainHost_pvfs_require_precheck", at, None::<()>)
246			.await
247	}
248
249	/// Submits a PVF pre-checking statement into the transaction pool.
250	pub async fn parachain_host_submit_pvf_check_statement(
251		&self,
252		at: RelayHash,
253		stmt: PvfCheckStatement,
254		signature: ValidatorSignature,
255	) -> Result<(), RelayChainError> {
256		self.call_remote_runtime_function(
257			"ParachainHost_submit_pvf_check_statement",
258			at,
259			Some((stmt, signature)),
260		)
261		.await
262	}
263
264	/// Get system health information
265	pub async fn system_health(&self) -> Result<Health, RelayChainError> {
266		self.request("system_health", rpc_params![]).await
267	}
268
269	/// Get read proof for `storage_keys`
270	pub async fn state_get_read_proof(
271		&self,
272		storage_keys: Vec<StorageKey>,
273		at: Option<RelayHash>,
274	) -> Result<ReadProof<RelayHash>, RelayChainError> {
275		let params = rpc_params![storage_keys, at];
276		self.request("state_getReadProof", params).await
277	}
278
279	/// Retrieve storage item at `storage_key`
280	pub async fn state_get_storage(
281		&self,
282		storage_key: StorageKey,
283		at: Option<RelayHash>,
284	) -> Result<Option<StorageData>, RelayChainError> {
285		let params = rpc_params![storage_key, at];
286		self.request("state_getStorage", params).await
287	}
288
289	/// Get hash of the n-th block in the canon chain.
290	///
291	/// By default returns latest block hash.
292	pub async fn chain_get_head(&self, at: Option<u64>) -> Result<RelayHash, RelayChainError> {
293		let params = rpc_params![at];
294		self.request("chain_getHead", params).await
295	}
296
297	/// Returns the validator groups and rotation info localized based on the hypothetical child
298	///  of a block whose state  this is invoked on. Note that `now` in the `GroupRotationInfo`
299	/// should be the successor of the number of the block.
300	pub async fn parachain_host_validator_groups(
301		&self,
302		at: RelayHash,
303	) -> Result<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo), RelayChainError> {
304		self.call_remote_runtime_function("ParachainHost_validator_groups", at, None::<()>)
305			.await
306	}
307
308	/// Get a vector of events concerning candidates that occurred within a block.
309	pub async fn parachain_host_candidate_events(
310		&self,
311		at: RelayHash,
312	) -> Result<Vec<CandidateEvent>, RelayChainError> {
313		self.call_remote_runtime_function("ParachainHost_candidate_events", at, None::<()>)
314			.await
315	}
316
317	/// Checks if the given validation outputs pass the acceptance criteria.
318	pub async fn parachain_host_check_validation_outputs(
319		&self,
320		at: RelayHash,
321		para_id: ParaId,
322		outputs: CandidateCommitments,
323	) -> Result<bool, RelayChainError> {
324		self.call_remote_runtime_function(
325			"ParachainHost_check_validation_outputs",
326			at,
327			Some((para_id, outputs)),
328		)
329		.await
330	}
331
332	/// Returns the persisted validation data for the given `ParaId` along with the corresponding
333	/// validation code hash. Instead of accepting assumption about the para, matches the validation
334	/// data hash against an expected one and yields `None` if they're not equal.
335	pub async fn parachain_host_assumed_validation_data(
336		&self,
337		at: RelayHash,
338		para_id: ParaId,
339		expected_hash: RelayHash,
340	) -> Result<Option<(PersistedValidationData, ValidationCodeHash)>, RelayChainError> {
341		self.call_remote_runtime_function(
342			"ParachainHost_persisted_assumed_validation_data",
343			at,
344			Some((para_id, expected_hash)),
345		)
346		.await
347	}
348
349	/// Get hash of last finalized block.
350	pub async fn chain_get_finalized_head(&self) -> Result<RelayHash, RelayChainError> {
351		self.request("chain_getFinalizedHead", rpc_params![]).await
352	}
353
354	/// Get hash of n-th block.
355	pub async fn chain_get_block_hash(
356		&self,
357		block_number: Option<BlockNumber>,
358	) -> Result<Option<RelayHash>, RelayChainError> {
359		let params = rpc_params![block_number];
360		self.request("chain_getBlockHash", params).await
361	}
362
363	/// Yields the persisted validation data for the given `ParaId` along with an assumption that
364	/// should be used if the para currently occupies a core.
365	///
366	/// Returns `None` if either the para is not registered or the assumption is `Freed`
367	/// and the para already occupies a core.
368	pub async fn parachain_host_persisted_validation_data(
369		&self,
370		at: RelayHash,
371		para_id: ParaId,
372		occupied_core_assumption: OccupiedCoreAssumption,
373	) -> Result<Option<PersistedValidationData>, RelayChainError> {
374		self.call_remote_runtime_function(
375			"ParachainHost_persisted_validation_data",
376			at,
377			Some((para_id, occupied_core_assumption)),
378		)
379		.await
380	}
381
382	/// Get the validation code from its hash.
383	pub async fn parachain_host_validation_code_by_hash(
384		&self,
385		at: RelayHash,
386		validation_code_hash: ValidationCodeHash,
387	) -> Result<Option<ValidationCode>, RelayChainError> {
388		self.call_remote_runtime_function(
389			"ParachainHost_validation_code_by_hash",
390			at,
391			Some(validation_code_hash),
392		)
393		.await
394	}
395
396	/// Yields information on all availability cores as relevant to the child block.
397	/// Cores are either free or occupied. Free cores can have paras assigned to them.
398	pub async fn parachain_host_availability_cores(
399		&self,
400		at: RelayHash,
401	) -> Result<Vec<CoreState<RelayHash, BlockNumber>>, RelayChainError> {
402		self.call_remote_runtime_function("ParachainHost_availability_cores", at, None::<()>)
403			.await
404	}
405
406	/// Get runtime version
407	pub async fn runtime_version(&self, at: RelayHash) -> Result<RuntimeVersion, RelayChainError> {
408		let params = rpc_params![at];
409		self.request("state_getRuntimeVersion", params).await
410	}
411
412	/// Returns all onchain disputes.
413	pub async fn parachain_host_disputes(
414		&self,
415		at: RelayHash,
416	) -> Result<Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>, RelayChainError> {
417		self.call_remote_runtime_function("ParachainHost_disputes", at, None::<()>)
418			.await
419	}
420
421	/// Returns a list of validators that lost a past session dispute and need to be slashed.
422	///
423	/// This is a staging method! Do not use on production runtimes!
424	pub async fn parachain_host_unapplied_slashes(
425		&self,
426		at: RelayHash,
427	) -> Result<Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>, RelayChainError> {
428		self.call_remote_runtime_function("ParachainHost_unapplied_slashes", at, None::<()>)
429			.await
430	}
431
432	/// Returns a merkle proof of a validator session key in a past session.
433	///
434	/// This is a staging method! Do not use on production runtimes!
435	pub async fn parachain_host_key_ownership_proof(
436		&self,
437		at: RelayHash,
438		validator_id: ValidatorId,
439	) -> Result<Option<slashing::OpaqueKeyOwnershipProof>, RelayChainError> {
440		self.call_remote_runtime_function(
441			"ParachainHost_key_ownership_proof",
442			at,
443			Some(validator_id),
444		)
445		.await
446	}
447
448	/// Submits an unsigned extrinsic to slash validators who lost a dispute about
449	/// a candidate of a past session.
450	///
451	/// This is a staging method! Do not use on production runtimes!
452	pub async fn parachain_host_submit_report_dispute_lost(
453		&self,
454		at: RelayHash,
455		dispute_proof: slashing::DisputeProof,
456		key_ownership_proof: slashing::OpaqueKeyOwnershipProof,
457	) -> Result<Option<()>, RelayChainError> {
458		self.call_remote_runtime_function(
459			"ParachainHost_submit_report_dispute_lost",
460			at,
461			Some((dispute_proof, key_ownership_proof)),
462		)
463		.await
464	}
465
466	pub async fn authority_discovery_authorities(
467		&self,
468		at: RelayHash,
469	) -> Result<Vec<sp_authority_discovery::AuthorityId>, RelayChainError> {
470		self.call_remote_runtime_function("AuthorityDiscoveryApi_authorities", at, None::<()>)
471			.await
472	}
473
474	/// Fetch the validation code used by a para, making the given `OccupiedCoreAssumption`.
475	///
476	/// Returns `None` if either the para is not registered or the assumption is `Freed`
477	/// and the para already occupies a core.
478	pub async fn parachain_host_validation_code(
479		&self,
480		at: RelayHash,
481		para_id: ParaId,
482		occupied_core_assumption: OccupiedCoreAssumption,
483	) -> Result<Option<ValidationCode>, RelayChainError> {
484		self.call_remote_runtime_function(
485			"ParachainHost_validation_code",
486			at,
487			Some((para_id, occupied_core_assumption)),
488		)
489		.await
490	}
491
492	/// Fetch the hash of the validation code used by a para, making the given
493	/// `OccupiedCoreAssumption`.
494	pub async fn parachain_host_validation_code_hash(
495		&self,
496		at: RelayHash,
497		para_id: ParaId,
498		occupied_core_assumption: OccupiedCoreAssumption,
499	) -> Result<Option<ValidationCodeHash>, RelayChainError> {
500		self.call_remote_runtime_function(
501			"ParachainHost_validation_code_hash",
502			at,
503			Some((para_id, occupied_core_assumption)),
504		)
505		.await
506	}
507
508	/// Get the session info for the given session, if stored.
509	pub async fn parachain_host_session_info(
510		&self,
511		at: RelayHash,
512		index: SessionIndex,
513	) -> Result<Option<SessionInfo>, RelayChainError> {
514		self.call_remote_runtime_function("ParachainHost_session_info", at, Some(index))
515			.await
516	}
517
518	/// Get the executor parameters for the given session, if stored
519	pub async fn parachain_host_session_executor_params(
520		&self,
521		at: RelayHash,
522		session_index: SessionIndex,
523	) -> Result<Option<ExecutorParams>, RelayChainError> {
524		self.call_remote_runtime_function(
525			"ParachainHost_session_executor_params",
526			at,
527			Some(session_index),
528		)
529		.await
530	}
531
532	/// Get header at specified hash
533	pub async fn chain_get_header(
534		&self,
535		hash: Option<RelayHash>,
536	) -> Result<Option<RelayHeader>, RelayChainError> {
537		let params = rpc_params![hash];
538		self.request("chain_getHeader", params).await
539	}
540
541	/// Get the receipt of a candidate pending availability. This returns `Some` for any paras
542	/// assigned to occupied cores in `availability_cores` and `None` otherwise.
543	pub async fn parachain_host_candidate_pending_availability(
544		&self,
545		at: RelayHash,
546		para_id: ParaId,
547	) -> Result<Option<CommittedCandidateReceipt>, RelayChainError> {
548		self.call_remote_runtime_function(
549			"ParachainHost_candidate_pending_availability",
550			at,
551			Some(para_id),
552		)
553		.await
554	}
555
556	/// Returns the session index expected at a child of the block.
557	///
558	/// This can be used to instantiate a `SigningContext`.
559	pub async fn parachain_host_session_index_for_child(
560		&self,
561		at: RelayHash,
562	) -> Result<SessionIndex, RelayChainError> {
563		self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>)
564			.await
565	}
566
567	/// Get the current validators.
568	pub async fn parachain_host_validators(
569		&self,
570		at: RelayHash,
571	) -> Result<Vec<ValidatorId>, RelayChainError> {
572		self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>)
573			.await
574	}
575
576	/// Get the contents of all channels addressed to the given recipient. Channels that have no
577	/// messages in them are also included.
578	pub async fn parachain_host_inbound_hrmp_channels_contents(
579		&self,
580		para_id: ParaId,
581		at: RelayHash,
582	) -> Result<BTreeMap<ParaId, Vec<InboundHrmpMessage>>, RelayChainError> {
583		self.call_remote_runtime_function(
584			"ParachainHost_inbound_hrmp_channels_contents",
585			at,
586			Some(para_id),
587		)
588		.await
589	}
590
591	/// Get all the pending inbound messages in the downward message queue for a para.
592	pub async fn parachain_host_dmq_contents(
593		&self,
594		para_id: ParaId,
595		at: RelayHash,
596	) -> Result<Vec<InboundDownwardMessage>, RelayChainError> {
597		self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id))
598			.await
599	}
600
601	/// Get the minimum number of backing votes for a candidate.
602	pub async fn parachain_host_minimum_backing_votes(
603		&self,
604		at: RelayHash,
605		_session_index: SessionIndex,
606	) -> Result<u32, RelayChainError> {
607		self.call_remote_runtime_function("ParachainHost_minimum_backing_votes", at, None::<()>)
608			.await
609	}
610
611	pub async fn parachain_host_node_features(
612		&self,
613		at: RelayHash,
614	) -> Result<NodeFeatures, RelayChainError> {
615		self.call_remote_runtime_function("ParachainHost_node_features", at, None::<()>)
616			.await
617	}
618
619	pub async fn parachain_host_disabled_validators(
620		&self,
621		at: RelayHash,
622	) -> Result<Vec<ValidatorIndex>, RelayChainError> {
623		self.call_remote_runtime_function("ParachainHost_disabled_validators", at, None::<()>)
624			.await
625	}
626
627	#[allow(missing_docs)]
628	pub async fn parachain_host_async_backing_params(
629		&self,
630		at: RelayHash,
631	) -> Result<AsyncBackingParams, RelayChainError> {
632		self.call_remote_runtime_function("ParachainHost_async_backing_params", at, None::<()>)
633			.await
634	}
635
636	#[allow(missing_docs)]
637	pub async fn parachain_host_staging_approval_voting_params(
638		&self,
639		at: RelayHash,
640		_session_index: SessionIndex,
641	) -> Result<ApprovalVotingParams, RelayChainError> {
642		self.call_remote_runtime_function(
643			"ParachainHost_staging_approval_voting_params",
644			at,
645			None::<()>,
646		)
647		.await
648	}
649
650	pub async fn parachain_host_para_backing_state(
651		&self,
652		at: RelayHash,
653		para_id: ParaId,
654	) -> Result<Option<BackingState>, RelayChainError> {
655		self.call_remote_runtime_function("ParachainHost_para_backing_state", at, Some(para_id))
656			.await
657	}
658
659	pub async fn parachain_host_claim_queue(
660		&self,
661		at: RelayHash,
662	) -> Result<BTreeMap<CoreIndex, VecDeque<ParaId>>, RelayChainError> {
663		self.call_remote_runtime_function("ParachainHost_claim_queue", at, None::<()>)
664			.await
665	}
666
667	/// Get the receipt of all candidates pending availability.
668	pub async fn parachain_host_candidates_pending_availability(
669		&self,
670		at: RelayHash,
671		para_id: ParaId,
672	) -> Result<Vec<CommittedCandidateReceipt>, RelayChainError> {
673		self.call_remote_runtime_function(
674			"ParachainHost_candidates_pending_availability",
675			at,
676			Some(para_id),
677		)
678		.await
679	}
680
681	pub async fn parachain_host_scheduling_lookahead(
682		&self,
683		at: RelayHash,
684	) -> Result<u32, RelayChainError> {
685		self.call_remote_runtime_function("ParachainHost_scheduling_lookahead", at, None::<()>)
686			.await
687	}
688
689	pub async fn parachain_host_validation_code_bomb_limit(
690		&self,
691		at: RelayHash,
692	) -> Result<u32, RelayChainError> {
693		self.call_remote_runtime_function(
694			"ParachainHost_validation_code_bomb_limit",
695			at,
696			None::<()>,
697		)
698		.await
699	}
700
701	pub async fn validation_code_hash(
702		&self,
703		at: RelayHash,
704		para_id: ParaId,
705		occupied_core_assumption: OccupiedCoreAssumption,
706	) -> Result<Option<ValidationCodeHash>, RelayChainError> {
707		self.call_remote_runtime_function(
708			"ParachainHost_validation_code_hash",
709			at,
710			Some((para_id, occupied_core_assumption)),
711		)
712		.await
713	}
714
715	pub async fn parachain_host_backing_constraints(
716		&self,
717		at: RelayHash,
718		para_id: ParaId,
719	) -> Result<Option<Constraints>, RelayChainError> {
720		self.call_remote_runtime_function("ParachainHost_backing_constraints", at, Some(para_id))
721			.await
722	}
723
724	fn send_register_message_to_worker(
725		&self,
726		message: RpcDispatcherMessage,
727	) -> Result<(), RelayChainError> {
728		self.worker_channel
729			.try_send(message)
730			.map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))
731	}
732
733	/// Get a stream of all imported relay chain headers
734	pub fn get_imported_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
735		let (tx, rx) =
736			futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
737		self.send_register_message_to_worker(RpcDispatcherMessage::RegisterImportListener(tx))?;
738		Ok(rx)
739	}
740
741	/// Get a stream of new best relay chain headers
742	pub fn get_best_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
743		let (tx, rx) =
744			futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
745		self.send_register_message_to_worker(RpcDispatcherMessage::RegisterBestHeadListener(tx))?;
746		Ok(rx)
747	}
748
749	/// Get a stream of finalized relay chain headers
750	pub fn get_finalized_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
751		let (tx, rx) =
752			futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
753		self.send_register_message_to_worker(RpcDispatcherMessage::RegisterFinalizationListener(
754			tx,
755		))?;
756		Ok(rx)
757	}
758
759	pub async fn parachain_host_para_ids(
760		&self,
761		at: RelayHash,
762	) -> Result<Vec<ParaId>, RelayChainError> {
763		self.call_remote_runtime_function("ParachainHost_para_ids", at, None::<()>)
764			.await
765	}
766}
767
768/// Send `header` through all channels contained in `senders`.
769/// If no one is listening to the sender, it is removed from the vector.
770pub fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>) {
771	senders.retain_mut(|e| {
772				match e.try_send(header.clone()) {
773					// Receiver has been dropped, remove Sender from list.
774					Err(error) if error.is_disconnected() => false,
775					// Channel is full. This should not happen.
776					// TODO: Improve error handling here
777					// https://github.com/paritytech/cumulus/issues/1482
778					Err(error) => {
779						tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications.");
780						true
781					},
782					_ => true,
783				}
784			});
785}