1use futures::channel::{
19 mpsc::{Receiver, Sender},
20 oneshot::Sender as OneshotSender,
21};
22use jsonrpsee::{
23 core::{params::ArrayParams, ClientError as JsonRpseeError},
24 rpc_params,
25};
26use prometheus::Registry;
27use serde::{de::DeserializeOwned, Serialize};
28use serde_json::Value as JsonValue;
29use std::collections::{btree_map::BTreeMap, VecDeque};
30use tokio::sync::mpsc::Sender as TokioSender;
31
32use codec::{Decode, Encode};
33
34use cumulus_primitives_core::{
35 relay_chain::{
36 async_backing::{AsyncBackingParams, BackingState, Constraints},
37 slashing, ApprovalVotingParams, BlockNumber, CandidateCommitments, CandidateEvent,
38 CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex,
39 CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash as RelayHash,
40 Header as RelayHeader, InboundHrmpMessage, NodeFeatures, OccupiedCoreAssumption,
41 PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
42 ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
43 },
44 InboundDownwardMessage, ParaId, PersistedValidationData,
45};
46use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
47
48use sc_client_api::StorageData;
49use sc_rpc_api::{state::ReadProof, system::Health};
50use sc_service::TaskManager;
51use sp_consensus_babe::Epoch;
52use sp_storage::StorageKey;
53use sp_version::RuntimeVersion;
54
55use crate::{metrics::RelaychainRpcMetrics, reconnecting_ws_client::ReconnectingWebsocketWorker};
56pub use url::Url;
57
/// `tracing` target used by the log statements in this module.
const LOG_TARGET: &str = "relay-chain-rpc-client";
/// Buffer size passed to `futures::channel::mpsc::channel` when the `get_*_heads_stream`
/// methods create a header notification channel.
const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20;
60
/// Messages sent from a `RelayChainRpcClient` to the RPC worker over the worker channel.
#[derive(Debug)]
pub enum RpcDispatcherMessage {
    /// Carries the sending half of a header channel; sent by `get_best_heads_stream`.
    /// NOTE(review): presumably the worker pushes new best heads into it — confirm in the worker.
    RegisterBestHeadListener(Sender<RelayHeader>),

    /// Carries the sending half of a header channel; sent by `get_imported_heads_stream`.
    /// NOTE(review): presumably the worker pushes imported heads into it — confirm in the worker.
    RegisterImportListener(Sender<RelayHeader>),

    /// Carries the sending half of a header channel; sent by `get_finalized_heads_stream`.
    /// NOTE(review): presumably the worker pushes finalized heads into it — confirm in the worker.
    RegisterFinalizationListener(Sender<RelayHeader>),

    /// An RPC request: the method name, its positional parameters, and the oneshot sender
    /// on which the JSON response (or the RPC error) is delivered back to the requester.
    Request(String, ArrayParams, OneshotSender<Result<JsonValue, JsonRpseeError>>),
}
83
84pub async fn create_client_and_start_worker(
87 urls: Vec<Url>,
88 task_manager: &mut TaskManager,
89 prometheus_registry: Option<&Registry>,
90) -> RelayChainResult<RelayChainRpcClient> {
91 let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await;
92
93 task_manager
94 .spawn_essential_handle()
95 .spawn("relay-chain-rpc-worker", None, worker.run());
96
97 let client = RelayChainRpcClient::new(sender, prometheus_registry);
98
99 Ok(client)
100}
101
/// Borrowed byte slice that is serialized through the `sp_core::bytes` serde helper.
///
/// Used to pass the `state_call` payload as an RPC parameter.
/// NOTE(review): presumably serializes as a hex string, as the name suggests — confirm
/// against `sp_core::bytes`.
#[derive(Serialize)]
struct PayloadToHex<'a>(#[serde(with = "sp_core::bytes")] &'a [u8]);
104
/// Client for issuing RPC requests to a relay chain node through an RPC worker.
///
/// Requests and listener registrations are sent to the worker as `RpcDispatcherMessage`s.
#[derive(Clone)]
pub struct RelayChainRpcClient {
    /// Channel used to send messages to the RPC worker.
    worker_channel: TokioSender<RpcDispatcherMessage>,
    /// Optional request metrics; `None` when no registry was supplied or registration failed.
    metrics: Option<RelaychainRpcMetrics>,
}
112
113impl RelayChainRpcClient {
114 pub(crate) fn new(
119 worker_channel: TokioSender<RpcDispatcherMessage>,
120 prometheus_registry: Option<&Registry>,
121 ) -> Self {
122 RelayChainRpcClient {
123 worker_channel,
124 metrics: prometheus_registry
125 .and_then(|inner| RelaychainRpcMetrics::register(inner).map_err(|err| {
126 tracing::warn!(target: LOG_TARGET, error = %err, "Unable to instantiate the RPC client metrics, continuing w/o metrics setup.");
127 }).ok()),
128 }
129 }
130
131 pub async fn call_remote_runtime_function_encoded(
133 &self,
134 method_name: &str,
135 hash: RelayHash,
136 payload: &[u8],
137 ) -> RelayChainResult<sp_core::Bytes> {
138 let payload = PayloadToHex(payload);
139
140 let params = rpc_params! {
141 method_name,
142 payload,
143 hash
144 };
145
146 self.request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
147 tracing::trace!(
148 target: LOG_TARGET,
149 %method_name,
150 %hash,
151 error = %err,
152 "Error during call to 'state_call'.",
153 );
154 })
155 .await
156 }
157
158 pub async fn call_remote_runtime_function<R: Decode>(
160 &self,
161 method_name: &str,
162 hash: RelayHash,
163 payload: Option<impl Encode>,
164 ) -> RelayChainResult<R> {
165 let payload_bytes =
166 payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode()));
167 let res = self
168 .call_remote_runtime_function_encoded(method_name, hash, &payload_bytes)
169 .await?;
170 Decode::decode(&mut &*res.0).map_err(Into::into)
171 }
172
173 async fn request<'a, R>(
175 &self,
176 method: &'a str,
177 params: ArrayParams,
178 ) -> Result<R, RelayChainError>
179 where
180 R: DeserializeOwned + std::fmt::Debug,
181 {
182 self.request_tracing(
183 method,
184 params,
185 |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"),
186 )
187 .await
188 }
189
190 async fn request_tracing<'a, R, OR>(
192 &self,
193 method: &'a str,
194 params: ArrayParams,
195 trace_error: OR,
196 ) -> Result<R, RelayChainError>
197 where
198 R: DeserializeOwned + std::fmt::Debug,
199 OR: Fn(&RelayChainError),
200 {
201 let _timer = self.metrics.as_ref().map(|inner| inner.start_request_timer(method));
202
203 let (tx, rx) = futures::channel::oneshot::channel();
204
205 let message = RpcDispatcherMessage::Request(method.into(), params, tx);
206 self.worker_channel.send(message).await.map_err(|err| {
207 RelayChainError::WorkerCommunicationError(format!(
208 "Unable to send message to RPC worker: {}",
209 err
210 ))
211 })?;
212
213 let value = rx.await.map_err(|err| {
214 RelayChainError::WorkerCommunicationError(format!(
215 "RPC worker channel closed. This can hint and connectivity issues with the supplied RPC endpoints. Message: {}",
216 err
217 ))
218 })??;
219
220 serde_json::from_value(value).map_err(|_| {
221 trace_error(&RelayChainError::GenericError("Unable to deserialize value".to_string()));
222 RelayChainError::RpcCallError(method.to_string())
223 })
224 }
225
226 pub async fn babe_api_current_epoch(&self, at: RelayHash) -> Result<Epoch, RelayChainError> {
228 self.call_remote_runtime_function("BabeApi_current_epoch", at, None::<()>).await
229 }
230
231 pub async fn parachain_host_on_chain_votes(
233 &self,
234 at: RelayHash,
235 ) -> Result<Option<ScrapedOnChainVotes<RelayHash>>, RelayChainError> {
236 self.call_remote_runtime_function("ParachainHost_on_chain_votes", at, None::<()>)
237 .await
238 }
239
240 pub async fn parachain_host_pvfs_require_precheck(
242 &self,
243 at: RelayHash,
244 ) -> Result<Vec<ValidationCodeHash>, RelayChainError> {
245 self.call_remote_runtime_function("ParachainHost_pvfs_require_precheck", at, None::<()>)
246 .await
247 }
248
249 pub async fn parachain_host_submit_pvf_check_statement(
251 &self,
252 at: RelayHash,
253 stmt: PvfCheckStatement,
254 signature: ValidatorSignature,
255 ) -> Result<(), RelayChainError> {
256 self.call_remote_runtime_function(
257 "ParachainHost_submit_pvf_check_statement",
258 at,
259 Some((stmt, signature)),
260 )
261 .await
262 }
263
264 pub async fn system_health(&self) -> Result<Health, RelayChainError> {
266 self.request("system_health", rpc_params![]).await
267 }
268
269 pub async fn state_get_read_proof(
271 &self,
272 storage_keys: Vec<StorageKey>,
273 at: Option<RelayHash>,
274 ) -> Result<ReadProof<RelayHash>, RelayChainError> {
275 let params = rpc_params![storage_keys, at];
276 self.request("state_getReadProof", params).await
277 }
278
279 pub async fn state_get_storage(
281 &self,
282 storage_key: StorageKey,
283 at: Option<RelayHash>,
284 ) -> Result<Option<StorageData>, RelayChainError> {
285 let params = rpc_params![storage_key, at];
286 self.request("state_getStorage", params).await
287 }
288
289 pub async fn chain_get_head(&self, at: Option<u64>) -> Result<RelayHash, RelayChainError> {
293 let params = rpc_params![at];
294 self.request("chain_getHead", params).await
295 }
296
297 pub async fn parachain_host_validator_groups(
301 &self,
302 at: RelayHash,
303 ) -> Result<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo), RelayChainError> {
304 self.call_remote_runtime_function("ParachainHost_validator_groups", at, None::<()>)
305 .await
306 }
307
308 pub async fn parachain_host_candidate_events(
310 &self,
311 at: RelayHash,
312 ) -> Result<Vec<CandidateEvent>, RelayChainError> {
313 self.call_remote_runtime_function("ParachainHost_candidate_events", at, None::<()>)
314 .await
315 }
316
317 pub async fn parachain_host_check_validation_outputs(
319 &self,
320 at: RelayHash,
321 para_id: ParaId,
322 outputs: CandidateCommitments,
323 ) -> Result<bool, RelayChainError> {
324 self.call_remote_runtime_function(
325 "ParachainHost_check_validation_outputs",
326 at,
327 Some((para_id, outputs)),
328 )
329 .await
330 }
331
332 pub async fn parachain_host_assumed_validation_data(
336 &self,
337 at: RelayHash,
338 para_id: ParaId,
339 expected_hash: RelayHash,
340 ) -> Result<Option<(PersistedValidationData, ValidationCodeHash)>, RelayChainError> {
341 self.call_remote_runtime_function(
342 "ParachainHost_persisted_assumed_validation_data",
343 at,
344 Some((para_id, expected_hash)),
345 )
346 .await
347 }
348
349 pub async fn chain_get_finalized_head(&self) -> Result<RelayHash, RelayChainError> {
351 self.request("chain_getFinalizedHead", rpc_params![]).await
352 }
353
354 pub async fn chain_get_block_hash(
356 &self,
357 block_number: Option<BlockNumber>,
358 ) -> Result<Option<RelayHash>, RelayChainError> {
359 let params = rpc_params![block_number];
360 self.request("chain_getBlockHash", params).await
361 }
362
363 pub async fn parachain_host_persisted_validation_data(
369 &self,
370 at: RelayHash,
371 para_id: ParaId,
372 occupied_core_assumption: OccupiedCoreAssumption,
373 ) -> Result<Option<PersistedValidationData>, RelayChainError> {
374 self.call_remote_runtime_function(
375 "ParachainHost_persisted_validation_data",
376 at,
377 Some((para_id, occupied_core_assumption)),
378 )
379 .await
380 }
381
382 pub async fn parachain_host_validation_code_by_hash(
384 &self,
385 at: RelayHash,
386 validation_code_hash: ValidationCodeHash,
387 ) -> Result<Option<ValidationCode>, RelayChainError> {
388 self.call_remote_runtime_function(
389 "ParachainHost_validation_code_by_hash",
390 at,
391 Some(validation_code_hash),
392 )
393 .await
394 }
395
396 pub async fn parachain_host_availability_cores(
399 &self,
400 at: RelayHash,
401 ) -> Result<Vec<CoreState<RelayHash, BlockNumber>>, RelayChainError> {
402 self.call_remote_runtime_function("ParachainHost_availability_cores", at, None::<()>)
403 .await
404 }
405
406 pub async fn runtime_version(&self, at: RelayHash) -> Result<RuntimeVersion, RelayChainError> {
408 let params = rpc_params![at];
409 self.request("state_getRuntimeVersion", params).await
410 }
411
412 pub async fn parachain_host_disputes(
414 &self,
415 at: RelayHash,
416 ) -> Result<Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>, RelayChainError> {
417 self.call_remote_runtime_function("ParachainHost_disputes", at, None::<()>)
418 .await
419 }
420
421 pub async fn parachain_host_unapplied_slashes(
425 &self,
426 at: RelayHash,
427 ) -> Result<Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>, RelayChainError> {
428 self.call_remote_runtime_function("ParachainHost_unapplied_slashes", at, None::<()>)
429 .await
430 }
431
432 pub async fn parachain_host_key_ownership_proof(
436 &self,
437 at: RelayHash,
438 validator_id: ValidatorId,
439 ) -> Result<Option<slashing::OpaqueKeyOwnershipProof>, RelayChainError> {
440 self.call_remote_runtime_function(
441 "ParachainHost_key_ownership_proof",
442 at,
443 Some(validator_id),
444 )
445 .await
446 }
447
448 pub async fn parachain_host_submit_report_dispute_lost(
453 &self,
454 at: RelayHash,
455 dispute_proof: slashing::DisputeProof,
456 key_ownership_proof: slashing::OpaqueKeyOwnershipProof,
457 ) -> Result<Option<()>, RelayChainError> {
458 self.call_remote_runtime_function(
459 "ParachainHost_submit_report_dispute_lost",
460 at,
461 Some((dispute_proof, key_ownership_proof)),
462 )
463 .await
464 }
465
466 pub async fn authority_discovery_authorities(
467 &self,
468 at: RelayHash,
469 ) -> Result<Vec<sp_authority_discovery::AuthorityId>, RelayChainError> {
470 self.call_remote_runtime_function("AuthorityDiscoveryApi_authorities", at, None::<()>)
471 .await
472 }
473
474 pub async fn parachain_host_validation_code(
479 &self,
480 at: RelayHash,
481 para_id: ParaId,
482 occupied_core_assumption: OccupiedCoreAssumption,
483 ) -> Result<Option<ValidationCode>, RelayChainError> {
484 self.call_remote_runtime_function(
485 "ParachainHost_validation_code",
486 at,
487 Some((para_id, occupied_core_assumption)),
488 )
489 .await
490 }
491
492 pub async fn parachain_host_validation_code_hash(
495 &self,
496 at: RelayHash,
497 para_id: ParaId,
498 occupied_core_assumption: OccupiedCoreAssumption,
499 ) -> Result<Option<ValidationCodeHash>, RelayChainError> {
500 self.call_remote_runtime_function(
501 "ParachainHost_validation_code_hash",
502 at,
503 Some((para_id, occupied_core_assumption)),
504 )
505 .await
506 }
507
508 pub async fn parachain_host_session_info(
510 &self,
511 at: RelayHash,
512 index: SessionIndex,
513 ) -> Result<Option<SessionInfo>, RelayChainError> {
514 self.call_remote_runtime_function("ParachainHost_session_info", at, Some(index))
515 .await
516 }
517
518 pub async fn parachain_host_session_executor_params(
520 &self,
521 at: RelayHash,
522 session_index: SessionIndex,
523 ) -> Result<Option<ExecutorParams>, RelayChainError> {
524 self.call_remote_runtime_function(
525 "ParachainHost_session_executor_params",
526 at,
527 Some(session_index),
528 )
529 .await
530 }
531
532 pub async fn chain_get_header(
534 &self,
535 hash: Option<RelayHash>,
536 ) -> Result<Option<RelayHeader>, RelayChainError> {
537 let params = rpc_params![hash];
538 self.request("chain_getHeader", params).await
539 }
540
541 pub async fn parachain_host_candidate_pending_availability(
544 &self,
545 at: RelayHash,
546 para_id: ParaId,
547 ) -> Result<Option<CommittedCandidateReceipt>, RelayChainError> {
548 self.call_remote_runtime_function(
549 "ParachainHost_candidate_pending_availability",
550 at,
551 Some(para_id),
552 )
553 .await
554 }
555
556 pub async fn parachain_host_session_index_for_child(
560 &self,
561 at: RelayHash,
562 ) -> Result<SessionIndex, RelayChainError> {
563 self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>)
564 .await
565 }
566
567 pub async fn parachain_host_validators(
569 &self,
570 at: RelayHash,
571 ) -> Result<Vec<ValidatorId>, RelayChainError> {
572 self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>)
573 .await
574 }
575
576 pub async fn parachain_host_inbound_hrmp_channels_contents(
579 &self,
580 para_id: ParaId,
581 at: RelayHash,
582 ) -> Result<BTreeMap<ParaId, Vec<InboundHrmpMessage>>, RelayChainError> {
583 self.call_remote_runtime_function(
584 "ParachainHost_inbound_hrmp_channels_contents",
585 at,
586 Some(para_id),
587 )
588 .await
589 }
590
591 pub async fn parachain_host_dmq_contents(
593 &self,
594 para_id: ParaId,
595 at: RelayHash,
596 ) -> Result<Vec<InboundDownwardMessage>, RelayChainError> {
597 self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id))
598 .await
599 }
600
601 pub async fn parachain_host_minimum_backing_votes(
603 &self,
604 at: RelayHash,
605 _session_index: SessionIndex,
606 ) -> Result<u32, RelayChainError> {
607 self.call_remote_runtime_function("ParachainHost_minimum_backing_votes", at, None::<()>)
608 .await
609 }
610
611 pub async fn parachain_host_node_features(
612 &self,
613 at: RelayHash,
614 ) -> Result<NodeFeatures, RelayChainError> {
615 self.call_remote_runtime_function("ParachainHost_node_features", at, None::<()>)
616 .await
617 }
618
619 pub async fn parachain_host_disabled_validators(
620 &self,
621 at: RelayHash,
622 ) -> Result<Vec<ValidatorIndex>, RelayChainError> {
623 self.call_remote_runtime_function("ParachainHost_disabled_validators", at, None::<()>)
624 .await
625 }
626
627 #[allow(missing_docs)]
628 pub async fn parachain_host_async_backing_params(
629 &self,
630 at: RelayHash,
631 ) -> Result<AsyncBackingParams, RelayChainError> {
632 self.call_remote_runtime_function("ParachainHost_async_backing_params", at, None::<()>)
633 .await
634 }
635
636 #[allow(missing_docs)]
637 pub async fn parachain_host_staging_approval_voting_params(
638 &self,
639 at: RelayHash,
640 _session_index: SessionIndex,
641 ) -> Result<ApprovalVotingParams, RelayChainError> {
642 self.call_remote_runtime_function(
643 "ParachainHost_staging_approval_voting_params",
644 at,
645 None::<()>,
646 )
647 .await
648 }
649
650 pub async fn parachain_host_para_backing_state(
651 &self,
652 at: RelayHash,
653 para_id: ParaId,
654 ) -> Result<Option<BackingState>, RelayChainError> {
655 self.call_remote_runtime_function("ParachainHost_para_backing_state", at, Some(para_id))
656 .await
657 }
658
659 pub async fn parachain_host_claim_queue(
660 &self,
661 at: RelayHash,
662 ) -> Result<BTreeMap<CoreIndex, VecDeque<ParaId>>, RelayChainError> {
663 self.call_remote_runtime_function("ParachainHost_claim_queue", at, None::<()>)
664 .await
665 }
666
667 pub async fn parachain_host_candidates_pending_availability(
669 &self,
670 at: RelayHash,
671 para_id: ParaId,
672 ) -> Result<Vec<CommittedCandidateReceipt>, RelayChainError> {
673 self.call_remote_runtime_function(
674 "ParachainHost_candidates_pending_availability",
675 at,
676 Some(para_id),
677 )
678 .await
679 }
680
681 pub async fn parachain_host_scheduling_lookahead(
682 &self,
683 at: RelayHash,
684 ) -> Result<u32, RelayChainError> {
685 self.call_remote_runtime_function("ParachainHost_scheduling_lookahead", at, None::<()>)
686 .await
687 }
688
689 pub async fn parachain_host_validation_code_bomb_limit(
690 &self,
691 at: RelayHash,
692 ) -> Result<u32, RelayChainError> {
693 self.call_remote_runtime_function(
694 "ParachainHost_validation_code_bomb_limit",
695 at,
696 None::<()>,
697 )
698 .await
699 }
700
701 pub async fn validation_code_hash(
702 &self,
703 at: RelayHash,
704 para_id: ParaId,
705 occupied_core_assumption: OccupiedCoreAssumption,
706 ) -> Result<Option<ValidationCodeHash>, RelayChainError> {
707 self.call_remote_runtime_function(
708 "ParachainHost_validation_code_hash",
709 at,
710 Some((para_id, occupied_core_assumption)),
711 )
712 .await
713 }
714
715 pub async fn parachain_host_backing_constraints(
716 &self,
717 at: RelayHash,
718 para_id: ParaId,
719 ) -> Result<Option<Constraints>, RelayChainError> {
720 self.call_remote_runtime_function("ParachainHost_backing_constraints", at, Some(para_id))
721 .await
722 }
723
724 fn send_register_message_to_worker(
725 &self,
726 message: RpcDispatcherMessage,
727 ) -> Result<(), RelayChainError> {
728 self.worker_channel
729 .try_send(message)
730 .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))
731 }
732
733 pub fn get_imported_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
735 let (tx, rx) =
736 futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
737 self.send_register_message_to_worker(RpcDispatcherMessage::RegisterImportListener(tx))?;
738 Ok(rx)
739 }
740
741 pub fn get_best_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
743 let (tx, rx) =
744 futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
745 self.send_register_message_to_worker(RpcDispatcherMessage::RegisterBestHeadListener(tx))?;
746 Ok(rx)
747 }
748
749 pub fn get_finalized_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
751 let (tx, rx) =
752 futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
753 self.send_register_message_to_worker(RpcDispatcherMessage::RegisterFinalizationListener(
754 tx,
755 ))?;
756 Ok(rx)
757 }
758
759 pub async fn parachain_host_para_ids(
760 &self,
761 at: RelayHash,
762 ) -> Result<Vec<ParaId>, RelayChainError> {
763 self.call_remote_runtime_function("ParachainHost_para_ids", at, None::<()>)
764 .await
765 }
766}
767
768pub fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>) {
771 senders.retain_mut(|e| {
772 match e.try_send(header.clone()) {
773 Err(error) if error.is_disconnected() => false,
775 Err(error) => {
779 tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications.");
780 true
781 },
782 _ => true,
783 }
784 });
785}