referrerpolicy=no-referrer-when-downgrade

cumulus_relay_chain_streams/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Common utilities for transforming relay chain streams.
19
20use std::sync::Arc;
21
22use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
23use futures::{Stream, StreamExt};
24use polkadot_node_subsystem::messages::RuntimeApiRequest;
25use polkadot_primitives::{
26	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Hash as PHash, Id as ParaId,
27	OccupiedCoreAssumption, SessionIndex,
28};
29use sp_api::RuntimeApiInfo;
30use sp_consensus::SyncOracle;
31
32const LOG_TARGET: &str = "cumulus-relay-chain-streams";
33
34pub type RelayHeader = polkadot_primitives::Header;
35
36/// Returns a stream over pending candidates for the parachain corresponding to `para_id`.
37pub async fn pending_candidates(
38	relay_chain_client: impl RelayChainInterface + Clone,
39	para_id: ParaId,
40	sync_service: Arc<dyn SyncOracle + Sync + Send>,
41) -> RelayChainResult<impl Stream<Item = (Vec<CommittedCandidateReceipt>, SessionIndex, RelayHeader)>>
42{
43	let import_notification_stream = relay_chain_client.import_notification_stream().await?;
44
45	let filtered_stream = import_notification_stream.filter_map(move |n| {
46		let client = relay_chain_client.clone();
47		let sync_oracle = sync_service.clone();
48		async move {
49			let hash = n.hash();
50			if sync_oracle.is_major_syncing() {
51				tracing::debug!(
52					target: LOG_TARGET,
53					relay_hash = ?hash,
54					"Skipping candidate due to sync.",
55				);
56				return None
57			}
58
59			let runtime_api_version = client
60				.version(hash)
61				.await
62				.map_err(|e| {
63					tracing::error!(
64						target: LOG_TARGET,
65						error = ?e,
66						"Failed to fetch relay chain runtime version.",
67					)
68				})
69				.ok()?;
70			let parachain_host_runtime_api_version = runtime_api_version
71				.api_version(
72					&<dyn polkadot_primitives::runtime_api::ParachainHost<
73						polkadot_primitives::Block,
74					>>::ID,
75				)
76				.unwrap_or_default();
77
78			// If the relay chain runtime does not support the new runtime API, fallback to the
79			// deprecated one.
80			let pending_availability_result = if parachain_host_runtime_api_version <
81				RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
82			{
83				#[allow(deprecated)]
84				client
85					.candidate_pending_availability(hash, para_id)
86					.await
87					.map_err(|e| {
88						tracing::error!(
89							target: LOG_TARGET,
90							error = ?e,
91							"Failed to fetch pending candidates.",
92						)
93					})
94					.map(|candidate| candidate.into_iter().collect::<Vec<_>>())
95			} else {
96				client.candidates_pending_availability(hash, para_id).await.map_err(|e| {
97					tracing::error!(
98						target: LOG_TARGET,
99						error = ?e,
100						"Failed to fetch pending candidates.",
101					)
102				})
103			};
104
105			let session_index_result = client.session_index_for_child(hash).await.map_err(|e| {
106				tracing::error!(
107					target: LOG_TARGET,
108					error = ?e,
109					"Failed to fetch session index.",
110				)
111			});
112
113			if let Ok(candidates) = pending_availability_result {
114				session_index_result.map(|session_index| (candidates, session_index, n)).ok()
115			} else {
116				None
117			}
118		}
119	});
120	Ok(filtered_stream)
121}
122
123/// Returns a stream that will yield best heads for the given `para_id`.
124pub async fn new_best_heads(
125	relay_chain: impl RelayChainInterface + Clone,
126	para_id: ParaId,
127) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
128	let new_best_notification_stream =
129		relay_chain.new_best_notification_stream().await?.filter_map(move |n| {
130			let relay_chain = relay_chain.clone();
131			async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() }
132		});
133
134	Ok(new_best_notification_stream)
135}
136
137/// Returns a stream that will yield finalized heads for the given `para_id`.
138pub async fn finalized_heads(
139	relay_chain: impl RelayChainInterface + Clone,
140	para_id: ParaId,
141) -> RelayChainResult<impl Stream<Item = (Vec<u8>, RelayHeader)>> {
142	let finality_notification_stream =
143		relay_chain.finality_notification_stream().await?.filter_map(move |n| {
144			let relay_chain = relay_chain.clone();
145			async move {
146				parachain_head_at(&relay_chain, n.hash(), para_id)
147					.await
148					.ok()
149					.flatten()
150					.map(|h| (h, n))
151			}
152		});
153
154	Ok(finality_notification_stream)
155}
156
157/// Returns head of the parachain at the given relay chain block.
158async fn parachain_head_at(
159	relay_chain: &impl RelayChainInterface,
160	at: PHash,
161	para_id: ParaId,
162) -> RelayChainResult<Option<Vec<u8>>> {
163	relay_chain
164		.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
165		.await
166		.map(|s| s.map(|s| s.parent_head.0))
167}