cumulus_relay_chain_streams/
lib.rs1use std::sync::Arc;
21
22use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
23use futures::{Stream, StreamExt};
24use polkadot_node_subsystem::messages::RuntimeApiRequest;
25use polkadot_primitives::{
26 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Hash as PHash, Id as ParaId,
27 OccupiedCoreAssumption, SessionIndex,
28};
29use sp_api::RuntimeApiInfo;
30use sp_consensus::SyncOracle;
31
32const LOG_TARGET: &str = "cumulus-relay-chain-streams";
33
34pub type RelayHeader = polkadot_primitives::Header;
35
36pub async fn pending_candidates(
38 relay_chain_client: impl RelayChainInterface + Clone,
39 para_id: ParaId,
40 sync_service: Arc<dyn SyncOracle + Sync + Send>,
41) -> RelayChainResult<impl Stream<Item = (Vec<CommittedCandidateReceipt>, SessionIndex, RelayHeader)>>
42{
43 let import_notification_stream = relay_chain_client.import_notification_stream().await?;
44
45 let filtered_stream = import_notification_stream.filter_map(move |n| {
46 let client = relay_chain_client.clone();
47 let sync_oracle = sync_service.clone();
48 async move {
49 let hash = n.hash();
50 if sync_oracle.is_major_syncing() {
51 tracing::debug!(
52 target: LOG_TARGET,
53 relay_hash = ?hash,
54 "Skipping candidate due to sync.",
55 );
56 return None
57 }
58
59 let runtime_api_version = client
60 .version(hash)
61 .await
62 .map_err(|e| {
63 tracing::error!(
64 target: LOG_TARGET,
65 error = ?e,
66 "Failed to fetch relay chain runtime version.",
67 )
68 })
69 .ok()?;
70 let parachain_host_runtime_api_version = runtime_api_version
71 .api_version(
72 &<dyn polkadot_primitives::runtime_api::ParachainHost<
73 polkadot_primitives::Block,
74 >>::ID,
75 )
76 .unwrap_or_default();
77
78 let pending_availability_result = if parachain_host_runtime_api_version <
81 RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
82 {
83 #[allow(deprecated)]
84 client
85 .candidate_pending_availability(hash, para_id)
86 .await
87 .map_err(|e| {
88 tracing::error!(
89 target: LOG_TARGET,
90 error = ?e,
91 "Failed to fetch pending candidates.",
92 )
93 })
94 .map(|candidate| candidate.into_iter().collect::<Vec<_>>())
95 } else {
96 client.candidates_pending_availability(hash, para_id).await.map_err(|e| {
97 tracing::error!(
98 target: LOG_TARGET,
99 error = ?e,
100 "Failed to fetch pending candidates.",
101 )
102 })
103 };
104
105 let session_index_result = client.session_index_for_child(hash).await.map_err(|e| {
106 tracing::error!(
107 target: LOG_TARGET,
108 error = ?e,
109 "Failed to fetch session index.",
110 )
111 });
112
113 if let Ok(candidates) = pending_availability_result {
114 session_index_result.map(|session_index| (candidates, session_index, n)).ok()
115 } else {
116 None
117 }
118 }
119 });
120 Ok(filtered_stream)
121}
122
123pub async fn new_best_heads(
125 relay_chain: impl RelayChainInterface + Clone,
126 para_id: ParaId,
127) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
128 let new_best_notification_stream =
129 relay_chain.new_best_notification_stream().await?.filter_map(move |n| {
130 let relay_chain = relay_chain.clone();
131 async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() }
132 });
133
134 Ok(new_best_notification_stream)
135}
136
137pub async fn finalized_heads(
139 relay_chain: impl RelayChainInterface + Clone,
140 para_id: ParaId,
141) -> RelayChainResult<impl Stream<Item = (Vec<u8>, RelayHeader)>> {
142 let finality_notification_stream =
143 relay_chain.finality_notification_stream().await?.filter_map(move |n| {
144 let relay_chain = relay_chain.clone();
145 async move {
146 parachain_head_at(&relay_chain, n.hash(), para_id)
147 .await
148 .ok()
149 .flatten()
150 .map(|h| (h, n))
151 }
152 });
153
154 Ok(finality_notification_stream)
155}
156
157async fn parachain_head_at(
159 relay_chain: &impl RelayChainInterface,
160 at: PHash,
161 para_id: ParaId,
162) -> RelayChainResult<Option<Vec<u8>>> {
163 relay_chain
164 .persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
165 .await
166 .map(|s| s.map(|s| s.parent_head.0))
167}